flink-issues mailing list archives

From "Scott Kidder (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4536) Possible thread leak in Task Manager
Date Thu, 01 Sep 2016 20:12:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456481#comment-15456481 ]

Scott Kidder commented on FLINK-4536:
-------------------------------------

On second thought, I'll leave the issue open and allow the Flink team to decide whether to
update the version of the AWS SDK and KCL referenced in the Kinesis connector pom.xml.

> Possible thread leak in Task Manager
> ------------------------------------
>
>                 Key: FLINK-4536
>                 URL: https://issues.apache.org/jira/browse/FLINK-4536
>             Project: Flink
>          Issue Type: Bug
>          Components: TaskManager
>    Affects Versions: 1.1.0
>            Reporter: Scott Kidder
>
> Running Flink release 1.1.1 commit 61bfb36 in the following configuration:
> Job Manager
> 2 x Task Manager (2 CPU cores on each Task Manager)
> I've also updated the Kinesis source to use the latest AWS Java SDK, release 1.11.29.
> I've got a single Flink application using all 4 slots. It consumes from a Kinesis stream
> configured with 2 shards. I've limited the Kinesis source to a parallelism of 2 as a workaround
> for FLINK-4341.
> Occasionally the Kinesis consumer fails because of provisioned-throughput limits being
> hit. The application automatically restarts, and resumes processing with the checkpoint stored
> on the Job Manager with no outward indication of problems.
> I recently enabled the StatsD metrics reporter in Flink and noticed that the number of
> threads running on each Task Manager increases by about 20 threads each time the application
> restarts. Over the course of a day the application might hit provisioned-throughput limits
> 20 times or so (this is not fully production yet, so hitting these limits is acceptable for
> now). But the number of threads continues to grow unbounded with no increase in workload on
> the Task Managers.
> The following link includes charts for the overall Flink cluster performance & Task
> Manager JVM threads over the course of 12 hours:
> http://imgur.com/a/K59hz
> Each decrease and subsequent spike in threads corresponds to the job being restarted
> due to an AWS Kinesis source error.
> Here are the logs from one of the Task Manager instances on startup:
> {code}
> 2016-08-30 14:52:50,438 WARN  org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 2016-08-30 14:52:50,540 INFO  org.apache.flink.runtime.taskmanager.TaskManager - --------------------------------------------------------------------------------
> 2016-08-30 14:52:50,540 INFO  org.apache.flink.runtime.taskmanager.TaskManager -  Starting TaskManager (Version: 1.1.1, Rev:61bfb36, Date:09.08.2016 @ 12:09:08 UTC)
> 2016-08-30 14:52:50,540 INFO  org.apache.flink.runtime.taskmanager.TaskManager -  Current user: root
> 2016-08-30 14:52:50,541 INFO  org.apache.flink.runtime.taskmanager.TaskManager -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.92-b14
> 2016-08-30 14:52:50,541 INFO  org.apache.flink.runtime.taskmanager.TaskManager -  Maximum heap size: 2048 MiBytes
> 2016-08-30 14:52:50,541 INFO  org.apache.flink.runtime.taskmanager.TaskManager -  JAVA_HOME: /usr/lib/jvm/java-1.8-openjdk/jre
> 2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager -  Hadoop version: 2.7.2
> 2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager -  JVM Options:
> 2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager -     -XX:+UseG1GC
> 2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager -     -Xms2048M
> 2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager -     -Xmx2048M
> 2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager -     -XX:MaxDirectMemorySize=8388607T
> 2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager -     -Dlog.file=/usr/local/flink-1.1.1/log/flink--taskmanager-1-ip-10-55-2-218.log
> 2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager -     -Dlog4j.configuration=file:/usr/local/flink-1.1.1/conf/log4j.properties
> 2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager -     -Dlogback.configurationFile=file:/usr/local/flink-1.1.1/conf/logback.xml
> 2016-08-30 14:52:50,544 INFO  org.apache.flink.runtime.taskmanager.TaskManager -  Program Arguments:
> 2016-08-30 14:52:50,544 INFO  org.apache.flink.runtime.taskmanager.TaskManager -     --configDir
> 2016-08-30 14:52:50,544 INFO  org.apache.flink.runtime.taskmanager.TaskManager -     /usr/local/flink-1.1.1/conf
> 2016-08-30 14:52:50,544 INFO  org.apache.flink.runtime.taskmanager.TaskManager -  Classpath: /usr/local/flink-1.1.1/lib/flink-dist_2.11-1.1.1.jar:/usr/local/flink-1.1.1/lib/flink-metrics-statsd-1.1.1.jar:/usr/local/flink-1.1.1/lib/flink-python_2.11-1.1.1.jar:/usr/local/flink-1.1.1/lib/log4j-1.2.17.jar:/usr/local/flink-1.1.1/lib/slf4j-log4j12-1.7.7.jar:::
> 2016-08-30 14:52:50,544 INFO  org.apache.flink.runtime.taskmanager.TaskManager - --------------------------------------------------------------------------------
> 2016-08-30 14:52:50,544 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Registered UNIX signal handlers for [TERM, HUP, INT]
> 2016-08-30 14:52:50,547 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Maximum number of open file descriptors is 1048576
> 2016-08-30 14:52:50,565 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Loading configuration from /usr/local/flink-1.1.1/conf
> 2016-08-30 14:52:50,610 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Security is not enabled. Starting non-authenticated TaskManager.
> 2016-08-30 14:52:50,610 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Using configured hostname/address for TaskManager: 10.55.2.218
> 2016-08-30 14:52:50,611 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager
> 2016-08-30 14:52:50,615 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor system at 10.55.2.218:0
> 2016-08-30 14:52:50,956 INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
> 2016-08-30 14:52:51,005 INFO  Remoting - Starting remoting
> 2016-08-30 14:52:51,159 INFO  Remoting - Remoting started; listening on addresses :[akka.tcp://flink@10.55.2.218:44007]
> 2016-08-30 14:52:51,163 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor
> 2016-08-30 14:52:51,177 INFO  org.apache.flink.runtime.io.network.netty.NettyConfig - NettyConfig [server address: /10.55.2.218, server port: 36007, memory segment size (bytes): 32768, transport type: NIO, number of server threads: 2 (manual), number of client threads: 2 (manual), server connect backlog: 0 (use Netty's default), client connect timeout (sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)]
> 2016-08-30 14:52:51,179 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Messages between TaskManager and JobManager have a max timeout of 10000 milliseconds
> 2016-08-30 14:52:51,183 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Temporary file directory '/tmp': total 9 GB, usable 8 GB (88.89% usable)
> 2016-08-30 14:52:51,227 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool - Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768).
> 2016-08-30 14:52:51,287 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Limiting managed memory to 0.7 of the currently free heap space (1377 MB), memory will be allocated lazily.
> 2016-08-30 14:52:51,309 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager uses directory /tmp/flink-io-c000b71a-d793-4f4c-90c4-0c0808154219 for spill files.
> 2016-08-30 14:52:51,325 INFO  org.apache.flink.runtime.filecache.FileCache - User file cache uses directory /tmp/flink-dist-cache-95fd32d3-a629-42ce-816e-ca8a63e13c7d
> 2016-08-30 14:52:51,529 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor at akka://flink/user/taskmanager#496823118.
> 2016-08-30 14:52:51,529 INFO  org.apache.flink.runtime.taskmanager.TaskManager - TaskManager data connection information: ip-10-55-2-218.ec2.internal (dataPort=36007)
> 2016-08-30 14:52:51,530 INFO  org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 2 task slot(s).
> 2016-08-30 14:52:51,531 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 93/2048/2048 MB, NON HEAP: 31/32/-1 MB (used/committed/max)]
> 2016-08-30 14:52:51,534 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp://flink@10.55.2.212:6123/user/jobmanager (attempt 1, timeout: 500 milliseconds)
> 2016-08-30 14:52:51,690 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Successful registration at JobManager (akka.tcp://flink@10.55.2.212:6123/user/jobmanager), starting network stack and library cache.
> 2016-08-30 14:52:51,841 INFO  org.apache.flink.runtime.io.network.netty.NettyClient - Successful initialization (took 28 ms).
> 2016-08-30 14:52:51,871 INFO  org.apache.flink.runtime.io.network.netty.NettyServer - Successful initialization (took 30 ms). Listening on SocketAddress /10.55.2.218:36007.
> 2016-08-30 14:52:51,872 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Determined BLOB server address to be /10.55.2.212:55892. Starting BLOB cache.
> 2016-08-30 14:52:51,874 INFO  org.apache.flink.runtime.blob.BlobCache - Created BLOB cache storage directory /tmp/blobStore-81d7d33f-a815-4f0b-9131-d3ba5a256d1b
> 2016-08-30 14:52:51,889 INFO  org.apache.flink.metrics.statsd.StatsDReporter - Starting StatsDReporter to send metric reports to localhost/127.0.0.1:8125
> 2016-08-30 14:52:51,891 INFO  org.apache.flink.runtime.metrics.MetricRegistry - Periodically reporting metrics in intervals of 10 SECONDS for reporter statsd of type org.apache.flink.metrics.statsd.StatsDReporter.
> {code}
> The Kinesis reader errors look like:
> {code}
> 2016-08-30 17:23:43,353 WARN  org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy - Got ProvisionedThroughputExceededException. Backing off for 53 millis.
> 2016-08-30 17:23:43,725 WARN  org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy - Got ProvisionedThroughputExceededException. Backing off for 597 millis.
> 2016-08-30 17:23:44,805 WARN  org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy - Got ProvisionedThroughputExceededException. Backing off for 538 millis.
> 2016-08-30 17:23:45,344 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Shutting down the shard consumer threads of subtask 0 ...
> 2016-08-30 17:23:45,394 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
> 2016-08-30 17:23:45,395 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Shutting down the shard consumer threads of subtask 0 ...
> 2016-08-30 17:23:45,395 ERROR org.apache.flink.runtime.taskmanager.Task - Task execution failed. 
> java.lang.RuntimeException: Rate Exceeded for getRecords operation - all 3retryattempts returned ProvisionedThroughputExceededException.
> 	at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:204)
> 	at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:167)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 2016-08-30 17:23:45,396 INFO  org.apache.flink.runtime.taskmanager.Task - Source: Kinesis (1/2) switched to FAILED with exception.
> java.lang.RuntimeException: Rate Exceeded for getRecords operation - all 3retryattempts returned ProvisionedThroughputExceededException.
> 	at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:204)
> 	at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:167)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 2016-08-30 17:23:45,396 INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Kinesis (1/2)
> 2016-08-30 17:23:45,396 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
> 2016-08-30 17:23:45,396 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FAILED to JobManager for task Source: Kinesis (c1bf869b9d8506cea67b1317b21c014e)
> 2016-08-30 17:23:45,396 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
> 2016-08-30 17:23:45,402 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Discarding the results produced by task execution c1bf869b9d8506cea67b1317b21c014e
> 2016-08-30 17:23:45,403 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (1/4)
> 2016-08-30 17:23:45,403 INFO  org.apache.flink.runtime.taskmanager.Task - Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (1/4) switched to CANCELING
> 2016-08-30 17:23:45,403 INFO  org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (1/4) (70809fea05a6f67ffd7672bbe5b9643a).
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (3/4)
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task - Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (3/4) switched to CANCELING
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Parse -> Timestamp and Watermark -> (Map, Map, Map, Map) (3/4) (a70eafedd01af499a4ee066108e92ae8).
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (1/4)
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task - Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (1/4) switched to CANCELING
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (1/4) (b71730eb5cd484cde0eb8332e69d443e).
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task - Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (2/4) switched to CANCELING
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Fold: property_id, video_id -> 1-minute Fixed-Window Percentile Aggregation -> Sink: InfluxDB (2/4) (6115f675c4d36004d4c885c9868d6b61).
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4)
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task - Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4) switched to CANCELING
> 2016-08-30 17:23:45,405 INFO  org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4) (d9cb8238533083dc2396e90cae3e702e).
> 2016-08-30 17:23:45,406 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
> 2016-08-30 17:23:45,406 INFO  org.apache.flink.runtime.taskmanager.Task - Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4) switched to CANCELING
> 2016-08-30 17:23:45,406 INFO  org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Fold: property_id, video_id -> 5-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4) (311ffa604444b33985572fe99170f405).
> 2016-08-30 17:23:45,406 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
> 2016-08-30 17:23:45,406 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Fold: property_id, video_id -> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4)
> 2016-08-30 17:23:45,420 INFO  org.apache.flink.runtime.taskmanager.Task - Fold: property_id, video_id -> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4) switched to CANCELING
> 2016-08-30 17:23:45,420 INFO  org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Fold: property_id, video_id -> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (1/4) (556a4e6d2a4b7686539b50ad6b5f0d0d).
> 2016-08-30 17:23:45,420 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
> {code}
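
One way to narrow down a report like the one quoted above is to group the live threads in a JVM by name and compare the counts before and after a job restart. The following is only a minimal sketch, not part of Flink or the original report: the class name `ThreadNameHistogram` and its methods are hypothetical, and it only sees threads in the JVM it runs in, so it would need to be called from code running inside the Task Manager (for example from a user function). It collapses digit runs in thread names so that pooled threads with different numbers fall into the same bucket.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration; not part of Flink or the AWS SDK.
public class ThreadNameHistogram {

    // Replace every run of digits with '#', so that names such as
    // "pool-12-thread-3" and "pool-7-thread-1" map to the same key.
    static String normalize(String threadName) {
        return threadName.replaceAll("\\d+", "#");
    }

    // Count the live threads of the current JVM, grouped by normalized name.
    static Map<String, Integer> snapshot() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            counts.merge(normalize(t.getName()), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Print one line per thread-name group: count, tab, normalized name.
        snapshot().forEach((name, n) -> System.out.println(n + "\t" + name));
    }
}
```

A group whose count rises by a fixed amount after every restart would be the first candidate for the leak. Where attaching code to the Task Manager is not practical, a thread dump taken with the JDK's `jstack <pid>` before and after a restart gives the same thread names for a manual comparison.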



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
