flink-user mailing list archives

From Kien Truong <duckientru...@gmail.com>
Subject Re: FlinkKafkaConsumer010 - Memory Issue
Date Sat, 22 Jul 2017 01:55:12 GMT
Hi,

From the log, it doesn't seem that the task manager is using a lot of memory.

Can you post the output of top?
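
For example, something like this (the pgrep pattern is just a guess at your setup):

    top -b -n 1 -p $(pgrep -f TaskManager)

The RES column shows the resident memory of the JVM process, including off-heap
and native allocations that the heap statistics in the log don't cover.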

Regards,

Kien


On 7/20/2017 1:23 AM, PedroMrChaves wrote:
> Hello,
>
> Whenever I submit a job to Flink that retrieves data from Kafka, the memory
> consumption continuously increases. I've increased the max heap memory from
> 2 GB to 4 GB, and even to 6 GB, but the memory consumption keeps reaching the
> limit.
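>
> For reference, a minimal sketch of how the heap was raised in flink-conf.yaml
> (assuming a standard standalone setup), e.g. for the 6 GB case:
>
>     taskmanager.heap.mb: 6144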
>
> An example of a simple job that shows this behavior is depicted below.
>
>         /*
>          * Execution Environment Setup
>          */
>         final StreamExecutionEnvironment environment =
>                 getGlobalJobConfiguration(configDir, configurations);
>
>         /**
>          * Collect event data from Kafka
>          */
>         DataStreamSource<String> s = environment.addSource(new FlinkKafkaConsumer010<String>(
>                 configurations.get(ConfigKeys.KAFKA_INPUT_TOPIC),
>                 new SimpleStringSchema(),
>                 getKafkaConfiguration(configurations)));
>
>         s.filter(new FilterFunction<String>() {
>             @Override
>             public boolean filter(String value) throws Exception {
>                 return false;
>             }
>         }).print();
>
>     private static Properties getKafkaConfiguration(ParameterTool configurations) {
>         Properties properties = new Properties();
>         properties.setProperty("bootstrap.servers", configurations.get(ConfigKeys.KAFKA_HOSTS));
>         properties.put("group.id", "flink-consumer-" + UUID.randomUUID().toString());
>         properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>         properties.put("security.protocol", configurations.get(ConfigKeys.KAFKA_SECURITY_PROTOCOL));
>         properties.put("ssl.truststore.location", configurations.get(ConfigKeys.KAFKA_SSL_TRUSTSTORE_LOCATION));
>         properties.put("ssl.truststore.password", configurations.get(ConfigKeys.KAFKA_SSL_TRUSTSTORE_PASSWORD));
>         properties.put("ssl.keystore.location", configurations.get(ConfigKeys.KAFKA_SSL_KEYSTORE_LOCATION));
>         properties.put("ssl.keystore.password", configurations.get(ConfigKeys.KAFKA_SSL_KEYSTORE_PASSWORD));
>         return properties;
>     }
>
>
> Moreover, when I stop the job, the task manager does not terminate the Kafka
> connection and the memory stays allocated. To free it, I have to kill the
> task manager process.
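>
> One way to see this is, e.g.:
>
>     lsof -a -p <taskmanager-pid> -i TCP:9092
>
> (assuming the default broker port 9092), which keeps listing established
> connections to the brokers after the job is cancelled.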
>
> My Flink version: 1.2.1
> Kafka consumer: 010
> Kafka version: 2_11_0.10.1.0-2
>
> I've activated the taskmanager.debug.memory.startLogThread property, which
> logs memory usage every 5 seconds, and attached the log with the results.
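>
> As a sketch, that property goes into flink-conf.yaml along these lines (the
> 5-second interval is the default of taskmanager.debug.memory.logIntervalMs):
>
>     taskmanager.debug.memory.startLogThread: true
>     taskmanager.debug.memory.logIntervalMs: 5000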
>
> The output of free -m before submitting the job:
>
>               total        used        free      shared  buff/cache   available
> Mem:          15817         245       14755          24         816       15121
> Swap:             0           0           0
>
> after having the job running for about 5 min, free -m shows:
>
>               total        used        free      shared  buff/cache   available
> Mem:          15817        9819        5150          24         847        5547
> Swap:             0           0           0
>
> taskmanager.log
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14342/taskmanager.log>
>
> -----
> Best Regards,
> Pedro Chaves

