flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: FlinkKafkaConsumer010 - Memory Issue
Date Wed, 19 Jul 2017 19:14:24 GMT
Hi,

Gordon (in CC) knows the details of Flink's Kafka consumer.
He might know how to solve this issue.

Best, Fabian

2017-07-19 20:23 GMT+02:00 PedroMrChaves <pedro.mr.chaves@gmail.com>:

> Hello,
>
> Whenever I submit a job to Flink that retrieves data from Kafka, the memory
> consumption increases continuously. I've raised the max heap from 2 GB to
> 4 GB and even 6 GB, but memory consumption still reaches the limit.
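>
> For reference, this is how I understand the heap setting I was changing
> (a flink-conf.yaml sketch; the 1.2 key is taskmanager.heap.mb, and the
> values are the ones I tried):
>
>     # conf/flink-conf.yaml -- TaskManager JVM heap, in MB
>     taskmanager.heap.mb: 6144    # also tried 2048 and 4096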
>
> An example of a simple job that shows this behavior is depicted below.
>
>     /*
>      * Execution Environment Setup
>      */
>     final StreamExecutionEnvironment environment =
>             getGlobalJobConfiguration(configDir, configurations);
>
>     /**
>      * Collect event data from Kafka
>      */
>     DataStreamSource<String> s = environment.addSource(new FlinkKafkaConsumer010<String>(
>             configurations.get(ConfigKeys.KAFKA_INPUT_TOPIC),
>             new SimpleStringSchema(),
>             getKafkaConfiguration(configurations)));
>
>     // Drop every record; the memory growth shows up even with no downstream work.
>     s.filter(new FilterFunction<String>() {
>         @Override
>         public boolean filter(String value) throws Exception {
>             return false;
>         }
>     }).print();
>
>     private static Properties getKafkaConfiguration(ParameterTool configurations) {
>         Properties properties = new Properties();
>         properties.setProperty("bootstrap.servers",
>                 configurations.get(ConfigKeys.KAFKA_HOSTS));
>         // A fresh, random group id per submission, so offsets are never reused across runs.
>         properties.put("group.id", "flink-consumer-" + UUID.randomUUID().toString());
>         properties.put("value.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         properties.put("security.protocol",
>                 configurations.get(ConfigKeys.KAFKA_SECURITY_PROTOCOL));
>         properties.put("ssl.truststore.location",
>                 configurations.get(ConfigKeys.KAFKA_SSL_TRUSTSTORE_LOCATION));
>         properties.put("ssl.truststore.password",
>                 configurations.get(ConfigKeys.KAFKA_SSL_TRUSTSTORE_PASSWORD));
>         properties.put("ssl.keystore.location",
>                 configurations.get(ConfigKeys.KAFKA_SSL_KEYSTORE_LOCATION));
>         properties.put("ssl.keystore.password",
>                 configurations.get(ConfigKeys.KAFKA_SSL_KEYSTORE_PASSWORD));
>         return properties;
>     }
>
>
> Moreover, when I stop the job, the task manager does not close the Kafka
> connection and the memory stays allocated. To free it, I have to kill
> the task manager process.
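>
> For what it's worth, a rough way to check whether the retained memory sits
> on the TaskManager's JVM heap (stock JDK tools; <tm-pid> stands for the
> TaskManager's process id):
>
>     # heap occupancy and GC activity, sampled every 5 s
>     jstat -gcutil <tm-pid> 5000
>
>     # histogram of live objects, to see what is holding the heap
>     jmap -histo:live <tm-pid> | head -n 25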
>
> My Flink version: 1.2.1
> Kafka consumer: 010
> Kafka version: 2_11_0.10.1.0-2
>
> I've activated the taskmanager.debug.memory.startLogThread property to log
> memory usage every 5 seconds and attached the resulting log.
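>
> In case it helps reproduce, these are the flink-conf.yaml entries as I
> understand them (the interval key and its value are my reading of the 1.2
> docs, so treat them as an assumption):
>
>     # conf/flink-conf.yaml -- periodic memory usage logging
>     taskmanager.debug.memory.startLogThread: true
>     taskmanager.debug.memory.logIntervalMs: 5000    # log every 5 seconds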
>
> The output of free -m before submitting the job:
>
>                total        used        free      shared  buff/cache   available
>     Mem:       15817         245       14755          24         816       15121
>     Swap:          0           0           0
>
> After the job has been running for about 5 minutes, free -m shows:
>
>                total        used        free      shared  buff/cache   available
>     Mem:       15817        9819        5150          24         847        5547
>     Swap:          0           0           0
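>
> To tie the growth to the TaskManager JVM itself rather than the OS page
> cache, a simple check is the resident set size of the process (a sketch;
> the grep pattern is my guess at how the TaskManager shows up in ps):
>
>     ps -C java -o pid,rss,args | grep -i taskmanager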
>
> taskmanager.log
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14342/taskmanager.log>
>
> -----
> Best Regards,
> Pedro Chaves
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkKafkaConsumer010-Memory-Issue-tp14342.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>
