From: Flavio Pompermaier <pompermaier@okkam.it>
Date: Wed, 26 Apr 2017 21:55:20 +0200
Subject: Re: UnilateralSortMerger error (again)
To: user@flink.apache.org

I've created a repository with a unit test that reproduces the error at
https://github.com/fpompermaier/flink-batch-bug/blob/master/src/test/java/it/okkam/flink/aci/TestDataInputDeserializer.java
(probably this error is also related to FLINK-4719).

The exception is thrown only when there are null strings and multiple slots per TM; I don't know whether UnilateralSortMerger is involved or not (but I think so..).
A quick fix for this problem would be greatly appreciated because it's blocking a production deployment..
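A minimal sketch of the kind of round trip the test exercises (serializing a Row with a null String field and reading it back) might look like the following. This is illustrative only, not the test from the repository, and it assumes Flink 1.2's Row/RowSerializer plus the runtime DataOutputSerializer/DataInputDeserializer utilities:

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.runtime.util.DataInputDeserializer;
    import org.apache.flink.runtime.util.DataOutputSerializer;
    import org.apache.flink.types.Row;

    public class NullStringRoundTrip {

        public static void main(String[] args) throws Exception {
            // A Row type with two String fields, one of which will be null.
            RowTypeInfo rowType = new RowTypeInfo(
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
            TypeSerializer<Row> serializer = rowType.createSerializer(new ExecutionConfig());

            Row row = new Row(2);
            row.setField(0, "some-value");
            row.setField(1, null); // the null string that seems to trigger the problem

            // Serialize into a buffer and read it back, roughly mimicking what the
            // runtime does when records are shipped between tasks or spilled.
            DataOutputSerializer out = new DataOutputSerializer(64);
            serializer.serialize(row, out);

            DataInputDeserializer in = new DataInputDeserializer(out.wrapAsByteBuffer());
            Row copy = serializer.deserialize(in);

            System.out.println("deserialized: " + copy);
        }
    }

A single-threaded round trip like this of course cannot show the multi-slot aspect of the failure; it only makes the suspected serialization path concrete.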
Thanks in advance to all,
Flavio

On Wed, Apr 26, 2017 at 4:42 PM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
> After digging into the code and tests I think that the problem is almost certainly in the UnilateralSortMerger; there should be a missing synchronization on some shared object somewhere... Right now I'm trying to understand if this section of code creates some shared object (like queues) that is accessed in a bad way when there's spilling to disk:
>
>     // start the thread that reads the input channels
>     this.readThread = getReadingThread(exceptionHandler, input, circularQueues, largeRecordHandler,
>             parentTask, serializer, ((long) (startSpillingFraction * sortMemory)));
>
>     // start the thread that sorts the buffers
>     this.sortThread = getSortingThread(exceptionHandler, circularQueues, parentTask);
>
>     // start the thread that handles spilling to secondary storage
>     this.spillThread = getSpillingThread(exceptionHandler, circularQueues, parentTask,
>             memoryManager, ioManager, serializerFactory, comparator,
>             this.sortReadMemory, this.writeMemory, maxNumFileHandles);
>     ....
>     startThreads();
>
> The problem is that the unit tests of GroupReduceDriver use Record, and testing Rows is not very straightforward, so I'm still trying to reproduce the problem in a local env..
>
> On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>> Thanks for the explanation. Is there a way to force this behaviour in a local environment (to try to debug the problem)?
>>
>> On 21 Apr 2017 21:49, "Fabian Hueske" <fhueske@gmail.com> wrote:
>>> Hi Flavio,
>>>
>>> these files are used for spilling data to disk, in your case sorted runs of records.
>>> Later all these sorted runs (up to a fan-out threshold) are read and merged to get a completely sorted record stream.
>>>
>>> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>> The error appears as soon as some taskmanager generates some inputchannel file.
>>>> What are those files used for?
>>>>
>>>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>>>>> In another run of the job I had another Exception. Could it be helpful?
>>>>>
>>>>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>     ... 3 more
>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>> Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
>>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>>     at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>>>>>     at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
>>>>>     ... 5 more
>>>>>
>>>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <stefano.bortoli@huawei.com> wrote:
>>>>>> In fact, the old problem was with the KryoSerializer's missed initialization on the exception that would trigger the spilling to disk. This would leave a dirty serialization buffer that would eventually break the program. Till worked on it by debugging the source code that generated the error. Perhaps someone could try the same this time as well, if Flavio can make the problem reproducible with a shareable program+data.
>>>>>>
>>>>>> Stefano
>>>>>>
>>>>>> From: Stephan Ewen [mailto:sewen@apache.org]
>>>>>> Sent: Friday, April 21, 2017 10:04 AM
>>>>>> To: user <user@flink.apache.org>
>>>>>> Subject: Re: UnilateralSortMerger error (again)
>>>>>>
>>>>>> In the past, these errors were most often caused by bugs in the serializers, not in the sorter.
>>>>>>
>>>>>> What types are you using at that point? The stack trace reveals Row and StringValue; are any other types involved?
>>>>>>
>>>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>>>>>> As suggested by Fabian I set taskmanager.memory.size = 1024 (to force spilling to disk) and the job failed almost immediately..
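As an aside, the spilling behaviour Fabian suggested forcing can also be provoked in a local environment for debugging. A sketch, assuming Flink 1.2 and that the local mini cluster honours a taskmanager.memory.size value passed through createLocalEnvironment(Configuration):

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;

    public class LocalSpillingEnv {

        public static void main(String[] args) throws Exception {
            // Give the embedded task manager only a limited amount of managed memory
            // so that sorts spill to disk early, as they do on the real cluster.
            Configuration conf = new Configuration();
            conf.setLong("taskmanager.memory.size", 1024); // megabytes, the value used in this thread

            ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
            env.setParallelism(4); // several slots, matching the failing setup

            // ... build the job on 'env' and call env.execute() here ...
        }
    }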
>>>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>>>>>> I debugged the process a bit by repeating the job on a sub-slice of the entire data (using the id value to filter data with Parquet push-down filters) and all slices completed successfully :(
>>>>>> So I tried to increase the parallelism (from 1 slot per TM to 4) to see if this was somehow a factor of stress, but it didn't cause any error.
>>>>>> Then I almost doubled the number of rows to process and finally the error showed up again.
>>>>>> It seems somehow related to spilling to disk but I can't really understand what's going on :(
>>>>>> This is a summary of my debug attempts:
>>>>>>
>>>>>> 4 Task Managers with 6 GB and 1 slot each, parallelism = 4
>>>>>>
>>>>>> id < 10.000.000.000                          =>  1.857.365 rows => OK
>>>>>> id >= 10.000.000.000 && id < 10.010.000.000  => 20.057.714 rows => OK
>>>>>> id >= 10.010.000.000 && id < 99.945.000.000  => 20.926.903 rows => OK
>>>>>> id >= 99.945.000.000 && id < 99.960.000.000  => 23.888.750 rows => OK
>>>>>> id >= 99.960.000.000                         => 32.936.422 rows => OK
>>>>>>
>>>>>> 4 TMs with 8 GB and 4 slots each, parallelism = 16
>>>>>>
>>>>>> id >= 99.960.000.000                         => 32.936.422 rows => OK
>>>>>> id >= 99.945.000.000                         => 56.825.172 rows => ERROR
>>>>>>
>>>>>> Any help is appreciated..
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>>>>>> I could, but only if there's a good probability that it fixes the problem... how confident are you about it?
>>>>>>
>>>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhihong@gmail.com> wrote:
>>>>>> Looking at the git log of DataInputDeserializer.java, there has been some recent change.
>>>>>>
>>>>>> If you have time, maybe try with the 1.2.1 RC and see if the error is reproducible?
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>>>>>> Hi to all,
>>>>>> I think I'm hitting the weird Exception with the SpillingAdaptiveSpanningRecordDeserializer again...
>>>>>> I'm using Flink 1.2.0 and the error seems to arise when Flink spills to disk, but the Exception thrown is not very helpful. Any idea?
>>>>>>
>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>     ... 3 more
>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>> Caused by: java.io.EOFException
>>>>>>     at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>>>>>     at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>
>>>>>> Best,
>>>>>> Flavio