arrow-dev mailing list archives

From Abdeali Kothari <abdealikoth...@gmail.com>
Subject Re: OversizedAllocationException for pandas_udf in pyspark
Date Mon, 11 Mar 2019 03:42:46 GMT
Hi, any help on this would be much appreciated.
I have not yet been able to figure out why this happens.

On Sat, Mar 2, 2019, 11:50 Abdeali Kothari <abdealikothari@gmail.com> wrote:

> Hi Li Jin, thanks for the note.
>
> I get this error only for larger data - when I reduce the number of
> records or the number of columns in my data it all works fine - so if it is
> a binary incompatibility, it should be something related to large data.
> I am using Spark 2.3.1 on Amazon EMR for this testing.
> https://github.com/apache/spark/blob/v2.3.1/pom.xml#L192 seems to
> indicate arrow version is 0.8 for this.
>
> I installed pyarrow-0.8.0 in the python environment on my cluster with pip
> and I am still getting this error.
> The stacktrace is very similar, just some lines moved in the pxi files:
>
> Caused by: org.apache.spark.api.python.PythonException: Traceback (most
> recent call last):
>   File
> "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0018/container_1551469777576_0018_01_000002/pyspark.zip/pyspark/worker.py",
> line 230, in main
>     process()
>   File
> "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0018/container_1551469777576_0018_01_000002/pyspark.zip/pyspark/worker.py",
> line 225, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File
> "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0018/container_1551469777576_0018_01_000002/pyspark.zip/pyspark/serializers.py",
> line 260, in dump_stream
>     for series in iterator:
>   File
> "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0018/container_1551469777576_0018_01_000002/pyspark.zip/pyspark/serializers.py",
> line 279, in load_stream
>     for batch in reader:
>   File "pyarrow/ipc.pxi", line 268, in __iter__
> (/arrow/python/build/temp.linux-x86_64-3.6/lib.cxx:70278)
>   File "pyarrow/ipc.pxi", line 284, in
> pyarrow.lib._RecordBatchReader.read_next_batch
> (/arrow/python/build/temp.linux-x86_64-3.6/lib.cxx:70534)
>   File "pyarrow/error.pxi", line 79, in pyarrow.lib.check_status
> (/arrow/python/build/temp.linux-x86_64-3.6/lib.cxx:8345)
> pyarrow.lib.ArrowIOError: read length must be positive or -1
>
> Other notes:
>  - My data is just integers, strings, and doubles. No complex types like
> arrays/maps/etc.
>  - I don't have any NULL/None values in my data
>  - Increasing executor-memory for spark does not seem to help here
>
> As always: Any thoughts or notes would be great so I can get some pointers
> in which direction to debug
>
>
>
> On Sat, Mar 2, 2019 at 2:24 AM Li Jin <ice.xelloss@gmail.com> wrote:
>
>> The 2G limit that Uwe mentioned definitely exists; Spark currently
>> serializes each group as a single RecordBatch.
>>
>> The "pyarrow.lib.ArrowIOError: read length must be positive or -1" is
>> strange. I think Spark is on an older version on the Java side (0.10 for
>> Spark 2.4 and 0.8 for Spark 2.3). I forget whether there is a binary
>> incompatibility between these versions and pyarrow 0.12.
>>
>> On Fri, Mar 1, 2019 at 3:32 PM Abdeali Kothari <abdealikothari@gmail.com>
>> wrote:
>>
>> > Forgot to mention: the above testing is with 0.11.1.
>> > I tried 0.12.1 as you suggested - and am getting the
>> > OversizedAllocationException with the 80-char column, and "read
>> > length must be positive or -1" without it. So, both issues are
>> > reproducible with pyarrow 0.12.1.
>> >
>> > On Sat, Mar 2, 2019 at 1:57 AM Abdeali Kothari <abdealikothari@gmail.com>
>> > wrote:
>> >
>> > > That was spot on!
>> > > I had 3 columns with 80 characters each => 80 * 21*10^6 = 1.68*10^9 bytes
>> > > (~1.56 GiB) per column.
>> > > I removed these columns and replaced each with 10 doubleType columns (so
>> > > it would still be 80 bytes of data) - and this error didn't come up
>> > > anymore.
>> > > I also removed all the other columns and just kept 1 column with
>> > > 80 characters - I got the error again.
>> > >
>> > > I'll make a simpler example and report it to spark - as I guess these
>> > > columns would need some special handling.
>> > >
>> > > Now, when I run - I get a different error:
>> > > 19/03/01 20:16:49 WARN TaskSetManager: Lost task 108.0 in stage 8.0 (TID
>> > > 12, ip-172-31-10-249.us-west-2.compute.internal, executor 1):
>> > > org.apache.spark.api.python.PythonException: Traceback (most recent call
>> > > last):
>> > >   File
>> > > "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0010/container_1551469777576_0010_01_000002/pyspark.zip/pyspark/worker.py",
>> > > line 230, in main
>> > >     process()
>> > >   File
>> > > "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0010/container_1551469777576_0010_01_000002/pyspark.zip/pyspark/worker.py",
>> > > line 225, in process
>> > >     serializer.dump_stream(func(split_index, iterator), outfile)
>> > >   File
>> > > "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0010/container_1551469777576_0010_01_000002/pyspark.zip/pyspark/serializers.py",
>> > > line 260, in dump_stream
>> > >     for series in iterator:
>> > >   File
>> > > "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0010/container_1551469777576_0010_01_000002/pyspark.zip/pyspark/serializers.py",
>> > > line 279, in load_stream
>> > >     for batch in reader:
>> > >   File "pyarrow/ipc.pxi", line 265, in __iter__
>> > >   File "pyarrow/ipc.pxi", line 281, in
>> > > pyarrow.lib._RecordBatchReader.read_next_batch
>> > >   File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
>> > > pyarrow.lib.ArrowIOError: read length must be positive or -1
>> > >
>> > > Again, any pointers on what this means and what it indicates would be
>> > > really useful for me.
>> > >
>> > > Thanks for the replies!
>> > >
>> > >
>> > > On Fri, Mar 1, 2019 at 11:26 PM Uwe L. Korn <uwelk@xhochy.com> wrote:
>> > >
>> > >> There is currently the limitation that a column in a single RecordBatch
>> > >> can only hold 2G on the Java side. We work around this by splitting the
>> > >> DataFrame under the hood into multiple RecordBatches. I'm not familiar
>> > >> with the Spark<->Arrow code but I guess that in this case, the Spark code
>> > >> can only handle a single RecordBatch.
>> > >>
>> > >> Probably it is best to construct a https://stackoverflow.com/help/mcve
>> > >> and create an issue with the Spark project. Most likely this is not a
>> > >> bug in Arrow but just requires a bit more complicated implementation
>> > >> around the Arrow libs.
>> > >>
>> > >> Still, please have a look at the exact size of your columns. We support
>> > >> 2G per column; if it is only 1.5G, then there is probably a rounding
>> > >> error in Arrow. Alternatively, you might also be in luck that the
>> > >> following patch
>> > >> https://github.com/apache/arrow/commit/bfe6865ba8087a46bd7665679e48af3a77987cef
>> > >> which is part of Apache Arrow 0.12 already fixes your problem.
>> > >>
>> > >> Uwe
>> > >>
>> > >> On Fri, Mar 1, 2019, at 6:48 PM, Abdeali Kothari wrote:
>> > >> > Is there a limitation that a single column cannot be more than 1-2G ?
>> > >> > One of my columns definitely would be around 1.5GB of memory.
>> > >> >
>> > >> > I cannot split my DF into more partitions as I have only 1 ID and I'm
>> > >> > grouping by that ID.
>> > >> > So, the UDAF would only run on a single pandasDF
>> > >> > I do have a requirement to make a very large DF for this UDAF (8GB as I
>> > >> > mentioned above) - trying to figure out what I need to do here to make
>> > >> > this work.
>> > >> > Increasing RAM, etc. is no issue (I understand I'd need huge executors
>> > >> > as I have a huge data requirement). But trying to figure out how much to
>> > >> > actually get - cause 20GB of RAM for the executor is also erroring out
>> > >> > where I thought ~10GB would have been enough
>> > >> >
>> > >> >
>> > >> >
>> > >> > On Fri, Mar 1, 2019 at 10:25 PM Uwe L. Korn <uwelk@xhochy.com> wrote:
>> > >> >
>> > >> > > Hello Abdeali,
>> > >> > >
>> > >> > > a problem here could be that a single column of your dataframe is
>> > >> > > using more than 2GB of RAM (possibly also just 1G). Try splitting your
>> > >> > > DataFrame into more partitions before applying the UDAF.
>> > >> > >
>> > >> > > Cheers
>> > >> > > Uwe
>> > >> > >
>> > >> > > On Fri, Mar 1, 2019, at 9:09 AM, Abdeali Kothari wrote:
>> > >> > > > I was using arrow with spark+python and when I'm trying some
>> > >> > > > pandas-UDAF functions I am getting this error:
>> > >> > > >
>> > >> > > > org.apache.arrow.vector.util.OversizedAllocationException: Unable to
>> > >> > > > expand the buffer
>> > >> > > > at org.apache.arrow.vector.BaseVariableWidthVector.reallocDataBuffer(BaseVariableWidthVector.java:457)
>> > >> > > > at org.apache.arrow.vector.BaseVariableWidthVector.handleSafe(BaseVariableWidthVector.java:1188)
>> > >> > > > at org.apache.arrow.vector.BaseVariableWidthVector.setSafe(BaseVariableWidthVector.java:1026)
>> > >> > > > at org.apache.spark.sql.execution.arrow.StringWriter.setValue(ArrowWriter.scala:256)
>> > >> > > > at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
>> > >> > > > at org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:87)
>> > >> > > > at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:84)
>> > >> > > > at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
>> > >> > > > at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
>> > >> > > > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
>> > >> > > > at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:95)
>> > >> > > > at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
>> > >> > > > at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
>> > >> > > > at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
>> > >> > > >
>> > >> > > > I was initially getting a "RAM is insufficient" error - and
>> > >> > > > theoretically (with no compression) realized that the pandas DataFrame
>> > >> > > > it would try to create would be ~8GB (21million records with each
>> > >> > > > record having ~400 bytes). I have increased my executor memory to be
>> > >> > > > 20GB per executor, but am now getting this error from Arrow.
>> > >> > > > Looking for some pointers so I can understand this issue better.
>> > >> > > >
>> > >> > > > Here's what I am trying. I have 2 tables with string columns where
>> > >> > > > the strings always have a fixed length:
>> > >> > > > *Table 1*:
>> > >> > > >     id: integer
>> > >> > > >    char_column1: string (length = 30)
>> > >> > > >    char_column2: string (length = 40)
>> > >> > > >    char_column3: string (length = 10)
>> > >> > > >    ...
>> > >> > > > In total, in table1, the char-columns have ~250 characters
>> > >> > > >
>> > >> > > > *Table 2*:
>> > >> > > >     id: integer
>> > >> > > >    char_column1: string (length = 50)
>> > >> > > >    char_column2: string (length = 3)
>> > >> > > >    char_column3: string (length = 4)
>> > >> > > >    ...
>> > >> > > > In total, in table2, the char-columns have ~150 characters
>> > >> > > >
>> > >> > > > I am joining these tables by ID. In my current dataset, I have
>> > >> > > > filtered my data so only id=1 exists.
>> > >> > > > Table1 has ~400 records for id=1 and table2 has 50k records for id=1.
>> > >> > > > Hence, total number of records (after joining) for table1_join2 =
>> > >> > > > 400 * 50k = 20*10^6 records
>> > >> > > > Each row has ~400 bytes (150+250) => overall memory = 8*10^9 bytes
>> > >> > > > => ~8GB
>> > >> > > >
>> > >> > > > Now, when I try an executor with 20GB RAM, it does not work.
>> > >> > > > Is there some data duplication happening internally? What should be
>> > >> > > > the estimated RAM I need to give for this to work?
>> > >> > > >
>> > >> > > > Thanks for reading,
>> > >> > > >
>> > >> > >
>> > >> >
>> > >>
>> > >
>> >
>>
>
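
The sizes quoted in this thread can be sanity-checked by redoing the arithmetic. The sketch below is not from the original posters. It assumes 1 byte per character and ignores Arrow's offset and validity buffers. The power-of-two rounding is only an assumption about how a growing buffer might be sized (in line with the "rounding error" guess made above), and the names ASSUMED_LIMIT, column_bytes and next_power_of_two are hypothetical.

```python
# Back-of-the-envelope check of the sizes discussed in this thread.
# Assumptions (not confirmed by the thread): 1 byte per character,
# no per-value overhead, and a per-buffer cap of 2**31 - 1 bytes.

ASSUMED_LIMIT = 2**31 - 1  # hypothetical cap, roughly "2G" per column buffer


def column_bytes(num_rows, chars_per_value):
    """Raw payload size of one fixed-length string column, in bytes."""
    return num_rows * chars_per_value


def next_power_of_two(n):
    """Smallest power of two >= n (assumed buffer growth policy)."""
    return 1 << (n - 1).bit_length()


rows_estimated = 21 * 10**6   # "21million records" from the thread
rows_joined = 400 * 50_000    # ~400 table1 rows * 50k table2 rows for id=1

col80 = column_bytes(rows_estimated, 80)
total = column_bytes(rows_joined, 400)

print("80-char column:", col80, "bytes,", round(col80 / 2**30, 2), "GiB")
print("whole joined frame:", total, "bytes")
print("raw size fits under assumed limit:", col80 <= ASSUMED_LIMIT)
print("rounded-up size fits:", next_power_of_two(col80) <= ASSUMED_LIMIT)
```

Under these assumptions the raw 80-character column is below the cap, but a buffer rounded up to the next power of two would not be. That would be consistent with the error appearing at about 1.5G, although the thread does not confirm this as the cause.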

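
Since the thread notes that Spark sends each group as a single RecordBatch, one possible workaround is to add a "salt" key so that the one large group becomes several smaller ones. This was not proposed by the posters, and it only applies when the aggregation can be computed in two stages (partial results per sub-group, then a final combine). A minimal sketch in plain Python follows. The function names and the 1 GiB per-column target are hypothetical.

```python
import math
from collections import Counter

# Assumed per-column target size for each sub-group (1 GiB); pick any
# value comfortably below the per-column limit discussed in the thread.
TARGET_BYTES = 2**30


def sub_groups_needed(num_rows, bytes_per_value, target_bytes=TARGET_BYTES):
    """Number of sub-groups so one column stays under target_bytes each."""
    total = num_rows * bytes_per_value
    return max(1, math.ceil(total / target_bytes))


def salt_for_row(row_index, num_sub_groups):
    """Round-robin salt: spreads rows evenly over the sub-groups."""
    return row_index % num_sub_groups


# 21 million rows with an 80-byte string column, as in the thread.
n = sub_groups_needed(21 * 10**6, 80)
print("sub-groups needed:", n)

# Show the distribution of salts for a handful of rows.
sizes = Counter(salt_for_row(i, n) for i in range(10))
print("rows per salt (first 10 rows):", dict(sizes))
```

In PySpark the salt would be added as an extra column and included in the grouping keys alongside the ID, with a second, much smaller aggregation combining the per-salt results.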