From "Abdeali Kothari (JIRA)" <>
Subject [jira] [Created] (ARROW-4890) Spark+Arrow Grouped pandas UDAF - read length must be positive or -1
Date Fri, 15 Mar 2019 11:37:00 GMT
Abdeali Kothari created ARROW-4890:

             Summary: Spark+Arrow Grouped pandas UDAF - read length must be positive or -1
                 Key: ARROW-4890
             Project: Apache Arrow
          Issue Type: Bug
          Components: Python
    Affects Versions: 0.8.0
         Environment: Cloudera cdh5.13.3
Cloudera Spark 2.3.0.cloudera3
            Reporter: Abdeali Kothari

Creating this in Arrow project as the traceback seems to suggest this is an issue in Arrow.
 Continuation from the conversation on the [|Email

When I run a GROUPED_MAP UDF in Spark using PySpark, I run into the error:
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/",
line 279, in load_stream
    for batch in reader:
  File "pyarrow/ipc.pxi", line 265, in __iter__
  File "pyarrow/ipc.pxi", line 281, in pyarrow.lib._RecordBatchReader.read_next_batch
  File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: read length must be positive or -1
as my dataset size starts increasing that I want to group on. Here is a reproducible code
snippet where I can reproduce this.
 Note: My actual dataset is much larger and has many more unique IDs and is a valid usecase
where I cannot simplify this groupby in any way. I have stripped out all the logic to make
this example as simple as I could.
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--executor-memory 9G pyspark-shell'
import findspark
import pyspark
from pyspark.sql import functions as F, types as T
import pandas as pd

spark = pyspark.sql.SparkSession.builder.getOrCreate()

pdf1 = pd.DataFrame(
	[[1234567, 0.0, "abcdefghij", "2000-01-01T00:00:00.000Z"]],
	columns=['df1_c1', 'df1_c2', 'df1_c3', 'df1_c4']
df1 = spark.createDataFrame(pd.concat([pdf1 for i in range(429)]).reset_index()).drop('index')

pdf2 = pd.DataFrame(
	[[1234567, 0.0, "abcdefghijklmno", "2000-01-01", "abcdefghijklmno", "abcdefghijklmno"]],
	columns=['df2_c1', 'df2_c2', 'df2_c3', 'df2_c4', 'df2_c5', 'df2_c6']
df2 = spark.createDataFrame(pd.concat([pdf2 for i in range(48993)]).reset_index()).drop('index')
df3 = df1.join(df2, df1['df1_c1'] == df2['df2_c1'], how='inner')

def myudf(df):
    return df

df4 = df3
udf = F.pandas_udf(df4.schema, F.PandasUDFType.GROUPED_MAP)(myudf)

df5 = df4.groupBy('df1_c1').apply(udf)
print('df5.count()', df5.count())

# df5.write.parquet('/tmp/temp.parquet', mode='overwrite')
I have tried running this on Amazon EMR with Spark 2.3.1 and 20GB RAM per executor too.

