flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Running single Flink job in a job cluster, problem starting JobManager
Date Sun, 10 Feb 2019 09:30:54 GMT
I'm afraid we haven't had much experience with Spring Boot Flink 
applications.

It is indeed strange that the job ends up using a StreamPlanEnvironment.
As a debugging step I would look into all calls to 
ExecutionEnvironment#initializeContextEnvironment().
This is how specific execution environments are injected into 
(Stream)ExecutionEnvironment#getExecutionEnvironment().
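
To make the injection mechanism concrete, here is a minimal sketch in plain Java. It is a simplified model under stated assumptions, not Flink's actual code: the types Env, LocalEnv and PlanEnv, the Supplier-based factory and resetContextEnvironment() are hypothetical stand-ins. Only the idea is taken from the description above: a static factory installed by initializeContextEnvironment() decides what the static getter later returns, so job code can end up with a plan-only environment whose execute() aborts, much like the ProgramAbortException in the quoted stack trace.

```java
import java.util.function.Supplier;

// Simplified model of context-environment injection.
// All names here are hypothetical; this is NOT Flink's implementation.
public class EnvironmentInjectionSketch {

    /** Hypothetical stand-in for an execution environment. */
    interface Env {
        String execute(String jobName);
    }

    /** Default environment: actually "runs" the job. */
    static final class LocalEnv implements Env {
        @Override
        public String execute(String jobName) {
            return "ran " + jobName;
        }
    }

    /** Plan-only environment: aborts instead of running the job. */
    static final class PlanEnv implements Env {
        @Override
        public String execute(String jobName) {
            throw new IllegalStateException("plan extracted, program aborted");
        }
    }

    // Static factory slot; whoever sets it controls what the getter returns.
    private static Supplier<Env> contextFactory;

    static void initializeContextEnvironment(Supplier<Env> factory) {
        contextFactory = factory;
    }

    static void resetContextEnvironment() {
        contextFactory = null;
    }

    // Returns the injected environment if a factory was installed,
    // otherwise falls back to the local default.
    static Env getExecutionEnvironment() {
        return contextFactory != null ? contextFactory.get() : new LocalEnv();
    }

    public static void main(String[] args) {
        // No factory installed: the job gets the local default.
        System.out.println(getExecutionEnvironment().getClass().getSimpleName());

        // A caller (for example a client that only wants the plan) installs its factory.
        initializeContextEnvironment(PlanEnv::new);

        // The same call in the job now yields the plan-only environment.
        System.out.println(getExecutionEnvironment().getClass().getSimpleName());
    }
}
```

In a real job, a comparable first check would be to log the class of the environment returned by getExecutionEnvironment() just before calling execute(), and to compare the session-cluster run with the job-cluster run.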

On 08.02.2019 15:17, Thomas Eckestad wrote:
> Hi again,
>
> when removing Spring Boot from the application it works.
>
> I would really like to mix Spring Boot and Flink. It does work with 
> Spring Boot when submitting jobs to a session cluster, as stated before.
>
> /Thomas
> ------------------------------------------------------------------------
> *From:* Thomas Eckestad <Thomas.Eckestad@verisure.com>
> *Sent:* Friday, February 8, 2019 12:14 PM
> *To:* user@flink.apache.org
> *Subject:* Running single Flink job in a job cluster, problem starting 
> JobManager
> Hi,
>
> I am trying to run a flink job cluster in K8s. As a first step I have 
> created a Docker image according to:
>
> https://github.com/apache/flink/blob/release-1.7/flink-container/docker/README.md
>
> When I try to run the image:
>
> docker run --name=flink-job-manager flink-image:latest job-cluster 
> --job-classname com.foo.bar.FlinkTest 
> -Djobmanager.rpc.address=flink-job-cluster -Dparallelism.default=1 
> -Dblob.server.port=6124 -Dqueryable-state.server.ports=6125
>
> the execution fails with the following exception:
>
> org.springframework.beans.factory.BeanCreationException: Error 
> creating bean with name 'MyFlinkJob': Invocation of init method 
> failed; nested exception is 
> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
> at 
> org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:139)
> at 
> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:419)
> at 
> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1737)
> at 
> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:576)
> at 
> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:498)
> at 
> org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
> at 
> org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
> at 
> org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
> at 
> org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
> at 
> org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:846)
> at 
> org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863)
> at 
> org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546)
> at 
> org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775)
> at 
> org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
> at 
> org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
> at 
> org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
> at 
> org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
> at com.foo.bar.FlinkTest.main(FlinkTest.java:10)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
> Caused by: 
> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
> at 
> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:70)
> at com.foo.bar.FlinkJob.MyFlinkJob.init(MyFlinkJob.java:59)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:363)
> at 
> org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:307)
> at 
> org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:136)
> ... 22 more
>
> I can successfully run the same job.jar on a session cluster 
> (start-cluster.sh;flink run job.jar). Any ideas? Feels like I am 
> missing something obvious?
>
> At MyFlinkJob.java:59 I do: 
> streamExecutionEnvironment.execute("MyFlinkJob");
>
> It feels strange that the execution ends up in 
> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute?
>
> From 
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java:
>
> /**
>  * A special {@link StreamExecutionEnvironment} that is used in the 
> web frontend when generating
>  * a user-inspectable graph of a streaming job.
>  */
> @PublicEvolving
> public class StreamPlanEnvironment extends StreamExecutionEnvironment {
>
>
> I am using 
> https://archive.apache.org/dist/flink/flink-1.7.1/flink-1.7.1-bin-scala_2.11.tgz 
> (I have also tried 1.6.3 and 1.7.0, no difference in behavior).
>
> * docker --version -> Docker version 1.13.1
> * uname -a -> Linux SEOSTL0069.SEC.INTRA 4.20.4-200.fc29.x86_64 #1 SMP 
> Wed Jan 23 16:11:28 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux
>
> Thank you,
> Thomas Eckestad
