From: Kurt Young
Date: Thu, 27 Apr 2017 23:31:10 +0800
Subject: Re: UnilateralSortMerger error (again)
To: Flavio Pompermaier
Cc: user

Hi, I have found the bug: https://issues.apache.org/jira/browse/FLINK-6398,
will open a PR soon.

Best,
Kurt

On Thu, Apr 27, 2017 at 8:23 PM, Flavio Pompermaier <pompermaier@okkam.it> wrote:

> Thanks a lot Kurt!
>
> On Thu, Apr 27, 2017 at 2:12 PM, Kurt Young <ykt836@gmail.com> wrote:
>
>> Thanks for the test case, I will take a look at it.
>>
>> Flavio Pompermaier <pompermaier@okkam.it> wrote on Thu, Apr 27, 2017 at 03:55:
>>
>>> I've created a repository with a unit test to reproduce the error at
>>> https://github.com/fpompermaier/flink-batch-bug/blob/master/src/test/java/it/okkam/flink/aci/TestDataInputDeserializer.java
>>> (probably this error is also related to FLINK-4719).
>>>
>>> The exception is thrown only when there are null strings and multiple
>>> slots per TM; I don't know whether UnilateralSortMerger is involved or
>>> not (but I think so).
>>> A quick fix for this problem would be very much appreciated because it's
>>> blocking a production deployment.
>>>
>>> Thanks in advance to all,
>>> Flavio
>>>
>>> On Wed, Apr 26, 2017 at 4:42 PM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>>>
>>>> After digging into the code and tests I think the problem is almost
>>>> certainly in the UnilateralSortMerger; there is probably a missing
>>>> synchronization on some shared object somewhere. Right now I'm trying to
>>>> understand whether this section of code creates some shared object (like
>>>> queues) that is accessed incorrectly when there is spilling to disk:
>>>>
>>>> // start the thread that reads the input channels
>>>> this.readThread = getReadingThread(exceptionHandler, input, circularQueues,
>>>>     largeRecordHandler, parentTask, serializer,
>>>>     ((long) (startSpillingFraction * sortMemory)));
>>>>
>>>> // start the thread that sorts the buffers
>>>> this.sortThread = getSortingThread(exceptionHandler, circularQueues, parentTask);
>>>>
>>>> // start the thread that handles spilling to secondary storage
>>>> this.spillThread = getSpillingThread(exceptionHandler, circularQueues, parentTask,
>>>>     memoryManager, ioManager, serializerFactory, comparator,
>>>>     this.sortReadMemory, this.writeMemory, maxNumFileHandles);
>>>> ....
>>>> startThreads();
>>>>
>>>> The problem is that the unit tests of GroupReduceDriver use Record, and
>>>> testing Rows is not very straightforward, so I'm still trying to reproduce
>>>> the problem in a local env.
>>>>
>>>> On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>>>>
>>>>> Thanks for the explanation. Is there a way to force this behaviour in
>>>>> a local environment (to try to debug the problem)?
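One way to force this behaviour in a local environment is a small managed-memory budget on a local mini cluster. The following is only a sketch, assuming Flink 1.2's DataSet API and the "taskmanager.memory.size" key that Fabian suggests further down the thread; the memory value, slot count, and job body are illustrative, not the job from this thread:

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class ForceSpillLocally {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Keep managed memory small so the sorter starts spilling runs to disk early.
        conf.setLong("taskmanager.memory.size", 80);
        // Multiple slots per TM, mirroring the failing setup described below.
        conf.setInteger("taskmanager.numberOfTaskSlots", 4);

        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
        env.setParallelism(4);

        // Any job with a group/sort phase larger than the sort memory exercises
        // the spilling path of UnilateralSortMerger.
        DataSet<Tuple2<Long, String>> data = env
                .generateSequence(0, 5_000_000)
                .map(new MapFunction<Long, Tuple2<Long, String>>() {
                    @Override
                    public Tuple2<Long, String> map(Long v) {
                        return Tuple2.of(v % 100_000, "value-" + v);
                    }
                });

        long groups = data
                .groupBy(0)
                .reduceGroup(new GroupReduceFunction<Tuple2<Long, String>, Long>() {
                    @Override
                    public void reduce(Iterable<Tuple2<Long, String>> values, Collector<Long> out) {
                        long n = 0;
                        for (Tuple2<Long, String> ignored : values) {
                            n++;
                        }
                        out.collect(n);
                    }
                })
                .count();

        System.out.println("groups: " + groups);
    }
}

With managed memory capped this low, even a modest dataset should push the GroupReduceDriver's input sort into the spilling path without needing a cluster.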
>>>>>
>>>>> On 21 Apr 2017 21:49, "Fabian Hueske" <fhueske@gmail.com> wrote:
>>>>>
>>>>>> Hi Flavio,
>>>>>>
>>>>>> these files are used for spilling data to disk, in your case sorted
>>>>>> runs of records.
>>>>>> Later all these sorted runs (up to a fan-out threshold) are read and
>>>>>> merged to produce a completely sorted record stream.
>>>>>>
>>>>>> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>
>>>>>>> The error appears as soon as some taskmanager generates some
>>>>>>> inputchannel file.
>>>>>>> What are those files used for?
>>>>>>>
>>>>>>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>>>>>>>
>>>>>>>> In another run of the job I had another Exception. Could it be
>>>>>>>> helpful?
>>>>>>>>
>>>>>>>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
>>>>>>>> terminated due to an exception: Serializer consumed more bytes than
>>>>>>>> the record had. This indicates broken serialization. If you are using
>>>>>>>> custom serialization types (Value or Writable), check their serialization
>>>>>>>> methods. If you are using a Kryo-serialized type, check the corresponding
>>>>>>>> Kryo serializer.
>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>>>>>> Serializer consumed more bytes than the record had. This indicates broken
>>>>>>>> serialization. If you are using custom serialization types (Value or
>>>>>>>> Writable), check their serialization methods. If you are using a
>>>>>>>> Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>>> ... 3 more
>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>>>>>> record had. This indicates broken serialization. If you are using custom
>>>>>>>> serialization types (Value or Writable), check their serialization methods.
>>>>>>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>>>>>>> serializer.
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>>> Caused by: java.io.IOException: Serializer consumed more bytes than
>>>>>>>> the record had. This indicates broken serialization. If you are using
>>>>>>>> custom serialization types (Value or Writable), check their serialization
>>>>>>>> methods. If you are using a Kryo-serialized type, check the corresponding
>>>>>>>> Kryo serializer.
>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>>>>> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
>>>>>>>> ... 5 more
>>>>>>>>
>>>>>>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <stefano.bortoli@huawei.com> wrote:
>>>>>>>>
>>>>>>>>> In fact, the old problem was with the KryoSerializer's missed
>>>>>>>>> initialization on the exception that would trigger the spilling to disk.
>>>>>>>>> This would lead to a dirty serialization buffer that would eventually break
>>>>>>>>> the program. Till worked on it by debugging the source code generating the
>>>>>>>>> error. Perhaps someone could try the same this time as well, if Flavio can
>>>>>>>>> make the problem reproducible in a shareable program+data.
>>>>>>>>>
>>>>>>>>> Stefano
>>>>>>>>>
>>>>>>>>> *From:* Stephan Ewen [mailto:sewen@apache.org]
>>>>>>>>> *Sent:* Friday, April 21, 2017 10:04 AM
>>>>>>>>> *To:* user
>>>>>>>>> *Subject:* Re: UnilateralSortMerger error (again)
>>>>>>>>>
>>>>>>>>> In the past, these errors were most often caused by bugs in the
>>>>>>>>> serializers, not in the sorter.
>>>>>>>>>
>>>>>>>>> What types are you using at that point?
>>>>>>>>> The Stack Trace reveals ROW and StringValue, any other involved types?
>>>>>>>>>
>>>>>>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>> As suggested by Fabian I set taskmanager.memory.size = 1024 (to
>>>>>>>>> force spilling to disk) and the job failed almost immediately.
>>>>>>>>>
>>>>>>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>> I debugged the process a bit, repeating the job on sub-slices of
>>>>>>>>> the entire data (using the id value to filter data with parquet push-down
>>>>>>>>> filters), and all slices completed successfully :(
>>>>>>>>>
>>>>>>>>> So I tried to increase the parallelism (from 1 slot per TM to 4)
>>>>>>>>> to see if this was somehow a factor of stress, but it didn't cause any error.
>>>>>>>>>
>>>>>>>>> Then I almost doubled the number of rows to process and finally
>>>>>>>>> the error showed up again.
>>>>>>>>>
>>>>>>>>> It seems somehow related to spilling to disk, but I can't really
>>>>>>>>> understand what's going on :(
>>>>>>>>>
>>>>>>>>> This is a summary of my debug attempts:
>>>>>>>>>
>>>>>>>>> 4 Task managers with 6 GB and 1 slot each, parallelism = 4
>>>>>>>>>
>>>>>>>>> id < 10.000.000.000                          => 1.857.365 rows  => OK
>>>>>>>>> id >= 10.000.000.000 && id < 10.010.000.000  => 20.057.714 rows => OK
>>>>>>>>> id >= 10.010.000.000 && id < 99.945.000.000  => 20.926.903 rows => OK
>>>>>>>>> id >= 99.945.000.000 && id < 99.960.000.000  => 23.888.750 rows => OK
>>>>>>>>> id >= 99.960.000.000                         => 32.936.422 rows => OK
>>>>>>>>>
>>>>>>>>> 4 TM with 8 GB and 4 slots each, parallelism 16
>>>>>>>>>
>>>>>>>>> id >= 99.960.000.000  => 32.936.422 rows => OK
>>>>>>>>> id >= 99.945.000.000  => 56.825.172 rows => ERROR
>>>>>>>>>
>>>>>>>>> Any help is appreciated.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Flavio
>>>>>>>>>
>>>>>>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>> I could, but only if there's a good probability that it fixes the
>>>>>>>>> problem... how confident are you about it?
>>>>>>>>>
>>>>>>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhihong@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Looking at the git log of DataInputDeserializer.java, there has been
>>>>>>>>> some recent change.
>>>>>>>>>
>>>>>>>>> If you have time, maybe try with the 1.2.1 RC and see if the error is
>>>>>>>>> reproducible?
>>>>>>>>>
>>>>>>>>> Cheers
>>>>>>>>>
>>>>>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>> Hi to all,
>>>>>>>>> I think I'm again hitting the weird Exception with the
>>>>>>>>> SpillingAdaptiveSpanningRecordDeserializer...
>>>>>>>>>
>>>>>>>>> I'm using Flink 1.2.0 and the error seems to arise when Flink
>>>>>>>>> spills to disk, but the Exception thrown is not very helpful. Any idea?
>>>>>>>>>
>>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>>>>>>> null
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>>>> ... 3 more
>>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>>>>> terminated due to an exception: null
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>>>> Caused by: java.io.EOFException
>>>>>>>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Flavio
>>
>> --
>> Best,
>> Kurt
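Both traces above end in the Row/String deserialization path (RowSerializer calling StringSerializer while reading from the spilling record deserializer). A stripped-down round trip of a Row with a null String field through those serializers looks roughly like the sketch below, assuming Flink 1.2's type utilities. Note that this simple in-memory round trip succeeds on its own; the failure discussed in the thread only appears when the bytes flow through the spilling/spanning record deserializer across buffer boundaries, which the test linked at the top of the thread reproduces:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.types.Row;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

public class RowNullStringRoundTrip {

    public static void main(String[] args) throws Exception {
        // Two String fields, the second one null: the combination reported to trigger the error.
        RowTypeInfo typeInfo = new RowTypeInfo(
                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        TypeSerializer<Row> serializer = typeInfo.createSerializer(new ExecutionConfig());

        Row row = new Row(2);
        row.setField(0, "some value");
        row.setField(1, null);

        // Serialize to an in-memory buffer and read it back with the same serializer.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        serializer.serialize(row, new DataOutputViewStreamWrapper(bytes));

        Row copy = serializer.deserialize(
                new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy);  // expected output: some value,null
    }
}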