arrow-user mailing list archives

From Micah Kornfield <emkornfi...@gmail.com>
Subject Re: How does batch size affect streaming read and write performance in Apache Arrow?
Date Sun, 26 Apr 2020 23:26:49 GMT
Hi Maqy,

Batch sizes are probably going to be application dependent.

Given that you have two moving components, it would probably be easier to
answer specific questions if you could isolate the expected anomaly to
either the Java side or the Python side.

Thanks,
Micah

On Wed, Apr 22, 2020 at 12:17 AM maqy <maqy1995@163.com> wrote:

> I am using Apache Arrow to transfer data between Spark (Scala) and
> TensorFlow (Python).
>
> In Spark, I get an Array[Row] and send it over a socket with the
> following code:
>
> ```
> val rows_array: Array[Row] = df.rdd.collect()
> val ss: ServerSocket = new ServerSocket(port)
> val socket = ss.accept() // accept the client connection
> val iter = rows_array.iterator
> // group the iterator into batches; batchIter is an Iterator[Iterator[Row]]
> val batchIter = new BatchIterator(iter, arrowRecordBatchSize)
> val root = new VectorSchemaRoot(new Schema(fields), vectors, arrowRecordBatchSize)
> val out = new DataOutputStream(socket.getOutputStream)
> val writer = new ArrowStreamWriter(root, null, out)
> writer.start()
>
> sendRecordIterator(root, writer, batchIter)
> ```
>
> sendRecordIterator is defined as follows:
>
> ```
> def sendRecordIterator(root: VectorSchemaRoot,
>                        writer: ArrowStreamWriter,
>                        inputIterator: Iterator[Iterator[Row]]): Unit = {
>   // each inner iterator in inputIterator holds one batch of rows
>   try {
>     val arrowWriter = MyArrowWriter.create(root)
>     while (inputIterator.hasNext) {
>       val nextBatch = inputIterator.next()
>       while (nextBatch.hasNext) {
>         arrowWriter.write(nextBatch.next())
>       }
>       // flush the current batch to the stream, then reset for the next one
>       arrowWriter.finish()
>       writer.writeBatch()
>       arrowWriter.reset()
>     }
>     writer.end()
>   } finally {
>     root.close()
>   }
> }
> ```
>
> In Python, I create a dataset (ds) with the following code:
>
> ```
> ds = arrow_io.ArrowStreamDataset(
>     [endpoint],
>     columns=my_columns,
>     output_types=my_types,
>     output_shapes=my_shapes,
>     batch_size=arrowRecordBatchSize,
>     batch_mode='keep_remainder')
> ```
>
> Then I convert ds to NumPy and traverse it:
>
> ```
> ds_numpy = tfds.as_numpy(ds)
> my_list = list()
> for x in ds_numpy:
>     my_list.append(x)
> ```
>
>
> I sent the data with different batch sizes (arrowRecordBatchSize); the
> results show different transmission efficiencies:
>
> > arrowRecordBatchSize = 1,000      (about 8 kB)   =======> 161 s
> > arrowRecordBatchSize = 50,000     (about 400 kB) =======> 63 s
> > arrowRecordBatchSize = 100,000    (about 800 kB) =======> 65 s
> > arrowRecordBatchSize = 1,000,000  (about 8 MB)   =======> 108 s
> > arrowRecordBatchSize = 10,000,000 (about 80 MB)  =======> 113 s
> > arrowRecordBatchSize = 50,000,000 (about 400 MB) =======> 112 s
>
>
> So, how does batch size affect streaming read and write performance in
> Apache Arrow? Are there any best practices for batch size?
>
>
> PS: I noticed that the sender seems to be using NIO. Is that the key to
> this phenomenon?
>
> Best regards,
>
> maqy
>
