arrow-user mailing list archives

From maqy <maqy1...@163.com>
Subject How does batch size affect streaming read and write performance in Apache Arrow?
Date Wed, 22 Apr 2020 07:17:39 GMT
I am using Apache Arrow to transfer data between Spark (Scala) and TensorFlow (Python).

In Spark, I have an Array[Row], which I send over a socket with the following code:

```
val rows_array: Array[Row] = df.rdd.collect()
val ss: ServerSocket = new ServerSocket(port)
val socket: Socket = ss.accept()
val iter = rows_array.iterator
// Group the row iterator into batches; batchIter is an Iterator[Iterator[Row]]
val batchIter = new BatchIterator(iter, arrowRecordBatchSize)
val root = new VectorSchemaRoot(new Schema(fields), vectors, arrowRecordBatchSize)
val out = new DataOutputStream(socket.getOutputStream)
val writer = new ArrowStreamWriter(root, null, out)
writer.start()

sendRecordIterator(root, writer, batchIter)
```
sendRecordIterator is defined as follows:
```
  def sendRecordIterator(root: VectorSchemaRoot,
                         writer: ArrowStreamWriter,
                         inputIterator: Iterator[Iterator[Row]]): Unit = {
    // the input iterator yields batches: each inner iterator is one batch of rows
    try {
      val arrowWriter = MyArrowWriter.create(root)
      while (inputIterator.hasNext) {
        val nextBatch = inputIterator.next()

        while (nextBatch.hasNext) {
          arrowWriter.write(nextBatch.next())
        }

        arrowWriter.finish()  // finalize the vectors for this batch
        writer.writeBatch()   // write one record batch to the stream
        arrowWriter.reset()   // clear the vectors for the next batch
      }
      writer.end()
    } finally {
      root.close()
    }
  }

```
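For comparison, here is roughly what the same write loop looks like in pure pyarrow (just a sketch: `table` is assumed to be an existing pyarrow Table, and `port`/`batch_size` are placeholders). `new_stream()` writes the schema message like `writer.start()`, each `write_batch()` emits one IPC record batch message like `writer.writeBatch()`, and `close()` writes the end-of-stream marker like `writer.end()`:

```
import socket

import pyarrow as pa

def serve_table(table: pa.Table, port: int, batch_size: int) -> None:
    # mirror the Scala side: listen, accept one connection, stream the table
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind(('localhost', port))
        server.listen(1)
        conn, _ = server.accept()
        with conn, conn.makefile('wb') as sink:
            writer = pa.ipc.new_stream(sink, table.schema)  # schema message
            for batch in table.to_batches(max_chunksize=batch_size):
                writer.write_batch(batch)  # one record batch per message
            writer.close()  # end-of-stream marker
```
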
In Python, I create the dataset with the following code:

```
    import tensorflow_io.arrow as arrow_io

    ds = arrow_io.ArrowStreamDataset(
        [endpoint],
        columns=my_columns,
        output_types=my_types,
        output_shapes=my_shapes,
        batch_size=arrowRecordBatchSize,
        batch_mode='keep_remainder')
```
Then I convert the dataset to NumPy and iterate over it:

```
    import tensorflow_datasets as tfds

    ds_numpy = tfds.as_numpy(ds)
    my_list = list()
    for x in ds_numpy:
        my_list.append(x)
```
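As a sanity check, the same stream can also be read with plain pyarrow instead of ArrowStreamDataset, which should show whether the batch-size effect comes from Arrow itself or from the TF dataset layer (a sketch; `host` and `port` stand in for the Spark-side endpoint):

```
import socket

import pyarrow as pa

with socket.create_connection((host, port)) as sock:
    with sock.makefile('rb') as source:
        reader = pa.ipc.open_stream(source)     # reads the schema message
        batches = list(reader)                  # one item per record batch
        table = pa.Table.from_batches(batches)
```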

I sent the data with several different batch sizes (arrowRecordBatchSize); the results show
noticeably different transfer times:

> arrowRecordBatchSize = 1,000      (about 8 kB)   =======> 161 s
> arrowRecordBatchSize = 50,000     (about 400 kB) =======> 63 s
> arrowRecordBatchSize = 100,000    (about 800 kB) =======> 65 s
> arrowRecordBatchSize = 1,000,000  (about 8 MB)   =======> 108 s
> arrowRecordBatchSize = 10,000,000 (about 80 MB)  =======> 113 s
> arrowRecordBatchSize = 50,000,000 (about 400 MB) =======> 112 s
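
To take the network and Spark/TF out of the picture, the per-batch overhead can also be measured with pyarrow alone; here is a minimal in-memory micro-benchmark sketch (column contents and sizes are arbitrary placeholders):

```
import time

import pyarrow as pa

# one int64 column with 10 million rows, as a stand-in for the real data
table = pa.table({'x': pa.array(range(10_000_000), type=pa.int64())})

for batch_size in (1_000, 50_000, 1_000_000):
    start = time.perf_counter()
    sink = pa.BufferOutputStream()
    writer = pa.ipc.new_stream(sink, table.schema)
    for batch in table.to_batches(max_chunksize=batch_size):
        writer.write_batch(batch)
    writer.close()
    buf = sink.getvalue()
    # read the whole stream back so both directions are timed
    result = pa.ipc.open_stream(buf).read_all()
    elapsed = time.perf_counter() - start
    print(f'batch_size={batch_size:,}: {elapsed:.3f} s, {buf.size:,} bytes')
```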

So, how does batch size affect streaming read and write performance in Apache Arrow? Are there
any best practices for choosing a batch size?

PS: I noticed that the sender seems to use NIO. Could that be the key to this phenomenon?


Best regards,
maqy
