flink-user-zh mailing list archives

From 邓成刚【qq】 <bnnbb...@qq.com>
Subject Strange behavior when Blink consumes from Kafka; it has puzzled me for a long time. Does anyone know what is going on?
Date Tue, 26 Mar 2019 10:17:08 GMT
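Hi all,
      I have run into a very strange problem: when using the SQL API, a GROUP BY on a window causes the job to time out after 5 minutes, but if I change the query to select * it consumes from Kafka normally.
Note: the exception occurs both in local mode and when the job is submitted.
Environment: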
blink 1.5.1
kafka 1.1.1
flink-connector-kafka-0.11_2.11-1.5.1-sql-jar.jar

Code that consumes normally:

String sql = "select * from table1";

Table sip_distinct_event_id = tableEnv.sqlQuery(sql);

tableEnv.toRetractStream(sip_distinct_event_id, Row.class).print();
env.execute("myjob2");



If the SQL is replaced with the following, the job times out:

String sql = "select TUMBLE_START(EVENTTIME, INTERVAL '1' MINUTE) AS EVENTTIME, NEW_EVENT_ID, MSISDN"
        + " from ("
        + "select EVENTTIME, EVENT_ID as NEW_EVENT_ID, MSISDN from table1"
        + ") group by TUMBLE(EVENTTIME, INTERVAL '1' MINUTE), NEW_EVENT_ID, MSISDN";



Table sip_distinct_event_id = tableEnv.sqlQuery(sql);

tableEnv.toRetractStream(sip_distinct_event_id, Row.class).print();
env.execute("myjob2");


Exception:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.util.concurrent.TimeoutException
	at org.apache.flink.runtime.minicluster.MiniCluster.executeJob(MiniCluster.java:637)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.executeInternal(LocalStreamEnvironment.java:98)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1893)
	at com.nsn.flink.service.DealRegisterFile13.main(DealRegisterFile13.java:98)
Caused by: java.util.concurrent.TimeoutException
	at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:834)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)



