flink-user mailing list archives

From Jeff Henrikson <jehenri...@gmail.com>
Subject Trouble with large state
Date Wed, 17 Jun 2020 17:46:54 GMT
Hello Flink users,

I have an application of around 10 enrichment joins.  All events are 
read from Kafka and have event timestamps.  The joins are built using 
.coGroup with a global window, triggering on every event, plus a 
custom evictor that drops records once a newer record for the same ID 
has been processed.  Deletes are represented by empty events with 
timestamp and ID (tombstones). That way, we can drop records when 
business logic dictates, as opposed to when a maximum retention has been 
attained.  The application runs RocksDBStateBackend, on Kubernetes on 
AWS with local SSDs.
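
A minimal sketch of the retention rule described above, written in plain Python rather than against the Flink API. The names `Event` and `apply_events` are hypothetical, and the sketch assumes that a tombstone is an event whose payload is `None`, that "newer" means a greater or equal event timestamp, and that stale events arriving late are ignored. The real evictor may differ on all of these points.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Optional


@dataclass
class Event:
    id: str
    timestamp: int
    payload: Optional[dict]  # assumption: None marks a tombstone (delete)


def apply_events(events: Iterable[Event]) -> Dict[str, Event]:
    """Keep only the newest record per ID; a tombstone removes the ID."""
    latest: Dict[str, Event] = {}
    for ev in events:
        current = latest.get(ev.id)
        if current is not None and ev.timestamp < current.timestamp:
            continue  # older than what is already held: ignore it
        if ev.payload is None:
            latest.pop(ev.id, None)  # tombstone: drop the record for this ID
        else:
            latest[ev.id] = ev  # newer record replaces (evicts) the old one
    return latest


if __name__ == "__main__":
    state = apply_events([
        Event("a", 1, {"v": 1}),
        Event("b", 2, {"v": 2}),
        Event("a", 3, {"v": 3}),  # replaces the first record for "a"
        Event("b", 4, None),      # tombstone deletes "b"
    ])
    print(sorted(state))
```

Under this rule the retained state is bounded by the number of live IDs rather than by a retention period.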

Unit tests show that the joins produce expected results.  On an 8-node 
cluster, watermark output progress seems to indicate I should be able to 
bootstrap my state of around 500GB in around 1 day.  I am able to save 
and restore savepoints for the first half an hour of run time.
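
As a rough back-of-the-envelope check on those figures, the sketch below assumes that state grows linearly during the bootstrap and is spread evenly across the 8 nodes. Neither assumption has been verified, and the variable names are illustrative only.

```python
# Figures taken from the description above; decimal units (1 GB = 1000 MB).
total_state_gb = 500
bootstrap_hours = 24
nodes = 8

# Assumption: linear state growth over the whole bootstrap.
growth_gb_per_hour = total_state_gb / bootstrap_hours

# Assumption: state is distributed evenly across the nodes.
per_node_mb_per_s = total_state_gb * 1000 / nodes / (bootstrap_hours * 3600)

# Run time at which state would reach the ~50GB mark under that growth rate.
hours_to_50_gb = 50 / growth_gb_per_hour

print(f"{growth_gb_per_hour:.1f} GB/h, "
      f"{per_node_mb_per_s:.2f} MB/s per node, "
      f"{hours_to_50_gb:.1f} h to 50 GB")
```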

My current trouble is that after around 50GB of state, I stop being able 
to reliably take checkpoints or savepoints.  Some time after that, I 
start getting a variety of failures where the first suspicious log event 
is a generic cluster connectivity error, such as:

     1) java.io.IOException: Connecting the channel failed: Connecting
     to remote task manager + '/' has failed. This
     might indicate that the remote task manager has been lost.

     2) org.apache.flink.runtime.io.network.netty.exception
     .RemoteTransportException: Connection unexpectedly closed by remote
     task manager 'null'. This might indicate that the remote task
     manager was lost.

     3) Association with remote system
     [akka.tcp://flink@] has failed, address is now
     gated for [50] ms. Reason: [Association failed with
     [akka.tcp://flink@]] Caused by:
     [java.net.NoRouteToHostException: No route to host]

I don't see any obvious out of memory errors on the TaskManager UI.

Adding nodes to the cluster does not seem to increase the maximum 
savable state size.

I could enable HA, but for the time being I have been leaving it out to 
avoid the possibility of masking deterministic faults.

Below are my configurations.

Thanks in advance for any advice.


Jeff Henrikson

Flink version: 1.10

Configuration set via code:
     setDbStoragePath points to a local NVMe SSD

Configuration in flink-conf.yaml:

     jobmanager.rpc.address: localhost
     jobmanager.rpc.port: 6123
     jobmanager.heap.size: 28000m
     taskmanager.memory.process.size: 28000m
     taskmanager.memory.jvm-metaspace.size: 512m
     taskmanager.numberOfTaskSlots: 1
     parallelism.default: 1
     jobmanager.execution.failover-strategy: full

     cluster.evenly-spread-out-slots: false

     taskmanager.memory.network.fraction: 0.2           # default 0.1
     taskmanager.memory.framework.off-heap.size: 2GB
     taskmanager.memory.task.off-heap.size: 2GB
     taskmanager.network.memory.buffers-per-channel: 32 # default 2
     taskmanager.memory.managed.fraction: 0.4           # docs say default 0.1, but something seems to set 0.4
     taskmanager.memory.task.off-heap.size: 2048MB      # default 128M

     state.backend.fs.memory-threshold: 1048576
     state.backend.fs.write-buffer-size: 10240000
     state.backend.local-recovery: true
     state.backend.rocksdb.writebuffer.size: 64MB
     state.backend.rocksdb.writebuffer.count: 8
     state.backend.rocksdb.writebuffer.number-to-merge: 4
     state.backend.rocksdb.timer-service.factory: heap
     state.backend.rocksdb.block.cache-size: 64000000 # default 8MB
     state.backend.rocksdb.write-batch-size: 16000000 # default 2MB

     web.checkpoints.history: 250
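
For orientation, here is a rough sketch of how the TaskManager memory settings above might break down under the Flink 1.10 memory model. Every value marked "assumed default" is not set in the configuration above and is an assumption about the 1.10 defaults (JVM overhead fraction 0.1 capped at 1g, network memory capped at 1g, framework heap 128m); these should be checked against the 1.10 configuration reference before relying on the numbers.

```python
# All sizes in MiB. Values not marked "assumed default" come from the
# flink-conf.yaml listing above.
process_size = 28000          # taskmanager.memory.process.size
metaspace = 512               # taskmanager.memory.jvm-metaspace.size

# Assumed default: JVM overhead = 0.1 of process size, clamped to [192, 1024].
jvm_overhead = min(max(0.1 * process_size, 192), 1024)

total_flink = process_size - metaspace - jvm_overhead

managed = 0.4 * total_flink   # taskmanager.memory.managed.fraction

# taskmanager.memory.network.fraction is 0.2, but the assumed default
# min/max of 64 / 1024 MiB would clamp the result.
network_requested = 0.2 * total_flink
network = min(max(network_requested, 64), 1024)

framework_heap = 128          # assumed default
framework_off_heap = 2048     # taskmanager.memory.framework.off-heap.size
task_off_heap = 2048          # taskmanager.memory.task.off-heap.size

# Whatever remains of total Flink memory is left for the task heap.
task_heap = total_flink - (framework_heap + framework_off_heap
                           + task_off_heap + managed + network)

for name, value in [("total flink", total_flink), ("managed", managed),
                    ("network (requested)", network_requested),
                    ("network (after clamp)", network),
                    ("task heap", task_heap)]:
    print(f"{name:>22}: {value:9.1f} MiB")
```

If the assumed network maximum applies, the 0.2 network fraction would have no effect unless the corresponding maximum is raised as well.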
