flink-user mailing list archives

From Stefano Bortoli <s.bort...@gmail.com>
Subject Re: No Space Left on Device
Date Thu, 04 Dec 2014 11:48:53 GMT
JAXB and serialization are necessary for my business logic. I store data as
byte[] which are plain serializations of XML Strings. At every read I have to
rebuild the objects using JAXB.
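
A minimal sketch of that rebuild step, assuming a JAXB-annotated class (Entity
here is just a placeholder, not the real business class):

import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.ByteArrayInputStream;

public class XmlCodec {

    // Placeholder for the real JAXB-annotated business class.
    @XmlRootElement
    public static class Entity {
        public String id;
        public String name;
    }

    // JAXBContext creation is expensive; it is thread-safe and can be reused.
    private static final JAXBContext CONTEXT;

    static {
        try {
            CONTEXT = JAXBContext.newInstance(Entity.class);
        } catch (JAXBException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // Called on every read: turn the stored XML bytes back into an object.
    public static Entity fromBytes(byte[] xmlBytes) throws JAXBException {
        // Unmarshallers are not thread-safe, so create one per call.
        Unmarshaller unmarshaller = CONTEXT.createUnmarshaller();
        return (Entity) unmarshaller.unmarshal(new ByteArrayInputStream(xmlBytes));
    }
}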

Kryo in Flink will make it easier to manage user-defined objects, I guess.

saluti,
Stefano



2014-12-04 12:41 GMT+01:00 Stephan Ewen <sewen@apache.org>:

> Hi Stefano!
>
> Good to hear that it is working for you!
>
> Just a heads up: Flink does not use JAXB or any other form of Java
> serialization for its data exchange; Java serialization is only used to
> deploy functions into the cluster (which is usually very fast). When we send
> records around, we have a special serialization stack that is absolutely
> competitive with Kryo on serialization speed. We are thinking of using Kryo,
> though, to deploy functions into the cluster in the future, to work around
> some of the constraints that Java serialization has.
>
> Greetings,
> Stephan
>
>
> On Thu, Dec 4, 2014 at 8:48 AM, Stefano Bortoli <s.bortoli@gmail.com>
> wrote:
>
>> The process completed in about 6h45m, much less than the previous
>> one. The longest time is still taken by the 'blocking part'. I guess we
>> could just increase the redundancy of the SolrCloud indexes and reach
>> impressive performance. Furthermore, we did not apply any 'key
>> transformation' (reversing or generating a Long as ID), so we have further
>> margin for improvement. I also have the feeling that relying on
>> Kryo serialization to build the POJOs rather than old-school JAXB
>> marshalling/unmarshalling would give quite a boost, as we repeat the
>> operation at least 250M times. :-)
>>
>> Thanks a lot to everyone. Flink makes effective deduplication of a very
>> heterogeneous dataset of about 10M entries possible within hours on a
>> cluster of 6 cheap hardware nodes. :-)
>>
>> saluti,
>> Stefano
>>
>> 2014-12-03 18:31 GMT+01:00 Stefano Bortoli <s.bortoli@gmail.com>:
>>
>>> Hi all,
>>>
>>> thanks for the feedback. For the moment, I hope I resolved the problem
>>> by compressing the string into a byte[] using a custom implementation of
>>> the Value interface and the LZ4 algorithm. I have a little overhead in the
>>> processing of some steps, but it should reduce network traffic and the
>>> required temporary space on disk.
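>>>
>>> A minimal sketch of such a Value, written against a recent Flink API and
>>> the lz4-java library (net.jpountz.lz4); class and field names are
>>> illustrative, not the actual implementation:
>>>
>>> import net.jpountz.lz4.LZ4Compressor;
>>> import net.jpountz.lz4.LZ4Factory;
>>> import net.jpountz.lz4.LZ4FastDecompressor;
>>> import org.apache.flink.core.memory.DataInputView;
>>> import org.apache.flink.core.memory.DataOutputView;
>>> import org.apache.flink.types.Value;
>>> import java.io.IOException;
>>> import java.nio.charset.Charset;
>>>
>>> public class CompressedXmlValue implements Value {
>>>
>>>     private static final LZ4Factory LZ4 = LZ4Factory.fastestInstance();
>>>     private static final Charset UTF_8 = Charset.forName("UTF-8");
>>>
>>>     private byte[] compressed;   // LZ4-compressed UTF-8 bytes of the XML
>>>     private int originalLength;  // needed to size the buffer on decompression
>>>
>>>     public CompressedXmlValue() {} // Flink needs a public no-arg constructor
>>>
>>>     public void setXml(String xml) {
>>>         byte[] raw = xml.getBytes(UTF_8);
>>>         originalLength = raw.length;
>>>         LZ4Compressor compressor = LZ4.fastCompressor();
>>>         compressed = compressor.compress(raw);
>>>     }
>>>
>>>     public String getXml() {
>>>         LZ4FastDecompressor decompressor = LZ4.fastDecompressor();
>>>         byte[] raw = decompressor.decompress(compressed, originalLength);
>>>         return new String(raw, UTF_8);
>>>     }
>>>
>>>     @Override
>>>     public void write(DataOutputView out) throws IOException {
>>>         out.writeInt(originalLength);
>>>         out.writeInt(compressed.length);
>>>         out.write(compressed);
>>>     }
>>>
>>>     @Override
>>>     public void read(DataInputView in) throws IOException {
>>>         originalLength = in.readInt();
>>>         compressed = new byte[in.readInt()];
>>>         in.readFully(compressed);
>>>     }
>>> }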
>>>
>>> I think the problem is due to the two joins moving around quite a bit of
>>> data. Essentially I join twice something like 230 million tuples with a
>>> dataset of 9.2 million entries (~80GB). Compression seems to be working
>>> fine so far, even though I did not reach the critical point yet. I'll keep
>>> you posted to let you know whether this workaround solved the problem.
>>>
>>> I applied a double join as an alternative to repeating the 230M*2 single
>>> gets on HBase. Even so, this approach allowed the process to complete in
>>> about 11h.
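>>>
>>> A hedged sketch of that double-join pattern in the Flink Java DataSet API;
>>> the tuple layouts, key positions, and types are purely illustrative, not
>>> the actual job:
>>>
>>> import org.apache.flink.api.common.functions.JoinFunction;
>>> import org.apache.flink.api.java.DataSet;
>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.api.java.tuple.Tuple3;
>>>
>>> public class DoubleJoinSketch {
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>
>>>         // Candidate pairs (leftKey, rightKey) stand in for the ~230M tuples.
>>>         DataSet<Tuple2<String, String>> candidates = env.fromElements(
>>>                 new Tuple2<String, String>("a", "b"),
>>>                 new Tuple2<String, String>("a", "c"));
>>>
>>>         // Entity records (key, payload) stand in for the ~9.2M entries.
>>>         DataSet<Tuple2<String, String>> entities = env.fromElements(
>>>                 new Tuple2<String, String>("a", "<xml-a/>"),
>>>                 new Tuple2<String, String>("b", "<xml-b/>"),
>>>                 new Tuple2<String, String>("c", "<xml-c/>"));
>>>
>>>         // First join: attach the payload of the left entity of each pair.
>>>         DataSet<Tuple3<String, String, String>> withLeft = candidates
>>>                 .join(entities).where(0).equalTo(0)
>>>                 .with(new JoinFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple3<String, String, String>>() {
>>>                     @Override
>>>                     public Tuple3<String, String, String> join(Tuple2<String, String> pair, Tuple2<String, String> entity) {
>>>                         return new Tuple3<String, String, String>(pair.f0, pair.f1, entity.f1);
>>>                     }
>>>                 });
>>>
>>>         // Second join: attach the payload of the right entity, replacing
>>>         // what would otherwise be one HBase get per pair and per side.
>>>         DataSet<Tuple3<String, String, String>> pairsWithPayloads = withLeft
>>>                 .join(entities).where(1).equalTo(0)
>>>                 .with(new JoinFunction<Tuple3<String, String, String>, Tuple2<String, String>, Tuple3<String, String, String>>() {
>>>                     @Override
>>>                     public Tuple3<String, String, String> join(Tuple3<String, String, String> left, Tuple2<String, String> entity) {
>>>                         return new Tuple3<String, String, String>(left.f0, left.f2, entity.f1);
>>>                     }
>>>                 });
>>>
>>>         // print() triggers execution on recent Flink; on Flink 0.8 an
>>>         // explicit env.execute() call would follow.
>>>         pairsWithPayloads.print();
>>>     }
>>> }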
>>>
>>> thanks a lot to everyone again.
>>>
>>> saluti,
>>> Stefano
>>>
>>>
>>>
>>>
>>>
>>>
>>> 2014-12-03 18:02 GMT+01:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>
>>>> I think I can answer on behalf of Stefano, who is busy right now: the job
>>>> failed because the temp folder on the job manager (which is also a task
>>>> manager) was full.
>>>> We would like to understand how big the temp directory should be. Which
>>>> parameters should we consider to make that computation?
>>>>
>>>>
>>>> On Wed, Dec 3, 2014 at 5:22 PM, Ufuk Celebi <uce@apache.org> wrote:
>>>>
>>>>> The task managers log the temporary directories at start up. Can you
>>>>> have a look there and verify that you configured the temporary directories
>>>>> correctly?
>>>>>
>>>>> On Wed, Dec 3, 2014 at 5:17 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> That exception means that one of the directories is full. If you have
>>>>>> several temp directories on different disks, you can add them all to the
>>>>>> config and the temp files will be rotated across the disks.
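>>>>>>
>>>>>> A minimal flink-conf.yaml sketch of that setup (the paths below are
>>>>>> placeholders, not the actual cluster layout):
>>>>>>
>>>>>> # Several directories, separated by the system path separator
>>>>>> # (':' on Linux); Flink rotates its temp files across them.
>>>>>> taskmanager.tmp.dirs: /mnt/disk1/flink-tmp:/mnt/disk2/flink-tmp:/mnt/disk3/flink-tmp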
>>>>>>
>>>>>> The exception may come once the first temp directory is full. For
>>>>>> example, if you have 4 temp dirs (where 1 is rather full while the others
>>>>>> have a lot of space), it may be that one temp file on the full directory
>>>>>> grows large and exceeds the space, while the other directories still have
>>>>>> plenty of space.
>>>>>>
>>>>>> Greetings,
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>> On Wed, Dec 3, 2014 at 4:40 PM, Robert Metzger <rmetzger@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I think Flink is deleting its temporary files.
>>>>>>>
>>>>>>> Is the temp. path set to the SSD on each machine?
>>>>>>> What is the size of the two data sets you are joining? Your cluster
>>>>>>> has 6*256GB = 1.5 TB of temporary disk space.
>>>>>>> Maybe only the temp directory of one node is full?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Dec 3, 2014 at 3:52 PM, Ufuk Celebi <uce@apache.org> wrote:
>>>>>>>
>>>>>>>> Hey Stefano,
>>>>>>>>
>>>>>>>> I would wait for Stephan's take on this, but with caught
>>>>>>>> IOExceptions the hash table should properly clean up after itself and
>>>>>>>> delete the file.
>>>>>>>>
>>>>>>>> Can you still reproduce this problem for your use case?
>>>>>>>>
>>>>>>>> – Ufuk
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Dec 2, 2014 at 7:07 PM, Stefano Bortoli <s.bortoli@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi guys,
>>>>>>>>>
>>>>>>>>> a quite long process failed due to this No Space Left on Device
>>>>>>>>> exception, but the machine disk is not full at all.
>>>>>>>>>
>>>>>>>>> okkam@okkam-nano-2:/opt/flink-0.8$ df
>>>>>>>>> Filesystem     1K-blocks     Used Available Use% Mounted on
>>>>>>>>> /dev/sdb2      223302236 22819504 189116588  11% /
>>>>>>>>> none                   4        0         4   0% /sys/fs/cgroup
>>>>>>>>> udev             8156864        4   8156860   1% /dev
>>>>>>>>> tmpfs            1633520      524   1632996   1% /run
>>>>>>>>> none                5120        0      5120   0% /run/lock
>>>>>>>>> none             8167584        0   8167584   0% /run/shm
>>>>>>>>> none              102400        0    102400   0% /run/user
>>>>>>>>> /dev/sdb1         523248     3428    519820   1% /boot/efi
>>>>>>>>> /dev/sda1      961302560  2218352 910229748   1% /media/data
>>>>>>>>> cm_processes     8167584    12116   8155468   1% /run/cloudera-scm-agent/process
>>>>>>>>>
>>>>>>>>> Is it possible that the temporary files were deleted 'after the
>>>>>>>>> problem'? I read so, but there was no confirmation. However, it is a
>>>>>>>>> 256GB SSD disk. Each of the 6 nodes has one.
>>>>>>>>>
>>>>>>>>> Here is the stack trace:
>>>>>>>>>
>>>>>>>>> 16:37:59,581 ERROR org.apache.flink.runtime.operators.RegularPactTask - Error in
>>>>>>>>> task code:  CHAIN Join
>>>>>>>>> (org.okkam.flink.maintenance.deduplication.consolidate.Join2ToGetCandidates)
>>>>>>>>> -> Filter
>>>>>>>>> (org.okkam.flink.maintenance.deduplication.match.SingleMatchFilterFunctionWithFlagMatch)
>>>>>>>>> -> Map
>>>>>>>>> (org.okkam.flink.maintenance.deduplication.match.MapToTuple3MapFunction)
>>>>>>>>> -> Combine(org.apache.flink.api.java.operators.DistinctOperator$DistinctFunction)
>>>>>>>>> (4/28)
>>>>>>>>> java.io.IOException: The channel is erroneous.
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.runtime.io.disk.iomanager.ChannelAccess.checkErroneous(ChannelAccess.java:132)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter.writeBlock(BlockChannelWriter.java:73)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:218)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:204)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.write(AbstractPagedOutputView.java:173)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.types.StringValue.writeString(StringValue.java:808)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:68)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:95)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.runtime.operators.hash.HashPartition.insertIntoProbeBuffer(HashPartition.java:269)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:474)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:537)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.runtime.operators.hash.BuildSecondHashMatchIterator.callWithNextKey(BuildSecondHashMatchIterator.java:106)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:246)
>>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>>> Caused by: java.io.IOException: No space left on device
>>>>>>>>>     at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>>>>>>>>>     at
>>>>>>>>> sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
>>>>>>>>>     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>>>>>>>>>     at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>>>>>>>>>     at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(BlockChannelAccess.java:259)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.runtime.io.disk.iomanager.IOManager$WriterThread.run(IOManager.java:636)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
