From: Stefano Bortoli <s.bortoli@gmail.com>
Date: Thu, 4 Dec 2014 12:48:53 +0100
Subject: Re: No Space Left on Device
To: user@flink.incubator.apache.org

JAXB and serialization are necessary for my business logic. I store data as
byte[], which is a plain serialization of an XML String. At every read I have
to rebuild the objects using JAXB. Kryo in Flink will make it easier to manage
user-defined objects, I guess.

saluti,
Stefano

2014-12-04 12:41 GMT+01:00 Stephan Ewen <sewen@apache.org>:

> Hi Stefano!
>
> Good to hear that it is working for you!
>
> Just a heads up: Flink is not using JAXB or any other Java serialization
> for its data exchange, only to deploy functions into the cluster (which is
> usually very fast). When we send records around, we have a special
> serialization stack that is absolutely competitive with Kryo on
> serialization speed. We are thinking of using Kryo, though, to deploy
> functions into the cluster in the future, to work around some of the
> constraints that Java serialization has.
>
> Greetings,
> Stephan
>
>
> On Thu, Dec 4, 2014 at 8:48 AM, Stefano Bortoli <s.bortoli@gmail.com>
> wrote:
>
>> The process was completed in about 6h45m, much less than the previous
>> one. The longest time is still taken by the 'blocking part'. I guess we
>> could just increase the redundancy of the SolrCloud indexes, and we could
>> reach amazing performance.
>> Furthermore, we did not apply any 'key transformation' (reversing or
>> generating a Long as ID), so we have further margin for improvement.
>> Also, I have the feeling that relying on Kryo serialization to build the
>> POJOs, rather than old-school JAXB marshalling/unmarshalling, would give
>> quite a boost, as we repeat the operation at least 250M times. :-)
>>
>> Thanks a lot to everyone. Flink is making effective deduplication
>> possible on a very heterogeneous dataset of about 10M entries within
>> hours, on a cluster of 6 cheap hardware nodes. :-)
>>
>> saluti,
>> Stefano
>>
>> 2014-12-03 18:31 GMT+01:00 Stefano Bortoli <s.bortoli@gmail.com>:
>>
>>> Hi all,
>>>
>>> thanks for the feedback. For the moment, I hope I resolved the problem
>>> by compressing the string into a byte[] using a custom implementation
>>> of the Value interface and the LZ4 algorithm. I have a little overhead
>>> on the processing of some steps, but it should reduce network traffic
>>> and the required temporary space on disk.
>>>
>>> I think the problem is due to the two joins moving around quite a bit
>>> of data. Essentially, I join twice something like 230 million tuples
>>> with a dataset of 9.2 million entries (~80GB). Compression seems to be
>>> working fine so far, even though I have not reached the critical point
>>> yet. I'll keep you posted to let you know whether this workaround
>>> solved the problem.
>>>
>>> I applied the double join as an alternative to repeating 230M*2 single
>>> gets on HBase; even so, it allowed the process to complete in about 11h.
>>>
>>> thanks a lot to everyone again.
>>>
>>> saluti,
>>> Stefano
>>>
>>> 2014-12-03 18:02 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>
>>>> I think I can answer on behalf of Stefano, who is busy right now. The
>>>> job failed because on the job manager (which is also a task manager)
>>>> the temp folder was full.
>>>> We would like to understand how big the temp directory should be.
>>>> Which parameters should we consider to make that computation?
>>>>
>>>> On Wed, Dec 3, 2014 at 5:22 PM, Ufuk Celebi <uce@apache.org> wrote:
>>>>
>>>>> The task managers log the temporary directories at start up. Can you
>>>>> have a look there and verify that you configured the temporary
>>>>> directories correctly?
>>>>>
>>>>> On Wed, Dec 3, 2014 at 5:17 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> That exception means that one of the directories is full. If you
>>>>>> have several temp directories on different disks, you can add them
>>>>>> all to the config and the temp files will be rotated across the
>>>>>> disks.
>>>>>>
>>>>>> The exception may come once the first temp directory is full. For
>>>>>> example, if you have 4 temp dirs (where 1 is rather full while the
>>>>>> others have a lot of space), it may be that one temp file on the
>>>>>> full directory grows large and exceeds the space, while the other
>>>>>> directories have plenty of space.
>>>>>>
>>>>>> Greetings,
>>>>>> Stephan
>>>>>>
>>>>>> On Wed, Dec 3, 2014 at 4:40 PM, Robert Metzger <rmetzger@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I think Flink is deleting its temporary files.
>>>>>>>
>>>>>>> Is the temp path set to the SSD on each machine?
>>>>>>> What is the size of the two data sets you are joining? Your cluster
>>>>>>> has 6*256GB = 1.5 TB of temporary disk space.
>>>>>>> Maybe only the temp directory of one node is full?
>>>>>>>
>>>>>>> On Wed, Dec 3, 2014 at 3:52 PM, Ufuk Celebi <uce@apache.org> wrote:
>>>>>>>
>>>>>>>> Hey Stefano,
>>>>>>>>
>>>>>>>> I would wait for Stephan's take on this, but with caught
>>>>>>>> IOExceptions the hash table should properly clean up after itself
>>>>>>>> and delete the file.
>>>>>>>>
>>>>>>>> Can you still reproduce this problem for your use case?
>>>>>>>>
>>>>>>>> – Ufuk
>>>>>>>>
>>>>>>>> On Tue, Dec 2, 2014 at 7:07 PM, Stefano Bortoli <
>>>>>>>> s.bortoli@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi guys,
>>>>>>>>>
>>>>>>>>> a quite long process failed due to this No Space Left on Device
>>>>>>>>> exception, but the machine disk is not full at all.
>>>>>>>>>
>>>>>>>>> okkam@okkam-nano-2:/opt/flink-0.8$ df
>>>>>>>>> Filesystem      1K-blocks      Used  Available Use% Mounted on
>>>>>>>>> /dev/sdb2       223302236  22819504  189116588  11% /
>>>>>>>>> none                    4         0          4   0% /sys/fs/cgroup
>>>>>>>>> udev              8156864         4    8156860   1% /dev
>>>>>>>>> tmpfs             1633520       524    1632996   1% /run
>>>>>>>>> none                 5120         0       5120   0% /run/lock
>>>>>>>>> none              8167584         0    8167584   0% /run/shm
>>>>>>>>> none               102400         0     102400   0% /run/user
>>>>>>>>> /dev/sdb1          523248      3428     519820   1% /boot/efi
>>>>>>>>> /dev/sda1       961302560   2218352  910229748   1% /media/data
>>>>>>>>> cm_processes      8167584     12116    8155468   1% /run/cloudera-scm-agent/process
>>>>>>>>>
>>>>>>>>> Is it possible that the temporary files were deleted 'after the
>>>>>>>>> problem'? I read so, but there was no confirmation. However, it
>>>>>>>>> is a 256GB SSD disk. Each of the 6 nodes has one.
>>>>>>>>>
>>>>>>>>> Here is the stack trace:
>>>>>>>>>
>>>>>>>>> 16:37:59,581 ERROR org.apache.flink.runtime.operators.RegularPactTask - Error in task code: CHAIN Join (org.okkam.flink.maintenance.deduplication.consolidate.Join2ToGetCandidates) -> Filter (org.okkam.flink.maintenance.deduplication.match.SingleMatchFilterFunctionWithFlagMatch) -> Map (org.okkam.flink.maintenance.deduplication.match.MapToTuple3MapFunction) -> Combine(org.apache.flink.api.java.operators.DistinctOperator$DistinctFunction) (4/28)
>>>>>>>>> java.io.IOException: The channel is erroneous.
>>>>>>>>>     at org.apache.flink.runtime.io.disk.iomanager.ChannelAccess.checkErroneous(ChannelAccess.java:132)
>>>>>>>>>     at org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter.writeBlock(BlockChannelWriter.java:73)
>>>>>>>>>     at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:218)
>>>>>>>>>     at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:204)
>>>>>>>>>     at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>>>>>>>>>     at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
>>>>>>>>>     at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.write(AbstractPagedOutputView.java:173)
>>>>>>>>>     at org.apache.flink.types.StringValue.writeString(StringValue.java:808)
>>>>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:68)
>>>>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:95)
>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>>>>>>>>>     at org.apache.flink.runtime.operators.hash.HashPartition.insertIntoProbeBuffer(HashPartition.java:269)
>>>>>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:474)
>>>>>>>>>     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:537)
>>>>>>>>>     at org.apache.flink.runtime.operators.hash.BuildSecondHashMatchIterator.callWithNextKey(BuildSecondHashMatchIterator.java:106)
>>>>>>>>>     at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>>>>>>>>>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>>>>>>>>>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>>>>>>>>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:246)
>>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>>> Caused by: java.io.IOException: No space left on device
>>>>>>>>>     at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>>>>>>>>>     at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
>>>>>>>>>     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>>>>>>>>>     at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>>>>>>>>>     at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
>>>>>>>>>     at org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(BlockChannelAccess.java:259)
>>>>>>>>>     at org.apache.flink.runtime.io.disk.iomanager.IOManager$WriterThread.run(IOManager.java:636)
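
[Editor's note: Stephan's advice above, spreading temporary files over several disks by listing multiple temp directories in the config, corresponds to the task manager temp-directory setting in flink-conf.yaml; in the Flink 0.x releases this was the `taskmanager.tmp.dirs` key, which accepts a colon-separated list of paths. The paths below are illustrative, not taken from the thread:]

```yaml
# flink-conf.yaml — one temp directory per physical disk (example paths);
# Flink rotates spill files across all listed directories.
taskmanager.tmp.dirs: /disk1/flink-tmp:/disk2/flink-tmp:/media/data/flink-tmp
```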
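
[Editor's note: as an illustration of the workaround Stefano describes, compressing the XML string into a byte[] behind a custom Value so only the compressed form is shipped and spilled, here is a minimal stand-alone sketch. The thread used LZ4 behind an implementation of Flink's Value interface; the class name `CompressedStringValue` is hypothetical, and `java.util.zip`'s Deflater/Inflater stand in for an LZ4 library purely to keep the example dependency-free:]

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Sketch of compress-on-write / decompress-on-read for a large XML string.
// A real Flink version would implement org.apache.flink.types.Value and
// (de)serialize `compressed` in its read/write methods.
public class CompressedStringValue {
    private byte[] compressed;

    public void set(String value) {
        byte[] raw = value.getBytes(StandardCharsets.UTF_8);
        Deflater deflater = new Deflater();
        deflater.setInput(raw);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        while (!deflater.finished()) {
            out.write(buf, 0, deflater.deflate(buf)); // append each compressed chunk
        }
        deflater.end();
        this.compressed = out.toByteArray();
    }

    public String get() throws Exception {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        while (!inflater.finished()) {
            out.write(buf, 0, inflater.inflate(buf)); // rebuild the original bytes
        }
        inflater.end();
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 100; i++) {
            sb.append("<entry id=\"42\"><name>duplicate candidate</name></entry>");
        }
        String xml = sb.toString();
        CompressedStringValue v = new CompressedStringValue();
        v.set(xml);
        if (!v.get().equals(xml)) throw new AssertionError("round-trip failed");
        System.out.println("round-trip OK: " + xml.length()
                + " chars stored as " + v.compressed.length + " bytes");
    }
}
```

The pay-off is exactly the trade Stefano mentions: a little CPU per record in exchange for less data crossing the network and spilling to the temp directories during the two large joins.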