flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: "java.net.SocketException: No buffer space available (maximum connections reached?)" when reading from HDFS
Date Mon, 17 Oct 2016 12:00:37 GMT
Happy to hear it!

On Mon, Oct 17, 2016 at 9:31 AM, Yassine MARZOUGUI <
y.marzougui@mindlytix.com> wrote:

> That solved my problem, thank you!
>
> Best,
> Yassine
>
> 2016-10-16 19:18 GMT+02:00 Stephan Ewen <sewen@apache.org>:
>
>> Hi!
>>
>> Looks to me like this is the following problem: the decompression streams
>> did not properly forward the "close()" calls.
>>
>> The fix is in the latest 1.2-SNAPSHOT, but did not make it into version
>> 1.1.3.
>> The fix is in this pull request: https://github.com/apache/flink/pull/2581
>>
>> I have pushed the fix into the latest 1.1-SNAPSHOT branch.
>>
>> If you get the code via
>>     git clone -b release-1.1 https://github.com/apache/flink.git
>> you will get the same code as the 1.1.3 release, plus the patch for this
>> problem.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Sat, Oct 15, 2016 at 10:11 PM, Yassine MARZOUGUI <
>> y.marzougui@mindlytix.com> wrote:
>>
>>> Hi all,
>>>
>>> I'm reading a large number of small files from HDFS in batch mode (about
>>> 20 directories, each directory contains about 3000 files, using
>>> recursive.file.enumeration=true), and each time, at about 200 GB of
>>> received data, my job fails with the following exception:
>>>
>>> java.io.IOException: Error opening the Input Split hdfs:///filepath/filename.csv.gz [0,-1]: Could not obtain block: BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313 file=/filepath/filename.csv.gz
>>>         at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:693)
>>>         at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
>>>         at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
>>>         at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>         at java.lang.Thread.run(Unknown Source)
>>> Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-812793611-127.0.0.1-1455882335652:blk_1075977174_2237313 file=/filepath/filename.csv.gz
>>>         at org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:984)
>>>         at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:642)
>>>         at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
>>>         at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
>>>         at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
>>>         at java.io.FilterInputStream.read(Unknown Source)
>>>         at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
>>>         at java.util.zip.CheckedInputStream.read(Unknown Source)
>>>         at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
>>>         at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
>>>         at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
>>>         at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>>>         at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>>>         at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
>>>         at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
>>>         at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
>>>         at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
>>>         ... 5 more
>>>
>>> I checked the file each time and it exists and is healthy. Looking at
>>> the taskmanager logs, I found the following exceptions, which suggest
>>> that it is running out of connections:
>>>
>>> 2016-10-15 18:20:27,034 WARN  org.apache.hadoop.hdfs.BlockReaderFactory - I/O error constructing remote block reader.
>>> java.net.SocketException: No buffer space available (maximum connections reached?): connect
>>> at sun.nio.ch.Net.connect0(Native Method)
>>> at sun.nio.ch.Net.connect(Unknown Source)
>>> at sun.nio.ch.Net.connect(Unknown Source)
>>> at sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
>>> at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
>>> at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
>>> at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3436)
>>> at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:777)
>>> at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:694)
>>> at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
>>> at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:673)
>>> at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
>>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
>>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
>>> at java.io.FilterInputStream.read(Unknown Source)
>>> at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
>>> at java.util.zip.CheckedInputStream.read(Unknown Source)
>>> at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
>>> at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
>>> at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
>>> at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>>> at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>>> at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
>>> at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
>>> at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
>>> at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
>>> at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
>>> at org.myorg.quickstart.MyTextInputFormat.open(MyTextInputFormat.java:99)
>>> at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
>>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>> at java.lang.Thread.run(Unknown Source)
>>> 2016-10-15 18:20:27,034 WARN  org.apache.hadoop.hdfs.DFSClient - Failed to connect to /x.x.x.x:50010 for block, add to deadNodes and continue. java.net.SocketException: No buffer space available (maximum connections reached?): connect
>>> java.net.SocketException: No buffer space available (maximum connections reached?): connect
>>> at sun.nio.ch.Net.connect0(Native Method)
>>> at sun.nio.ch.Net.connect(Unknown Source)
>>> at sun.nio.ch.Net.connect(Unknown Source)
>>> at sun.nio.ch.SocketChannelImpl.connect(Unknown Source)
>>> at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
>>> at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
>>> at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3436)
>>> at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:777)
>>> at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:694)
>>> at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
>>> at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:673)
>>> at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
>>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
>>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:735)
>>> at java.io.FilterInputStream.read(Unknown Source)
>>> at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:59)
>>> at java.util.zip.CheckedInputStream.read(Unknown Source)
>>> at java.util.zip.GZIPInputStream.readUByte(Unknown Source)
>>> at java.util.zip.GZIPInputStream.readUShort(Unknown Source)
>>> at java.util.zip.GZIPInputStream.readHeader(Unknown Source)
>>> at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>>> at java.util.zip.GZIPInputStream.<init>(Unknown Source)
>>> at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:44)
>>> at org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.create(GzipInflaterInputStreamFactory.java:31)
>>> at org.apache.flink.api.common.io.FileInputFormat.decorateInputStream(FileInputFormat.java:717)
>>> at org.apache.flink.api.common.io.FileInputFormat.open(FileInputFormat.java:689)
>>> at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:424)
>>> at org.myorg.quickstart.MyTextInputFormat.open(MyTextInputFormat.java:99)
>>> at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:47)
>>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:140)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>> at java.lang.Thread.run(Unknown Source)
>>>
>>> I inspected the open connections and found that a very large number of
>>> connections are opened by the job and stuck in the CLOSE_WAIT state, which
>>> I guess exhausted the ephemeral port space after some time.
>>> I'm running Flink 1.1.2 on Windows 10 (1 node with 1 TaskManager), with
>>> a parallelism of 8. I got the same exception even with the job
>>> parallelism set to 1. The same exception also occurred after upgrading to
>>> Flink 1.1.3.
>>>
>>> Any idea what could be the root cause of the problem and how to solve it?
>>> Thank you.
>>>
>>> Best,
>>> Yassine
>>>
>>
>>
>
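
The diagnosis in this thread (a decompression wrapper that does not forward "close()" to the stream underneath it) can be illustrated with a small, self-contained Java sketch. This is not Flink's code: LeakyWrapper, ForwardingWrapper, TrackingStream and underlyingClosedAfter are hypothetical names made up for illustration, and an in-memory byte array stands in for the HDFS stream that would hold a socket. Only java.io and java.util.zip classes from the JDK are used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CloseForwardingDemo {

    /** Hypothetical stand-in for a file system stream; records whether close() reached it. */
    static class TrackingStream extends ByteArrayInputStream {
        boolean closed = false;

        TrackingStream(byte[] data) {
            super(data);
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    /** Delegates reads but inherits InputStream's no-op close(), so the wrapped stream stays open. */
    static class LeakyWrapper extends InputStream {
        protected final InputStream in;

        LeakyWrapper(InputStream in) {
            this.in = in;
        }

        @Override
        public int read() throws IOException {
            return in.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return in.read(b, off, len);
        }
    }

    /** Same wrapper, but close() is forwarded to the wrapped stream. */
    static class ForwardingWrapper extends LeakyWrapper {
        ForwardingWrapper(InputStream in) {
            super(in);
        }

        @Override
        public void close() throws IOException {
            in.close();
        }
    }

    /** Builds a tiny gzip payload in memory so the example needs no files. */
    static byte[] gzip(String text) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        }
        return bos.toByteArray();
    }

    /** Reads the payload through a wrapper, closes the wrapper, and reports whether the raw stream was closed. */
    public static boolean underlyingClosedAfter(boolean forwardClose) throws IOException {
        TrackingStream raw = new TrackingStream(gzip("a,b,c\n"));
        InputStream inflater = new GZIPInputStream(raw);
        InputStream wrapper = forwardClose
                ? new ForwardingWrapper(inflater)
                : new LeakyWrapper(inflater);
        while (wrapper.read() != -1) {
            // drain the decompressed data
        }
        wrapper.close();
        return raw.closed;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("leaky wrapper closed the raw stream: " + underlyingClosedAfter(false));
        System.out.println("forwarding wrapper closed the raw stream: " + underlyingClosedAfter(true));
    }
}
```

With the leaky wrapper the raw stream is never closed even though the caller closed the stream it was handed. When the raw stream is backed by a network connection, each file read this way would presumably leave one connection open, which is consistent with the CLOSE_WAIT build-up reported above when reading many small files.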
