Hey Jesse,

This looks like the buffer was not written to Redis correctly; Java is not able to deserialize the message. I recommend storing the hex string from Python in Redis. You also have to be careful to feed the correct bytes to Arrow on the Java side, so double-check that the hex string -> bytes decoding is correct too.
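To sanity-check the hex handling in isolation before debugging the Java side, a minimal standard-library sketch (the payload bytes are a made-up stand-in, not a real IPC stream):

```python
# Simulate the Python -> Redis -> Java byte path with plain hex.
payload = b"\xff\xff\xff\xffARROW1"  # stand-in bytes for illustration only

hex_str = payload.hex()            # the string you would r.set() into Redis
restored = bytes.fromhex(hex_str)  # what the Java decoder must reproduce

# The round trip must be lossless, and the hex string is twice the byte length.
assert restored == payload
assert len(hex_str) == 2 * len(payload)
```

If the equivalent check fails on the Java side (decoding the same hex string to the same bytes), the IPC reader will see a corrupted stream.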


On Wed, Jul 22, 2020 at 6:49 AM Jesse Wang <hello.wjx@gmail.com> wrote:
 Hi Ryan,
Thanks for your reply.

On Tue, Jul 21, 2020 at 8:54 PM Ryan Murray <rymurr@dremio.com> wrote:
Hey Jiaxing,

You want to use the IPC mechanism to pass arrow buffers between languages[1]

First get a buffer:
import pyarrow as pa

data = [
    pa.array([1, 2, 3, 4]),
    pa.array(['foo', 'bar', 'baz', None]),
    pa.array([True, None, False, True])
]
batch = pa.record_batch(data, names=['f0', 'f1', 'f2'])
sink = pa.BufferOutputStream()
writer = pa.ipc.new_stream(sink, batch.schema)
writer.write_batch(batch)
writer.close()
buf = sink.getvalue()

The buffer could be written to Redis, to a file, etc. For Redis I think `r.set("key", buf.hex())` is easiest; that way you don't have to worry about encoding issues.

On the java side something like:
    Jedis jedis = new Jedis();
    String buf = jedis.get("key");
    RootAllocator rootAllocator = new RootAllocator(Long.MAX_VALUE);
    ByteArrayInputStream in = new ByteArrayInputStream(hexStringToByteArray(buf));
    ArrowStreamReader stream = new ArrowStreamReader(in, rootAllocator);
    VectorSchemaRoot vsr = stream.getVectorSchemaRoot();
    stream.loadNextBatch();
And after loadNextBatch() the VectorSchemaRoot holds the deserialized Arrow buffers.

I tested this and got the following exception:
Exception in thread "main" java.lang.IllegalArgumentException
  at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
  at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:547)
  at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
  at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
  at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:178)
  at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:169)
  at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:62)
  at Main.main(Main.java:38)
While Redis will work for this, you might find a file or socket a bit more ergonomic with Arrow. The Plasma object store is also an option[2], which you can think of as a primitive Redis built specifically for Arrow buffers. Finally, if you are using Redis as a message bus, you might find Arrow's RPC mechanism, Arrow Flight, a good choice[3].
As for Plasma, it seems it is currently limited to a single host... (from its source code, arrow/cpp/src/plasma/io.cc, it uses AF_UNIX sockets only)

On Tue, Jul 21, 2020 at 10:57 AM Jesse Wang <hello.wjx@gmail.com> wrote:
I want to have a Java process read the content of DataFrames produced by a Python process. The Java and Python processes run on different hosts.

The solution I can think of is to have the Python process serialize the DataFrame and save it to redis, and have the Java process parse the data.

The solution I found serializes the DataFrame to 'pybytes':
import pandas as pd
import pyarrow as pa
import redis

r = redis.Redis(host='localhost', port=6379, db=0)

context = pa.default_serialization_context()
r.set("key", context.serialize(df).to_buffer().to_pybytes())

Here df is a small test DataFrame that prints as:
0  1
1  2
2  3

I wonder if this serialized 'pybytes' can be parsed at the Java end? If not, how can I achieve this properly?



Best Regards,
Jiaxing Wang

