From: Abdeali Kothari
Date: Mon, 11 Mar 2019 09:12:46 +0530
Subject: Re: OversizedAllocationException for pandas_udf in pyspark
To: dev@arrow.apache.org

Hi, any help on this would be much appreciated. I've not been able to
figure out any reason for this to happen yet.

On Sat, Mar 2, 2019, 11:50 Abdeali Kothari wrote:

> Hi Li Jin, thanks for the note.
>
> I get this error only for larger data - when I reduce the number of
> records or the number of columns in my data it all works fine - so if it
> is binary incompatibility it should be something related to large data.
> I am using Spark 2.3.1 on Amazon EMR for this testing.
> https://github.com/apache/spark/blob/v2.3.1/pom.xml#L192 seems to
> indicate the arrow version is 0.8 for this.
>
> I installed pyarrow-0.8.0 in the python environment on my cluster with
> pip and I am still getting this error.
> The stacktrace is very similar, just some lines moved in the pxi files:
>
> Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0018/container_1551469777576_0018_01_000002/pyspark.zip/pyspark/worker.py", line 230, in main
>     process()
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0018/container_1551469777576_0018_01_000002/pyspark.zip/pyspark/worker.py", line 225, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0018/container_1551469777576_0018_01_000002/pyspark.zip/pyspark/serializers.py", line 260, in dump_stream
>     for series in iterator:
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0018/container_1551469777576_0018_01_000002/pyspark.zip/pyspark/serializers.py", line 279, in load_stream
>     for batch in reader:
>   File "pyarrow/ipc.pxi", line 268, in __iter__ (/arrow/python/build/temp.linux-x86_64-3.6/lib.cxx:70278)
>   File "pyarrow/ipc.pxi", line 284, in pyarrow.lib._RecordBatchReader.read_next_batch (/arrow/python/build/temp.linux-x86_64-3.6/lib.cxx:70534)
>   File "pyarrow/error.pxi", line 79, in pyarrow.lib.check_status (/arrow/python/build/temp.linux-x86_64-3.6/lib.cxx:8345)
> pyarrow.lib.ArrowIOError: read length must be positive or -1
>
> Other notes:
> - My data is just integers, strings, and doubles. No complex types like arrays/maps/etc.
> - I don't have any NULL/None values in my data
> - Increasing executor-memory for spark does not seem to help here
>
> As always: any thoughts or notes would be great so I can get some
> pointers in which direction to debug.
>
> On Sat, Mar 2, 2019 at 2:24 AM Li Jin wrote:
>
>> The 2G limit that Uwe mentioned definitely exists; Spark serializes each
>> group as a single RecordBatch currently.
>>
>> The "pyarrow.lib.ArrowIOError: read length must be positive or -1" is
>> strange. I think Spark is on an older version of the Java side (0.10 for
>> Spark 2.4 and 0.8 for Spark 2.3). I forgot whether there is binary
>> incompatibility between these versions and pyarrow 0.12.
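For context, a minimal sketch of the grouped pandas_udf pattern under
discussion (Spark 2.3-style API; the schema, data, and function body are
illustrative placeholders, not the actual job). Each group is handed to the
Python worker over Arrow as a single record batch, which is why one very
large group produces one very large batch:

    # Minimal grouped-map pandas_udf sketch; names and data are hypothetical.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(1, "a" * 80), (1, "b" * 80)], ["id", "char_column1"])

    @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
    def identity_udf(pdf):
        # The whole group arrives as one pandas DataFrame; with only one
        # distinct id, that is the entire joined dataset.
        return pdf

    df.groupBy("id").apply(identity_udf).show()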
>> On Fri, Mar 1, 2019 at 3:32 PM Abdeali Kothari wrote:
>>
>> > Forgot to mention: the above testing is with 0.11.1.
>> > I tried 0.12.1 as you suggested - and am getting the
>> > OversizedAllocationException with the 80-char column, and getting "read
>> > length must be positive or -1" without it. So, both issues are
>> > reproducible with pyarrow 0.12.1.
>> >
>> > On Sat, Mar 2, 2019 at 1:57 AM Abdeali Kothari <abdealikothari@gmail.com> wrote:
>> >
>> > > That was spot on!
>> > > I had 3 columns with 80 characters => 80 * 21*10^6 bytes = ~1.56 GB.
>> > > I removed these columns and replaced each with 10 doubleType columns
>> > > (so it would still be 80 bytes of data) - and this error didn't come
>> > > up anymore.
>> > > I also removed all the other columns and just kept 1 column with 80
>> > > characters - I got the error again.
>> > >
>> > > I'll make a simpler example and report it to spark - as I guess these
>> > > columns would need some special handling.
>> > >
>> > > Now, when I run - I get a different error:
>> > >
>> > > 19/03/01 20:16:49 WARN TaskSetManager: Lost task 108.0 in stage 8.0 (TID 12, ip-172-31-10-249.us-west-2.compute.internal, executor 1):
>> > > org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>> > >   File "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0010/container_1551469777576_0010_01_000002/pyspark.zip/pyspark/worker.py", line 230, in main
>> > >     process()
>> > >   File "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0010/container_1551469777576_0010_01_000002/pyspark.zip/pyspark/worker.py", line 225, in process
>> > >     serializer.dump_stream(func(split_index, iterator), outfile)
>> > >   File "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0010/container_1551469777576_0010_01_000002/pyspark.zip/pyspark/serializers.py", line 260, in dump_stream
>> > >     for series in iterator:
>> > >   File "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0010/container_1551469777576_0010_01_000002/pyspark.zip/pyspark/serializers.py", line 279, in load_stream
>> > >     for batch in reader:
>> > >   File "pyarrow/ipc.pxi", line 265, in __iter__
>> > >   File "pyarrow/ipc.pxi", line 281, in pyarrow.lib._RecordBatchReader.read_next_batch
>> > >   File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
>> > > pyarrow.lib.ArrowIOError: read length must be positive or -1
>> > >
>> > > Again, any pointers on what this means and what it indicates would be
>> > > really useful for me.
>> > >
>> > > Thanks for the replies!
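As a back-of-envelope check of those numbers (all sizes are assumptions
taken from the thread, and the doubling behaviour is only one plausible
reading of the reallocDataBuffer frame in the trace above, not something
verified here):

    # Rough arithmetic only; not a measurement of the actual job.
    rows = 21 * 10**6               # ~21 million joined rows in the single group
    chars_per_value = 80            # one fixed-width 80-character string column
    data_bytes = rows * chars_per_value       # 1.68e9 bytes, i.e. ~1.56 GiB
    offset_bytes = (rows + 1) * 4              # int32 offsets for a var-width vector

    java_buffer_limit = 2**31 - 1              # ~2 GiB per buffer on the Java side
    # If the data buffer grows by doubling, the smallest capacity that can hold
    # 1.68e9 bytes is 2**31, which already exceeds the limit even though the
    # data itself is "only" ~1.56 GiB.
    needed_capacity = 1 << (data_bytes - 1).bit_length()   # next power of two
    print(needed_capacity, needed_capacity > java_buffer_limit)  # 2147483648 True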
>> > > On Fri, Mar 1, 2019 at 11:26 PM Uwe L. Korn wrote:
>> > >
>> > >> There is currently the limitation that a column in a single
>> > >> RecordBatch can only hold 2G on the Java side. We work around this by
>> > >> splitting the DataFrame under the hood into multiple RecordBatches.
>> > >> I'm not familiar with the Spark<->Arrow code but I guess that in this
>> > >> case, the Spark code can only handle a single RecordBatch.
>> > >>
>> > >> Probably it is best to construct a https://stackoverflow.com/help/mcve
>> > >> and create an issue with the Spark project. Most likely this is not a
>> > >> bug in Arrow but just requires a bit more complicated implementation
>> > >> around the Arrow libs.
>> > >>
>> > >> Still, please have a look at the exact size of your columns. We
>> > >> support 2G per column; if it is only 1.5G, then there is probably a
>> > >> rounding error in Arrow. Alternatively, you might also be in luck
>> > >> that the following patch
>> > >> https://github.com/apache/arrow/commit/bfe6865ba8087a46bd7665679e48af3a77987cef
>> > >> which is part of Apache Arrow 0.12 already fixes your problem.
>> > >>
>> > >> Uwe
>> > >>
>> > >> On Fri, Mar 1, 2019, at 6:48 PM, Abdeali Kothari wrote:
>> > >>
>> > >> > Is there a limitation that a single column cannot be more than 1-2G?
>> > >> > One of my columns definitely would be around 1.5GB of memory.
>> > >> >
>> > >> > I cannot split my DF into more partitions as I have only 1 ID and
>> > >> > I'm grouping by that ID.
>> > >> > So, the UDAF would only run on a single pandas DF.
>> > >> > I do have a requirement to make a very large DF for this UDAF (8GB
>> > >> > as I mentioned above) - trying to figure out what I need to do here
>> > >> > to make this work.
>> > >> > Increasing RAM, etc. is no issue (I understand I'd need huge
>> > >> > executors as I have a huge data requirement). But trying to figure
>> > >> > out how much to actually get - because 20GB of RAM for the executor
>> > >> > is also erroring out where I thought ~10GB would have been enough.
>> > >> >
>> > >> > On Fri, Mar 1, 2019 at 10:25 PM Uwe L. Korn wrote:
>> > >> >
>> > >> > > Hello Abdeali,
>> > >> > >
>> > >> > > a problem here could be that a single column of your dataframe is
>> > >> > > using more than 2GB of RAM (possibly also just 1G). Try splitting
>> > >> > > your DataFrame into more partitions before applying the UDAF.
>> > >> > >
>> > >> > > Cheers
>> > >> > > Uwe
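Uwe's remark about pyarrow working around the limit by splitting a
DataFrame into multiple RecordBatches can be sketched on the Python side as
below (illustrative data; the max_chunksize keyword is assumed from recent
pyarrow releases and may be named differently in older versions):

    # Sketch of carving one table into many smaller record batches.
    import pandas as pd
    import pyarrow as pa

    pdf = pd.DataFrame({"char_column1": ["x" * 80] * 1_000_000})
    table = pa.Table.from_pandas(pdf)
    batches = table.to_batches(max_chunksize=100_000)   # many small batches
    print(len(batches), batches[0].num_rows)

The Spark 2.3 grouped-UDF path described in this thread sends each group as
one batch, so this splitting is not something the user code controls there.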
>> > >> > > On Fri, Mar 1, 2019, at 9:09 AM, Abdeali Kothari wrote:
>> > >> > >
>> > >> > > > I was using arrow with spark+python and when I'm trying some
>> > >> > > > pandas-UDAF functions I am getting this error:
>> > >> > > >
>> > >> > > > org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
>> > >> > > >   at org.apache.arrow.vector.BaseVariableWidthVector.reallocDataBuffer(BaseVariableWidthVector.java:457)
>> > >> > > >   at org.apache.arrow.vector.BaseVariableWidthVector.handleSafe(BaseVariableWidthVector.java:1188)
>> > >> > > >   at org.apache.arrow.vector.BaseVariableWidthVector.setSafe(BaseVariableWidthVector.java:1026)
>> > >> > > >   at org.apache.spark.sql.execution.arrow.StringWriter.setValue(ArrowWriter.scala:256)
>> > >> > > >   at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
>> > >> > > >   at org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:87)
>> > >> > > >   at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:84)
>> > >> > > >   at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
>> > >> > > >   at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
>> > >> > > >   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
>> > >> > > >   at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:95)
>> > >> > > >   at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
>> > >> > > >   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
>> > >> > > >   at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
>> > >> > > >
>> > >> > > > I was initially getting a "RAM is insufficient" error - and
>> > >> > > > theoretically (with no compression) realized that the pandas
>> > >> > > > DataFrame it would try to create would be ~8GB (21 million
>> > >> > > > records with each record having ~400 bytes). I have increased my
>> > >> > > > executor memory to 20GB per executor, but am now getting this
>> > >> > > > error from Arrow.
>> > >> > > > Looking for some pointers so I can understand this issue better.
>> > >> > > >
>> > >> > > > Here's what I am trying. I have 2 tables with string columns
>> > >> > > > where the strings always have a fixed length:
>> > >> > > >
>> > >> > > > *Table 1*:
>> > >> > > > id: integer
>> > >> > > > char_column1: string (length = 30)
>> > >> > > > char_column2: string (length = 40)
>> > >> > > > char_column3: string (length = 10)
>> > >> > > > ...
>> > >> > > > In total, in table1, the char-columns have ~250 characters
>> > >> > > >
>> > >> > > > *Table 2*:
>> > >> > > > id: integer
>> > >> > > > char_column1: string (length = 50)
>> > >> > > > char_column2: string (length = 3)
>> > >> > > > char_column3: string (length = 4)
>> > >> > > > ...
>> > >> > > > In total, in table2, the char-columns have ~150 characters
>> > >> > > >
>> > >> > > > I am joining these tables by ID. In my current dataset, I have
>> > >> > > > filtered my data so only id=1 exists.
>> > >> > > > Table1 has ~400 records for id=1 and table2 has 50k records for id=1.
>> > >> > > > Hence, total number of records (after joining) for table1_join2
>> > >> > > > = 400 * 50k = 20*10^6 records.
>> > >> > > > Each row has ~400 bytes (150+250) => overall memory = 8*10^9 bytes => ~8GB.
>> > >> > > >
>> > >> > > > Now, when I try an executor with 20GB RAM, it does not work.
>> > >> > > > Is there some data duplication happening internally? What should
>> > >> > > > be the estimated RAM I need to give for this to work?
>> > >> > > >
>> > >> > > > Thanks for reading,
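A rough way to sanity-check that ~8GB estimate locally before sizing
executors is to build a small sample of the joined shape and extrapolate.
The column names and value sizes below are illustrative stand-ins, and this
only measures the pandas- and Arrow-side copies in one Python process, not
anything the JVM side holds:

    # Assumption-heavy extrapolation from a small sample; not a measurement
    # of the actual Spark job.
    import pandas as pd
    import pyarrow as pa

    sample_rows = 100_000
    sample = pd.DataFrame({
        "id": [1] * sample_rows,
        "chars_t1": ["x" * 250] * sample_rows,  # ~250 chars from table1's columns
        "chars_t2": ["y" * 150] * sample_rows,  # ~150 chars from table2's columns
    })

    pandas_bytes = sample.memory_usage(deep=True).sum()
    before = pa.total_allocated_bytes()
    arrow_table = pa.Table.from_pandas(sample)
    arrow_bytes = pa.total_allocated_bytes() - before

    scale = (400 * 50_000) / sample_rows        # ~20 million joined rows for id=1
    print("pandas copy, extrapolated GB:", pandas_bytes * scale / 1e9)
    print("arrow copy, extrapolated GB:", arrow_bytes * scale / 1e9)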