From: Flavio Pompermaier <pompermaier@okkam.it>
Date: Fri, 21 Apr 2017 09:36:01 +0200
Subject: Re: UnilateralSortMerger error (again)
To: Ted Yu <yuzhihong@gmail.com>
Cc: user <user@flink.apache.org>

As suggested by Fabian, I set taskmanager.memory.size = 1024 (to force
spilling to disk) and the job failed almost immediately.

On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <pompermaier@okkam.it> wrote:

> I debugged the process a bit by repeating the job on sub-slices of the
> entire data (using the id value to filter data with Parquet push-down
> filters), and all slices completed successfully :(
> So I increased the parallelism (from 1 slot per TM to 4) to see whether
> stress was somehow a factor, but that didn't cause any error.
> Then I almost doubled the number of rows to process, and the error
> finally showed up again.
> It seems somehow related to spilling to disk, but I can't really
> understand what's going on :(
> This is a summary of my debug attempts:
>
> 4 Task Managers with 6 GB and 1 slot each, parallelism = 4
>
> id < 10.000.000.000                          =>  1.857.365 rows => OK
> id >= 10.000.000.000 && id < 10.010.000.000  => 20.057.714 rows => OK
> id >= 10.010.000.000 && id < 99.945.000.000  => 20.926.903 rows => OK
> id >= 99.945.000.000 && id < 99.960.000.000  => 23.888.750 rows => OK
> id >= 99.960.000.000                         => 32.936.422 rows => OK
>
> 4 TMs with 8 GB and 4 slots each, parallelism = 16
>
> id >= 99.960.000.000                         => 32.936.422 rows => OK
> id >= 99.945.000.000                         => 56.825.172 rows => ERROR
>
> Any help is appreciated.
> Best,
> Flavio
>
> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>
>> I could, but only if there's a good probability that it fixes the
>> problem... how confident are you about it?
>>
>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhihong@gmail.com> wrote:
>>
>>> Looking at the git log of DataInputDeserializer.java, there have been
>>> some recent changes.
>>>
>>> If you have time, maybe try with the 1.2.1 RC and see if the error is
>>> reproducible?
>>>
>>> Cheers
>>>
>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>>>
>>>> Hi to all,
>>>> I think I'm again hitting the weird exception with the
>>>> SpillingAdaptiveSpanningRecordDeserializer...
>>>> I'm using Flink 1.2.0, and the error seems to arise when Flink spills
>>>> to disk, but the exception thrown is not very helpful. Any idea?
>>>>
>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>     ... 3 more
>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>> Caused by: java.io.EOFException
>>>>     at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>>>     at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>
>>>> Best,
>>>> Flavio
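
For reference, the setting mentioned at the top of the thread is a
flink-conf.yaml entry. A minimal sketch for Flink 1.2, assuming the 4-TM
setup from the quoted debug summary (the heap and slot values are taken
from the first run; treat the exact numbers as illustrative):

    # flink-conf.yaml (Flink 1.2.x) -- sketch of the setup described above
    taskmanager.heap.mb: 6144            # 6 GB of heap per TaskManager (first debug run)
    taskmanager.numberOfTaskSlots: 1     # 1 slot per TM, parallelism 4 across 4 TMs
    # Pin managed memory to an absolute 1024 MB instead of the default
    # fraction of the heap; a small value makes the sorter spill to disk
    # much earlier.
    taskmanager.memory.size: 1024

With the default relative managed-memory fraction the sorter may never
spill on smaller inputs, which is why fixing a small absolute size is a
quick way to exercise the spilling code path.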
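The id-range slicing in the quoted debug summary relies on Parquet
predicate push-down. A minimal sketch of how one such slice can be
expressed; the class name IdSliceFilter and the wiring into the job's
input format are hypothetical, only the FilterApi/ParquetInputFormat
calls are standard Parquet API:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.filter2.predicate.FilterApi;
    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.hadoop.ParquetInputFormat;

    public class IdSliceFilter {
        public static void main(String[] args) {
            // One slice from the debug summary:
            // id >= 10.000.000.000 && id < 10.010.000.000
            FilterPredicate slice = FilterApi.and(
                FilterApi.gtEq(FilterApi.longColumn("id"), 10_000_000_000L),
                FilterApi.lt(FilterApi.longColumn("id"), 10_010_000_000L));

            Configuration conf = new Configuration();
            // Row groups whose id statistics fall entirely outside the range
            // are skipped at read time, so each run reads only that slice.
            ParquetInputFormat.setFilterPredicate(conf, slice);
            // ... pass 'conf' to the (Hadoop-compatibility) input format
            // that the job uses to read the Parquet files.
        }
    }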