arrow-user mailing list archives

From "Xander Dunn" <xan...@xander.ai>
Subject [C++] Decoding Plasma Objects as RecordBatches
Date Mon, 19 Apr 2021 07:27:45 GMT
In Python I'm encoding RecordBatches like this:

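```python
import pyarrow as pa
import pyarrow.plasma as plasma

# my_id (a 20-character ID string) and pybatch (a pa.RecordBatch) are defined elsewhere
client = plasma.connect("/tmp/plasma")
object_id = plasma.ObjectID(bytes(my_id, "ascii"))  # plasma IDs must be exactly 20 bytes

# First pass: write to a mock sink to measure the serialized size
mock_sink = pa.MockOutputStream()
stream_writer = pa.RecordBatchStreamWriter(mock_sink, pybatch.schema)
stream_writer.write_batch(pybatch)
stream_writer.close()
data_size = mock_sink.size()

# Second pass: write the same stream into a plasma buffer of exactly that size
buf = client.create(object_id, data_size)
stream = pa.FixedSizeBufferWriter(buf)
stream_writer = pa.RecordBatchStreamWriter(stream, pybatch.schema)
stream_writer.write_batch(pybatch)
stream_writer.close()
client.seal(object_id)
```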

This works; the code is adapted from https://arrow.apache.org/docs/python/plasma.html

I am now having difficulty figuring out the right calls on the C++ side to decode these RecordBatch
messages. I can successfully get the Plasma objects as `arrow::Buffer` objects, but I haven't managed
to decode them into RecordBatches:

```c++
auto object_id = plasma::ObjectID::from_binary(my_id);
plasma::ObjectBuffer object_buffer;
arrow::Status status = client.Get(&object_id, 1, -1, &object_buffer);
std::shared_ptr<Buffer> data = object_buffer.data;
fmt::print("{} Got data with size {}\n", current_id, data->size());
// Everything above works and prints the object size in bytes I'm expecting

//auto buf_reader = arrow::io::BufferReader(buffer);
//auto reader = arrow::ipc::RecordBatchStreamReader::Open(&buf_reader);
// auto batch = reader.ReadNextBatch();
```

I'm stuck on creating the BufferReader. I believe it's declared in arrow/io/memory.h, so
I include it with `#include <arrow/io/memory.h>`, and clang reports these compile errors:

```
pymydata/pymydata/PreprocessData.cc:312:18: error: call to implicitly-deleted copy constructor of 'arrow::io::BufferReader'
            auto buf_reader = arrow::io::BufferReader(buffer);
                 ^            ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/home/xander/anaconda3/envs/my_model/include/arrow/io/memory.h:146:7: note: copy constructor of 'BufferReader' is implicitly deleted because base class 'internal::RandomAccessFileConcurrencyWrapper<BufferReader>' has a deleted copy constructor
    : public internal::RandomAccessFileConcurrencyWrapper<BufferReader> {
      ^
/home/xander/anaconda3/envs/my_model/include/arrow/io/concurrency.h:162:57: note: copy constructor of 'RandomAccessFileConcurrencyWrapper<arrow::io::BufferReader>' is implicitly deleted because base class 'arrow::io::RandomAccessFile' has a deleted copy constructor
class ARROW_EXPORT RandomAccessFileConcurrencyWrapper : public RandomAccessFile {
                                                        ^
/home/xander/anaconda3/envs/my_model/include/arrow/io/interfaces.h:187:7: note: copy constructor of 'RandomAccessFile' is implicitly deleted because base class 'arrow::io::InputStream' has a deleted copy constructor
      public InputStream,
      ^
```

GCC produces similar errors.

I'm using Arrow 3.0.0 with C++11. Am I calling the BufferReader constructor correctly? Do I have the right #include?
Any pointers on decoding RecordBatches on the C++ side would be helpful. Hopefully I'm missing
something simple.

Thanks,

Xander