flink-user mailing list archives

From Vijay Balakrishnan <bvija...@gmail.com>
Subject Re: Flink Dashboard UI Tasks hard limit
Date Thu, 04 Jun 2020 22:05:43 GMT
Thx a ton, Xintong.
I am using this configuration now:
taskmanager.numberOfTaskSlots: 14
rest.server.max-content-length: 314572800
taskmanager.network.memory.fraction: 0.45
taskmanager.network.memory.max: 24gb
taskmanager.network.memory.min: 500mb
akka.ask.timeout: 240s
cluster.evenly-spread-out-slots: true
akka.tcp.timeout: 240s
taskmanager.network.request-backoff.initial: 5000
taskmanager.network.request-backoff.max: 30000
web.timeout: 1000000
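
A minimal sketch of how the two request-backoff values above might play out. It assumes (this is not confirmed anywhere in this thread) that Flink waits `initial` ms before the first re-request, doubles the wait on each further attempt, and gives up once the wait has reached `max`. `BackoffSchedule` is a hypothetical name. As far as I understand it, these settings govern re-requesting a partition that is not yet available, not the TCP connect timeout shown in the exception below, so treat this only as a way to reason about the retry window.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not Flink source): the waits produced by a backoff that
 * starts at initialMs, doubles after every attempt, and stops once it reaches maxMs.
 */
public class BackoffSchedule {

    /** Returns successive wait times in ms; empty if backoff is disabled (initialMs <= 0). */
    static List<Integer> schedule(int initialMs, int maxMs) {
        List<Integer> waits = new ArrayList<>();
        if (initialMs <= 0) {
            return waits;
        }
        int current = initialMs;
        while (true) {
            waits.add(current);
            if (current >= maxMs) {
                break; // cap reached: no further retries under this model
            }
            current = Math.min(current * 2, maxMs); // double, but never exceed the cap
        }
        return waits;
    }

    public static void main(String[] args) {
        // Values from the flink-conf.yaml above.
        List<Integer> waits = schedule(5000, 30000);
        int total = waits.stream().mapToInt(Integer::intValue).sum();
        System.out.println("waits (ms): " + waits + ", total: " + total);
        // prints: waits (ms): [5000, 10000, 20000, 30000], total: 65000
    }
}
```

Under that assumption, 5000/30000 gives four waits and roughly 65 s of retrying in the worst case.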

I still get an error on startup when loading the Flink jar. It resolves
itself after the first few failed attempts; setting
taskmanager.network.request-backoff.initial: 5000 helped a little bit. I
would like this job to start successfully on the first try. I am also
attaching a screenshot of the error on job failure.
Exception:
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition ce6b601e14b959de21d8351a6c5cf70c@1f2cd0d827586a4bc7b6f40ad2609db1 not reachable.
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
    ...
Caused by: java.io.IOException: Connecting the channel failed: Connecting to remote task manager + '/10.128.49.96:43060' has failed. This might indicate that the remote task manager has been lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
    ... 7 more
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + '/10.128.49.96:43060' has failed. This might indicate that the remote task manager has been lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
    ...
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: /10.128.49.96:43060
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
    ... 6 more
Caused by: java.net.ConnectException: Connection timed out
    ... 10 more

TIA,



On Sun, May 31, 2020 at 8:08 PM Xintong Song <tonysong820@gmail.com> wrote:

> Hi Vijay,
>
> The error message suggests that another task manager (10.127.106.54) is
> not responding. This could happen when the remote task manager has failed
> or is under severe GC pressure. You would need to find the log of the
> remote task manager to understand what is happening.
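
One quick follow-up from the failing TaskManager's host is to check whether the remote TaskManager's data port accepts TCP connections at all. The sketch below is a hypothetical helper (`TmPortProbe` is not part of Flink) built on plain `java.net.Socket`; the default host is the IP mentioned above and the default port comes from the exception quoted in this thread. A successful connect only shows that the host is up and the port is open: a JVM stuck in a long GC pause may still complete the TCP handshake at the kernel level, so the remote TM's own log and GC log remain the real source of truth.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * Hypothetical diagnostic (not Flink code): can this host open a TCP
 * connection to a remote TaskManager's data port within a timeout?
 */
public class TmPortProbe {

    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, SocketTimeoutException, unknown host, ...
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "10.127.106.54";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 33564;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 5000));
    }
}
```

Run it as `java TmPortProbe <host> <port>` on the TaskManager that logged the failure; a timeout here (rather than "refused") points more toward a network, security-group, or firewall problem than a crashed process.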
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Jun 1, 2020 at 4:57 AM Vijay Balakrishnan <bvijaykr@gmail.com>
> wrote:
>
>> Hi All,
>> The job takes forever to start up and now fails to start every time.
>> Physical Memory: 62.1 GB
>> JVM Heap Size: 15.0 GB
>> Flink Managed Memory: 10.5 GB
>> Attached a TM screenshot.
>>
>> Tried increasing the following:
>>
>> taskmanager.numberOfTaskSlots: 10
>> parallelism.default: 1
>> rest.server.max-content-length: 314572800
>> taskmanager.network.memory.fraction: 0.45
>> taskmanager.network.memory.max: 24gb
>> taskmanager.network.memory.min: 500mb
>> akka.ask.timeout: 240s
>> cluster.evenly-spread-out-slots: true
>> taskmanager.network.netty.client.connectTimeoutSec: 240
>> taskmanager.network.detailed-metrics: true
>> taskmanager.network.memory.floating-buffers-per-gate: 16
>> akka.tcp.timeout: 30s
>>
>> There are more than enough slots. The issue seems to be communicating
>> over TCP with the remote task managers?
>>
>> Getting this exception on a TaskManager:
>>
>> 2020-05-31 20:37:31,436 INFO  org.apache.flink.runtime.taskmanager.Task - Window(TumblingEventTimeWindows(5000), EventTimeTrigger, MGroupingWindowAggregate, MGroupingAggregateWindowProcessing) (36/440) (921fe6761ce844a6850c5fc67326b221) switched from DEPLOYING to FAILED.
>> org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition faea47916a206dc8d014694ec72ab577@95f71b39868d4e23a180ce11653dc4ca not reachable.
>>     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
>>     at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
>>     at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.IOException: Connecting the channel failed: Connecting to remote task manager + '/10.127.106.54:33564' has failed. This might indicate that the remote task manager has been lost.
>>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
>>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
>>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
>>     at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
>>     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
>>     ... 7 more
>> Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + '/10.127.106.54:33564' has failed. This might indicate that the remote task manager has been lost.
>>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
>>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>>     ... 1 more
>> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: /10.127.106.54:33564
>>
>>
>> On Fri, May 29, 2020 at 12:43 PM Vijay Balakrishnan <bvijaykr@gmail.com>
>> wrote:
>>
>>> Thx, Xintong, for the detailed explanation of the memory fraction. I
>>> have increased the memory fraction now.
>>>
>>> As I increase the defaultParallelism, I keep getting this error:
>>>
>>> org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition e312b2db4d1d0c65224664f620d06f7d@c3a4fb1e56a0996d9f2ff86dac6e483f not reachable.
>>>     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
>>>     at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
>>>     at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.io.IOException: Connecting the channel failed: Connecting to remote task manager + '/10.9.239.218:45544' has failed. This might indicate that the remote task manager has been lost.
>>>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
>>>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
>>>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
>>>     at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
>>>     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
>>>     ... 7 more
>>> Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + '/10.9.239.218:45544' has failed. This might indicate that the remote task manager has been lost.
>>>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
>>>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>>>     ... 1 more
>>> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: /10.9.239.218:45544
>>>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
>>>     at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
>>>     ... 6 more
>>> Caused by: java.net.ConnectException: Connection timed out
>>>     ... 10 more
>>>
>>>
>>> On Wed, May 27, 2020 at 7:14 PM Xintong Song <tonysong820@gmail.com>
>>> wrote:
>>>
>>>> Ah, I guess I had misunderstood what you meant.
>>>>
>>>>> Below 18000 tasks, the Flink Job is able to start up.
>>>>> Even though I increased the number of slots, it still works when 312
>>>>> slots are being used.
>>>>>
>>>> When you said "it still works", I thought you meant that you had
>>>> increased the parallelism but the job was still executed as if the
>>>> parallelism had not been increased. From your latest reply, it seems the
>>>> job's parallelism is indeed increased, but then it runs into failures.
>>>>
>>>> The reason you run into the "Insufficient number of network buffers"
>>>> exception is that more tasks in your job mean more inter-task data
>>>> transmission channels, and thus more memory for network buffers.
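
To make that scaling concrete, here is a rough, hypothetical estimate (not Flink code) of the input-side buffers one TaskManager needs for a single all-to-all edge such as a `keyBy`. It assumes credit-based flow control with 2 exclusive buffers per remote input channel (my understanding of the `taskmanager.network.memory.buffers-per-channel` default) plus `taskmanager.network.memory.floating-buffers-per-gate` floating buffers per input gate (16 in a config quoted in this thread). Output-side buffers and any other edges in the job are ignored, so real requirements are higher; the upstream parallelisms are illustrative.

```java
/**
 * Hypothetical back-of-the-envelope estimate (not Flink code) of input-side
 * network buffers for ONE all-to-all edge, under credit-based flow control:
 * each remote input channel gets `buffersPerChannel` exclusive buffers and
 * each input gate gets `floatingPerGate` floating buffers.
 */
public class NetworkBufferEstimate {

    /** One downstream subtask reading from every one of `upstreamParallelism` producers. */
    static long perSubtask(int upstreamParallelism, int buffersPerChannel, int floatingPerGate) {
        return (long) upstreamParallelism * buffersPerChannel + floatingPerGate;
    }

    /** A TaskManager hosting `subtasksOnTm` such downstream subtasks (one per slot). */
    static long perTaskManager(int subtasksOnTm, int upstreamParallelism,
                               int buffersPerChannel, int floatingPerGate) {
        return subtasksOnTm * perSubtask(upstreamParallelism, buffersPerChannel, floatingPerGate);
    }

    public static void main(String[] args) {
        int slotsPerTm = 14;        // taskmanager.numberOfTaskSlots from this thread
        int buffersPerChannel = 2;  // ASSUMED default exclusive buffers per channel
        int floatingPerGate = 16;   // floating-buffers-per-gate from this thread
        for (int upstream : new int[] {110, 220, 440}) {
            long buffers = perTaskManager(slotsPerTm, upstream, buffersPerChannel, floatingPerGate);
            System.out.printf("upstream parallelism %d -> %d buffers (~%d MB at 32 KB each)%n",
                    upstream, buffers, buffers * 32 / 1024);
        }
    }
}
```

Under these assumptions, raising the upstream parallelism from 110 to 440 grows the per-TaskManager need for this one edge from 3304 to 12544 buffers, while the TaskManager's network memory pool stays fixed.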
>>>>
>>>> To increase the network memory size, the following configuration
>>>> options, as you already found, are related.
>>>>
>>>>    - taskmanager.network.memory.fraction
>>>>    - taskmanager.network.memory.max
>>>>    - taskmanager.network.memory.min
>>>>
>>>> Please be aware that `taskmanager.memory.task.off-heap.size` is not
>>>> related to network memory, and is only available in Flink 1.10 and above,
>>>> while you're using 1.9.1, judging from the screenshots.
>>>>
>>>> The network memory size is calculated as `min(max(some_total_value *
>>>> network_fraction, network_min), network_max)`. According to the error
>>>> message, your current network memory size is `85922 buffers *
>>>> 32KB/buffer = 2685MB`, smaller than your "max" (4gb). That means
>>>> increasing the "max" does not help in your case. It is the "fraction" that
>>>> you need to increase.
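
The sizing rule above can be sketched as follows. `networkBytes` simply mirrors the formula as quoted; the 10 GB used for `some_total_value` in `main` is a made-up placeholder (what Flink 1.9 actually applies the fraction to is not spelled out in this thread), and 32 KB is the buffer size from the error message.

```java
/**
 * Sketch of the quoted sizing rule (not Flink source):
 *   network = min(max(total * fraction, min), max)
 */
public class NetworkMemorySizing {

    static final long MB = 1024L * 1024L;
    static final long BUFFER_BYTES = 32L * 1024L; // 32768 bytes per buffer, per the error message

    static long networkBytes(long totalBytes, double fraction, long minBytes, long maxBytes) {
        long byFraction = (long) (totalBytes * fraction);
        return Math.min(Math.max(byFraction, minBytes), maxBytes);
    }

    public static void main(String[] args) {
        // Current pool according to the error message: 85922 buffers x 32 KB.
        long current = 85922L * BUFFER_BYTES;
        System.out.println("current network memory: " + current / MB + " MB"); // 2685 MB

        long total = 10L * 1024 * MB;  // HYPOTHETICAL base value, for illustration only
        long min = 500 * MB;           // taskmanager.network.memory.min: 500mb
        long max = 4L * 1024 * MB;     // taskmanager.network.memory.max: 4gb
        for (double fraction : new double[] {0.15, 0.30, 0.45}) {
            long net = networkBytes(total, fraction, min, max);
            System.out.printf("fraction %.2f -> %d MB (%d buffers)%n",
                    fraction, net / MB, net / BUFFER_BYTES);
        }
    }
}
```

With the placeholder 10 GB base and a 4 GB max, only the 0.45 fraction hits the cap; below the cap, raising `max` changes nothing and only the fraction (or min) moves the result, which matches the advice above.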
>>>>
>>>> Thank you~
>>>>
>>>> Xintong Song
>>>>
>>>>
>>>>
>>>> On Thu, May 28, 2020 at 9:30 AM Vijay Balakrishnan <bvijaykr@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Xintong,
>>>>> Looks like the issue is not fully resolved :( Attaching 2 screenshots
>>>>> of the memory consumption of 1 of the TaskManagers.
>>>>>
>>>>> To increase the off-heap direct memory that is being used up, do I change this:
>>>>>  taskmanager.memory.task.off-heap.size: 5gb
>>>>>
>>>>> I had increased the taskmanager.network.memory.max: 24gb
>>>>> which seems excessive.
>>>>>
>>>>> 1 of the errors I saw in the Flink logs:
>>>>>
>>>>> java.io.IOException: Insufficient number of network buffers: required
>>>>> 1, but only 0 available. The total number of network buffers is currently
>>>>> set to 85922 of 32768 bytes each. You can increase this number by setting
>>>>> the configuration keys 'taskmanager.network.memory.fraction',
>>>>> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
>>>>>     at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:281)
>>>>>     at org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:191)
>>>>>
>>>>> TIA,
>>>>>
>>>>>
>>>>> On Wed, May 27, 2020 at 9:06 AM Vijay Balakrishnan <bvijaykr@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks so much, Xintong, for guiding me through this. I looked at the
>>>>>> Flink logs to see the errors.
>>>>>> I had to change taskmanager.network.memory.max: 4gb
>>>>>> and akka.ask.timeout: 240s to increase the number of tasks.
>>>>>> Now I am able to increase the number of tasks, aka task vertices.
>>>>>>
>>>>>> taskmanager.network.memory.fraction: 0.15
>>>>>> taskmanager.network.memory.max: 4gb
>>>>>> taskmanager.network.memory.min: 500mb
>>>>>> akka.ask.timeout: 240s
>>>>>>
>>>>>> On Tue, May 26, 2020 at 8:42 PM Xintong Song <tonysong820@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Could you also explain how you set the parallelism when getting
>>>>>>> this execution plan?
>>>>>>> I'm asking because this JSON file only shows the resulting
>>>>>>> execution plan. It is not clear to me what is not working as expected
>>>>>>> in your case, e.g., you set the parallelism for an operator to 10 but
>>>>>>> the execution plan only shows 5.
>>>>>>>
>>>>>>> Thank you~
>>>>>>>
>>>>>>> Xintong Song
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, May 27, 2020 at 3:16 AM Vijay Balakrishnan <
>>>>>>> bvijaykr@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Xintong,
>>>>>>>> Thanks for the excellent clarification of tasks.
>>>>>>>>
>>>>>>>> The sample screenshot I attached above didn't reflect the slots
>>>>>>>> used or the task limit I was running into.
>>>>>>>>
>>>>>>>> I am attaching my execution plan here. Please let me know how I can
>>>>>>>> increase the number of tasks, aka parallelism. As I increase the
>>>>>>>> parallelism, I run into this bottleneck with the tasks.
>>>>>>>>
>>>>>>>> BTW - https://flink.apache.org/visualizer/ is a great start to
>>>>>>>> see this.
>>>>>>>> TIA,
>>>>>>>>
>>>>>>>> On Sun, May 24, 2020 at 7:52 PM Xintong Song <tonysong820@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>>> Increasing network memory buffers (fraction, min, max) seems to
>>>>>>>>>> increase tasks slightly.
>>>>>>>>>
>>>>>>>>> That's weird. I don't think the number of network memory buffers
>>>>>>>>> has anything to do with the number of tasks.
>>>>>>>>>
>>>>>>>>> Let me try to clarify a few things.
>>>>>>>>>
>>>>>>>>> Please be aware that how many tasks a Flink job has and how many
>>>>>>>>> slots a Flink cluster has are two different things.
>>>>>>>>> - The number of tasks is decided by your job's parallelism and
>>>>>>>>> topology. E.g., if your job graph has 3 vertices A, B and C, with
>>>>>>>>> parallelism 2, 3 and 4 respectively, then you would have 9 (2+3+4)
>>>>>>>>> tasks in total.
>>>>>>>>> - The number of slots is decided by the number of TMs and
>>>>>>>>> slots-per-TM.
>>>>>>>>> - For streaming jobs, you have to make sure the number of slots is
>>>>>>>>> enough for executing all your tasks. The number of slots needed for
>>>>>>>>> executing your job is by default the max parallelism of your job
>>>>>>>>> graph vertices. Taking the above example, you would need 4 slots,
>>>>>>>>> because it's the max among all the vertices' parallelisms (2, 3, 4).
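
The counting rules in the list above can be written down directly. This sketch assumes every vertex stays in the default slot-sharing group (so one slot can hold one subtask of each vertex); `TasksVsSlots` and its methods are hypothetical names, and 47 TaskManagers is just 705 / 15, an inference that assumes every TaskManager offers 15 slots.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper (not Flink code) for counting tasks and slots,
 * assuming all vertices share the default slot-sharing group.
 */
public class TasksVsSlots {

    /** Total tasks = sum of all vertex parallelisms. */
    static int totalTasks(Map<String, Integer> parallelism) {
        return parallelism.values().stream().mapToInt(Integer::intValue).sum();
    }

    /** Slots needed = max vertex parallelism (default slot sharing). */
    static int slotsNeeded(Map<String, Integer> parallelism) {
        return parallelism.values().stream().mapToInt(Integer::intValue).max().orElse(0);
    }

    /** Slots offered = number of TaskManagers x slots per TaskManager. */
    static int slotsAvailable(int taskManagers, int slotsPerTm) {
        return taskManagers * slotsPerTm;
    }

    public static void main(String[] args) {
        Map<String, Integer> job = new LinkedHashMap<>();
        job.put("A", 2);
        job.put("B", 3);
        job.put("C", 4);
        System.out.println("tasks = " + totalTasks(job) + ", slots needed = " + slotsNeeded(job));
        // prints: tasks = 9, slots needed = 4

        // Assumption: 47 TaskManagers x 15 slots each (705 / 15 = 47).
        System.out.println("slots available = " + slotsAvailable(47, 15));
        // prints: slots available = 705
    }
}
```

The same arithmetic explains the dashboard numbers discussed here: total tasks grow with the sum of all vertex parallelisms, while slots in use track only the largest one.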
>>>>>>>>>
>>>>>>>>> In your case, the screenshot shows that your job has 9621 tasks in
>>>>>>>>> total (not around 18000; the dark box shows total tasks while the
>>>>>>>>> green box shows running tasks), and 600 slots are in use (658 - 58),
>>>>>>>>> suggesting that the max parallelism of your job graph vertices is 600.
>>>>>>>>>
>>>>>>>>> If you want to increase the number of tasks, you should increase
>>>>>>>>> your job's parallelism. There are several ways to do that.
>>>>>>>>>
>>>>>>>>>    - In your job code (assuming you are using the DataStream API)
>>>>>>>>>       - Use `StreamExecutionEnvironment#setParallelism()` to set
>>>>>>>>>       parallelism for all operators.
>>>>>>>>>       - Use `SingleOutputStreamOperator#setParallelism()` to set
>>>>>>>>>       parallelism for a specific operator. (Only supported for
>>>>>>>>>       subclasses of `SingleOutputStreamOperator`.)
>>>>>>>>>    - When submitting your job, use `-p <parallelism>` as an
>>>>>>>>>    argument to the `flink run` command to set parallelism for all
>>>>>>>>>    operators.
>>>>>>>>>    - Set `parallelism.default` in your `flink-conf.yaml` to set
>>>>>>>>>    a default parallelism for your jobs. This will be used for jobs
>>>>>>>>>    that have not set parallelism with either of the above methods.
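
How these options interact can be modeled as a simple precedence rule. This toy model is not Flink code; it assumes the order described in the Flink documentation on parallel execution, as I understand it (operator level overrides the execution environment, which overrides the client's `-p`, which overrides the system-wide `parallelism.default`). `EffectiveParallelism.resolve` is a hypothetical name.

```java
import java.util.OptionalInt;

/**
 * Toy model (not Flink code) of which setting wins for one operator's
 * parallelism, assuming the precedence:
 * operator setParallelism() > env setParallelism() > flink run -p > parallelism.default.
 */
public class EffectiveParallelism {

    static int resolve(OptionalInt operatorLevel,
                       OptionalInt environmentLevel,
                       OptionalInt clientP,
                       int parallelismDefault) {
        if (operatorLevel.isPresent()) {
            return operatorLevel.getAsInt();      // e.g. stream.map(...).setParallelism(n)
        }
        if (environmentLevel.isPresent()) {
            return environmentLevel.getAsInt();   // env.setParallelism(n) in job code
        }
        if (clientP.isPresent()) {
            return clientP.getAsInt();            // flink run -p n
        }
        return parallelismDefault;                // parallelism.default in flink-conf.yaml
    }

    public static void main(String[] args) {
        OptionalInt none = OptionalInt.empty();
        // Only parallelism.default is set (1, as in a config quoted in this thread).
        System.out.println(resolve(none, none, none, 1));                                 // 1
        // flink run -p 600 overrides parallelism.default.
        System.out.println(resolve(none, none, OptionalInt.of(600), 1));                  // 600
        // An explicit operator-level setting wins over everything else.
        System.out.println(resolve(OptionalInt.of(440), none, OptionalInt.of(600), 1));   // 440
    }
}
```

One consequence under this model: if the job code calls `env.setParallelism()`, changing `-p` or `parallelism.default` will not raise the task count.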
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thank you~
>>>>>>>>>
>>>>>>>>> Xintong Song
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sat, May 23, 2020 at 1:11 AM Vijay Balakrishnan <
>>>>>>>>> bvijaykr@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Xintong,
>>>>>>>>>> Thx for your reply. Increasing network memory buffers
>>>>>>>>>> (fraction, min, max) seems to increase tasks slightly.
>>>>>>>>>>
>>>>>>>>>> Streaming job
>>>>>>>>>> Standalone
>>>>>>>>>>
>>>>>>>>>> Vijay
>>>>>>>>>>
>>>>>>>>>> On Fri, May 22, 2020 at 2:49 AM Xintong Song <
>>>>>>>>>> tonysong820@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Vijay,
>>>>>>>>>>>
>>>>>>>>>>> I don't think your problem is related to the number of open
>>>>>>>>>>> files. The parallelism of your job is decided before it actually
>>>>>>>>>>> tries to open the files. And if the OS limit for open files is
>>>>>>>>>>> reached, you should see a job execution failure, instead of a
>>>>>>>>>>> successful execution with a lower parallelism.
>>>>>>>>>>>
>>>>>>>>>>> Could you share some more information about your use case?
>>>>>>>>>>>
>>>>>>>>>>>    - What kind of job are you executing? Is it a streaming or
>>>>>>>>>>>    batch processing job?
>>>>>>>>>>>    - Which Flink deployment do you use? Standalone? Yarn?
>>>>>>>>>>>    - It would be helpful if you could share the Flink logs.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thank you~
>>>>>>>>>>>
>>>>>>>>>>> Xintong Song
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, May 20, 2020 at 11:50 PM Vijay Balakrishnan <bvijaykr@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> I have increased the number of available slots, but the job is
>>>>>>>>>>>> not using all of them and runs into this approximate 18000-task
>>>>>>>>>>>> limit.
>>>>>>>>>>>> Looking into the source code, it seems to be opening a file -
>>>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java#L203
>>>>>>>>>>>> So, do I have to tune the ulimit or something similar at the
>>>>>>>>>>>> Ubuntu OS level to increase the number of tasks available? What I
>>>>>>>>>>>> am confused about is that the ulimit is per machine, but the
>>>>>>>>>>>> ExecutionGraph spans many machines. Please pardon my ignorance
>>>>>>>>>>>> here. Does the number of tasks equate to the number of open
>>>>>>>>>>>> files? I am using 15 slots per TaskManager on AWS m5.4xlarge,
>>>>>>>>>>>> which has 16 vCPUs.
>>>>>>>>>>>>
>>>>>>>>>>>> TIA.
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, May 19, 2020 at 3:22 PM Vijay Balakrishnan <bvijaykr@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Flink Dashboard UI seems to show a hard limit of around 18000
>>>>>>>>>>>>> in the Tasks column on an Ubuntu Linux box.
>>>>>>>>>>>>> I kept increasing the number of slots per task manager to 15,
>>>>>>>>>>>>> and the number of slots increased to 705, but the number of
>>>>>>>>>>>>> tasks stayed at around 18000. Below 18000 tasks, the Flink job
>>>>>>>>>>>>> is able to start up.
>>>>>>>>>>>>> Even though I increased the number of slots, it still works
>>>>>>>>>>>>> when 312 slots are being used.
>>>>>>>>>>>>>
>>>>>>>>>>>>> taskmanager.numberOfTaskSlots: 15
>>>>>>>>>>>>>
>>>>>>>>>>>>> What knob can I tune to increase the number of tasks?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Pls find attached a screenshot of the Flink Dashboard UI.
>>>>>>>>>>>>>
>>>>>>>>>>>>> TIA,
>>>>>>>>>>>>>
>>>>>>>>>>>>>
