flink-user mailing list archives

From vino yang <yanghua1...@gmail.com>
Subject Re: flink OutOfMemoryError: GC overhead limit exceeded
Date Wed, 20 Nov 2019 02:08:23 GMT
Hi 龙安,

Sorry, I did not know you used the Blink planner.

> Before using NumericBetweenParametersProvider, the job read data from the
> database using only one TaskManager, even though I have more than one TM.

About using only one TM: it seems to be a known issue [1].
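
For background: NumericBetweenParametersProvider generates a series of numeric
[lower, upper] bounds, and JDBCInputFormat turns each pair of bounds into one
input split, so every parallel source subtask reads its own range instead of a
single task scanning the whole result set. A minimal sketch of how the two are
usually combined (the query, columns, and variable names here are only
illustrative, not taken from your job):

    // the query must contain two "?" placeholders for the lower/upper bound of each split
    JDBCInputFormat inputFormat = JDBCInputFormat.buildJDBCInputFormat()
            .setDrivername(driverName)
            .setDBUrl(dbUrl)
            .setQuery("select id, name, age from user_info where id between ? and ?")
            .setRowTypeInfo(rowTypeInfo)
            // each split covers a range of `fetchSize` ids between minId and maxId
            .setParametersProvider(new NumericBetweenParametersProvider(fetchSize, minId, maxId))
            .finish();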

Best,
Vino

[1]: https://issues.apache.org/jira/browse/FLINK-12122

淘宝龙安 <rentb419@gmail.com> wrote on Tue, Nov 19, 2019 at 9:26 PM:

> 1) In Flink 1.9.0, I found that BatchTableSource is deprecated. Then I
> found InputFormatTableSource, whose isBounded() method decides whether the
> job is batch.
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/common.html#main-differences-between-the-two-planners
>
> In the document above, it says the Blink planner treats batch jobs as a
> special case of streaming.
>
> So I guess the stream env works the same way as BatchTableEnvironment, and
> it works fine. I don't know if this is correct or not.
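>
> For reference, a minimal sketch of what I mean by a bounded source (the
> class and field names are only illustrative, not my actual code; classes
> come from org.apache.flink.table.sources, org.apache.flink.table.api and
> org.apache.flink.api.java.io.jdbc):
>
>     // InputFormatTableSource already implements isBounded() to return true,
>     // which is what lets the Blink planner treat this input as a bounded (batch) table.
>     public class BoundedJdbcTableSource extends InputFormatTableSource<Row> {
>         private final RowTypeInfo rowTypeInfo;
>         private final JDBCInputFormat inputFormat;
>
>         public BoundedJdbcTableSource(RowTypeInfo rowTypeInfo, JDBCInputFormat inputFormat) {
>             this.rowTypeInfo = rowTypeInfo;
>             this.inputFormat = inputFormat;
>         }
>
>         @Override
>         public InputFormat<Row, ?> getInputFormat() {
>             return inputFormat;   // e.g. a JDBCInputFormat built as above
>         }
>
>         @Override
>         public TableSchema getTableSchema() {
>             return TableSchema.fromTypeInfo(rowTypeInfo);
>         }
>
>         @Override
>         public DataType getProducedDataType() {
>             return TypeConversions.fromLegacyInfoToDataType(rowTypeInfo);
>         }
>     }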
>
> 2) Yes, I made a mistake. After I corrected it, it still behaved the same
> as before. My new startup command is: /app/flink-1.9.1/bin/flink run -m
> yarn-cluster -p 8 -ys 4 -yjm 5120 -ytm 16384 my-flink-job.jar
>
> And finally I solved this problem with NumericBetweenParametersProvider:
>
>             jdbcBuilder.setParametersProvider(new NumericBetweenParametersProvider(
>                     JobConfig.jdbcFetchSize,
>                     configDO.getBatchStart(),
>                     configDO.getBatchEnd()));
>
> But I don't know why?
>
> Before using NumericBetweenParametersProvider, the job read data from the
> database using only one TaskManager, even though I have more than one TM.
>
>
> vino yang <yanghua1127@gmail.com> wrote on Tue, Nov 19, 2019 at 4:16 PM:
>
>> Hi 龙安,
>>
>> Firstly, I have two questions.
>>
>> 1) You said this is a batch job, while you used stream env and stream
>> table env.
>>
>> 2) From the startup command, I saw the "-yn" config option, which is no
>> longer supported since Flink 1.8+. I guess you only started one TM
>> container (-p/-s=1). If I am wrong, please correct me.
>>
>> Can we first align on these two questions?
>>
>> Best,
>> Vino
>>
>>
>> 淘宝龙安 <rentb419@gmail.com> wrote on Tue, Nov 19, 2019 at 1:32 PM:
>>
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> SLF4J: Found binding in [jar:file:/data4/HDATA/yarn/local/usercache/flink/appcache/application_1573355697642_32093/filecache/10/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: Found binding in [jar:file:/data1/app/hadoop-2.7.3-snappy-32core12disk/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
>>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>> Exception in thread "I/O dispatcher 3" java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>     at java.util.HashMap$KeySet.iterator(HashMap.java:917)
>>>     at java.util.HashSet.iterator(HashSet.java:173)
>>>     at java.util.Collections$UnmodifiableCollection$1.<init>(Collections.java:1039)
>>>     at java.util.Collections$UnmodifiableCollection.iterator(Collections.java:1038)
>>>     at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:212)
>>>     at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
>>>     at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
>>>     at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Exception in thread "I/O dispatcher 28" Exception in thread "I/O dispatcher 32" java.lang.OutOfMemoryError: GC overhead limit exceeded
>>> Exception in thread "I/O dispatcher 24" java.lang.OutOfMemoryError: GC overhead limit exceeded
>>> Exception in thread "I/O dispatcher 26" java.lang.OutOfMemoryError: GC overhead limit exceeded
>>> Exception in thread "I/O dispatcher 12" Exception in thread "I/O dispatcher 17" java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>     at java.util.Collections$UnmodifiableCollection.iterator(Collections.java:1038)
>>>     at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:212)
>>>     at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
>>>     at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
>>>     at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Exception in thread "I/O dispatcher 18" java.lang.OutOfMemoryError: GC overhead limit exceeded
>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>     at java.util.Collections$UnmodifiableCollection.iterator(Collections.java:1038)
>>>     at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:212)
>>>     at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
>>>     at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
>>>     at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>
>>> 淘宝龙安 <rentb419@gmail.com> wrote on Tue, Nov 19, 2019 at 11:04 AM:
>>>
>>>> Hi yanghua,
>>>>
>>>> Thanks for your response. My scenario is very simple.
>>>>
>>>> I have two tables in the database.
>>>>
>>>> table A (user_info)
>>>> --------------------------------
>>>> id       |   varchar    |
>>>> name |  varchar     |
>>>> age    |  numeric    |
>>>> --------------------------------
>>>>
>>>>
>>>>
>>>> Table B (order_info)
>>>> -------------------------------
>>>> id           |   varchar    |
>>>> user_id  |   varchar    |
>>>> price      |   numeric   |
>>>> ------------------------------
>>>>
>>>>
>>>>
>>>>
>>>> I read these two tables into Flink by JDBC, then I join them. My code
>>>> looks like this:
>>>>
>>>>           EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
>>>>                   .useOldPlanner()
>>>>                   .inStreamingMode()
>>>>                   .build();
>>>>           StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>           StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
>>>>
>>>>           // register table user_info with JDBCInputFormat
>>>>           fsTableEnv.registerTableSource("user_info", new JdbcTableSource(
>>>>                   fieldNames,
>>>>                   types,
>>>>                   JDBCInputFormat.buildJDBCInputFormat()
>>>>                           .setDBUrl(configDO.getJDBCUrl())
>>>>                           .setUsername(configDO.getDbUser())
>>>>                           .setPassword(configDO.getDbPassword())
>>>>                           .setFetchSize(10000)
>>>>                           .setDrivername(configDO.getJdbcDriver())
>>>>                           .setRowTypeInfo(new RowTypeInfo(types, fieldNames))
>>>>                           .setQuery("select id, name, age from user_info")
>>>>                           .finish()
>>>>           ));
>>>>
>>>>           // register table order_info with JDBCInputFormat
>>>>           fsTableEnv.registerTableSource("order_info", new JdbcTableSource(
>>>>                   fieldNames,
>>>>                   types,
>>>>                   JDBCInputFormat.buildJDBCInputFormat()
>>>>                           .setDBUrl(configDO.getJDBCUrl())
>>>>                           .setUsername(configDO.getDbUser())
>>>>                           .setPassword(configDO.getDbPassword())
>>>>                           .setFetchSize(10000)
>>>>                           .setDrivername(configDO.getJdbcDriver())
>>>>                           .setRowTypeInfo(new RowTypeInfo(types, fieldNames))
>>>>                           .setQuery("select id, user_id, price from order_info")
>>>>                           .finish()
>>>>           ));
>>>>
>>>>           // register an Elasticsearch sink
>>>>           fsTableEnv.registerTableSink("output_table",
>>>>                   new ElasticSearchTableSink.ElasticSearchTableSinkBuilder()
>>>>                           .hosts(configDO.getElasticSearchServer())
>>>>                           .index(ElasticsearchConfigUtils.getIndex(configDO, dateTime))
>>>>                           .docType("_doc")
>>>>                           .fieldNames(DatabaseTypesUtils.getFieldNames(configDO.getOutput()))
>>>>                           .fieldTypes(DatabaseTypesUtils.getTableTypes(configDO.getOutput()))
>>>>                           .build());
>>>>
>>>>           // then join these two tables
>>>>           fsTableEnv.sqlUpdate("insert into output_table select user_info.id as user_id, order_info.id as order_id, user_info.name, order_info.price from user_info join order_info on order_info.user_id = user_info.id");
>>>>           fsEnv.execute(taskName);
>>>>
>>>>
>>>>
>>>> Then I run it on the YARN cluster:
>>>>
>>>>   /app/flink-1.9.1/bin/flink run -m yarn-cluster -p 4 -ys 4 -yn 4 -yjm 5120 -ytm 16384 my-flink-job.jar
>>>>
>>>> vino yang <yanghua1127@gmail.com> wrote on Mon, Nov 18, 2019 at 11:47 AM:
>>>>
>>>>> Hi longan,
>>>>>
>>>>> On a preliminary evaluation, only 100,000+ records may not cause an OOM.
>>>>> Can you give more details about your job, e.g. the job graph or business
>>>>> logic (how many and what kinds of operators do you use?), how many TM
>>>>> containers, log files and so on?
>>>>>
>>>>> What's more, besides monitoring memory metrics, you can view memory GC
>>>>> information with two config options provided by Flink itself.[1]
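>>>>>
>>>>> For example, something like this in flink-conf.yaml (the option names are
>>>>> the ones listed on that page; the interval value is just an illustration):
>>>>>
>>>>>     # periodically log TaskManager memory and garbage collection statistics
>>>>>     taskmanager.debug.memory.startLogThread: true
>>>>>     taskmanager.debug.memory.logIntervalMs: 10000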
>>>>>
>>>>> Best,
>>>>> Vino
>>>>>
>>>>> [1]:
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html#memory-and-performance-debugging
>>>>>
>>>>> 淘宝龙安 <rentb419@gmail.com> wrote on Sun, Nov 17, 2019 at 1:03 PM:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I have a batch job that reads data from PostgreSQL with JDBC. When the
>>>>>> record count is greater than 100,000, the Flink job hits an
>>>>>> OutOfMemoryError: GC overhead limit exceeded.
>>>>>>
>>>>>> The TaskManager memory is 16GB:
>>>>>>
>>>>>> -yjm 5120 -ytm 16384
>>>>>>
>>>>>> [image: image.png]
>>>>>>
>>>>>> My config is below. Can anybody help me?
>>>>>>
>>>>>> JDBCInputFormat.buildJDBCInputFormat()
>>>>>>         .setDBUrl(configDO.getJDBCUrl())
>>>>>>         .setUsername(configDO.getDbUser())
>>>>>>         .setPassword(configDO.getDbPassword())
>>>>>>         .setFetchSize(JobConfig.jdbcFetchSize)
>>>>>>         .setDrivername(configDO.getJdbcDriver())
>>>>>>         .setRowTypeInfo(new RowTypeInfo(types, fieldNames))
>>>>>>         .setQuery(sql)
>>>>>>         .finish()
>>>>>>
>>>>>>
