flink-user mailing list archives

From Thomas Eckestad <Thomas.Eckes...@verisure.com>
Subject Re: Running single Flink job in a job cluster, problem starting JobManager
Date Tue, 12 Feb 2019 12:48:15 GMT
I have investigated this further:

During normal operation, without Spring Boot, OptimizerPlanEnvironment.ProgramAbortException
is thrown by Flink from StreamPlanEnvironment::execute():70. It is caught by PackagedProgram::callMainMethod():537
and re-thrown as an Error. That Error is in turn caught in OptimizerPlanEnvironment::getOptimizedPlan():88,
which checks whether the optimizerPlan field is non-null: if it is, the plan is returned,
otherwise the throwable is re-thrown. Since optimizerPlan IS non-null here, the exception stops
there and the job is executed as expected. In other words, the Flink control flow relies on
throwing (and handling) ProgramAbortException.
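
To make that control flow concrete, here is a small self-contained sketch of the pattern in
plain Java (no Flink classes involved; all names are stand-ins for the Flink ones mentioned
above, and the bodies are heavily simplified):

public class AbortFlowDemo {

    // Stand-in for OptimizerPlanEnvironment.ProgramAbortException (an Error subclass).
    static class ProgramAbortException extends Error {}

    // Stand-in for the optimizerPlan field.
    static String optimizerPlan;

    // Stand-in for StreamPlanEnvironment::execute(): capture the plan, then abort.
    static void execute(String jobName) {
        optimizerPlan = "stream graph of " + jobName;
        throw new ProgramAbortException();
    }

    // Stand-in for the job's main method, invoked via PackagedProgram::callMainMethod().
    static void jobMain() {
        execute("MyFlinkJob");   // never returns normally
    }

    // Stand-in for OptimizerPlanEnvironment::getOptimizedPlan().
    static String getOptimizedPlan() {
        try {
            jobMain();
        } catch (Throwable t) {
            if (optimizerPlan != null) {
                return optimizerPlan;   // expected path: the abort happened after the plan was set
            }
            throw new RuntimeException("The program caused an error", t);
        }
        throw new IllegalStateException("the job never called execute()");
    }

    public static void main(String[] args) {
        System.out.println(getOptimizedPlan());   // prints "stream graph of MyFlinkJob"
    }
}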

When using Spring Boot, the execution fails because of the OptimizerPlanEnvironment.ProgramAbortException
mentioned above. In that case Spring's bean-initialization logic sits between PackagedProgram::callMainMethod()
and the method in which the Flink execution environment is built and executed. Spring catches
any Throwable thrown from that method, interprets it as a failure, and exits.
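
In effect (a simplified sketch of what Spring's init handling does, not its actual source;
myFlinkJob and beanName are placeholders):

try {
    myFlinkJob.init();   // builds the environment and calls execute(), which throws
                         // ProgramAbortException on purpose in the job-cluster case
} catch (Throwable ex) {
    // Any Throwable from an init method is treated as a bean creation failure, so
    // Flink's OptimizerPlanEnvironment never gets to handle the abort.
    throw new BeanCreationException(beanName, "Invocation of init method failed", ex);
}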

I guess that when the Spring Boot Flink job is deployed to a session cluster, which as mentioned
above works fine, the submission does not rely on passing exceptions between the Flink bootstrap
code and the Flink job?

/Thomas

________________________________
From: Chesnay Schepler <chesnay@apache.org>
Sent: Sunday, February 10, 2019 10:30:54 AM
To: Thomas Eckestad; user@flink.apache.org
Subject: Re: Running single Flink job in a job cluster, problem starting JobManager

I'm afraid we haven't had much experience with Spring Boot Flink applications.

It is indeed strange that the job ends up using a StreamPlanEnvironment.
As a debugging step I would look into all calls to ExecutionEnvironment#initializeContextEnvironment().
This is how specific execution environments are injected into (Stream)ExecutionEnvironment#getExecutionEnvironment().
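
From my reading of the 1.7 sources, that lookup works roughly like this (condensed and
simplified, not the verbatim code):

// Condensed sketch of StreamExecutionEnvironment#getExecutionEnvironment():
// a factory injected via initializeContextEnvironment() takes precedence; otherwise the
// batch-side context environment decides what you get, and an OptimizerPlanEnvironment
// yields the StreamPlanEnvironment seen in the stack trace below.
public static StreamExecutionEnvironment getExecutionEnvironment() {
    if (contextEnvironmentFactory != null) {
        return contextEnvironmentFactory.createExecutionEnvironment();
    }
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
        return new StreamPlanEnvironment(env);
    }
    return createLocalEnvironment();
}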

On 08.02.2019 15:17, Thomas Eckestad wrote:
Hi again,

When removing Spring Boot from the application, it works.

I would really like to mix Spring Boot and Flink. It does work with Spring Boot when submitting
jobs to a session cluster, as stated before.

/Thomas
________________________________
From: Thomas Eckestad <Thomas.Eckestad@verisure.com>
Sent: Friday, February 8, 2019 12:14 PM
To: user@flink.apache.org
Subject: Running single Flink job in a job cluster, problem starting JobManager

Hi,

I am trying to run a flink job cluster in K8s. As a first step I have created a Docker image
according to:

https://github.com/apache/flink/blob/release-1.7/flink-container/docker/README.md

When I try to run the image:

docker run --name=flink-job-manager flink-image:latest job-cluster --job-classname com.foo.bar.FlinkTest
-Djobmanager.rpc.address=flink-job-cluster -Dparallelism.default=1 -Dblob.server.port=6124
-Dqueryable-state.server.ports=6125

the execution fails with the following exception:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'MyFlinkJob':
Invocation of init method failed; nested exception is org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:139)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:419)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1737)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:576)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:498)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:846)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
at com.foo.bar.FlinkTest.main(FlinkTest.java:10)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:70)
at com.foo.bar.FlinkJob.MyFlinkJob.init(MyFlinkJob.java:59)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:363)
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:307)
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:136)
... 22 more

I can successfully run the same job.jar on a session cluster (start-cluster.sh; flink run job.jar).
Any ideas? Feels like I am missing something obvious?

At MyFlinkJob.java:59 I do: streamExecutionEnvironment.execute("MyFlinkJob");

It feels strange that the execution ends up in org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute?
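
For completeness, the init method is essentially of the following shape (a hypothetical minimal
version; the bean wiring and the pipeline here are placeholders, not the actual job code):

import javax.annotation.PostConstruct;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Component;

@Component("MyFlinkJob")
public class MyFlinkJob {

    // Invoked by Spring during bean initialization (see
    // InitDestroyAnnotationBeanPostProcessor in the stack trace above).
    @PostConstruct
    public void init() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print();   // placeholder pipeline, not the real job
        // In the job cluster, env is a StreamPlanEnvironment, so this call throws
        // OptimizerPlanEnvironment.ProgramAbortException (MyFlinkJob.java:59 above).
        env.execute("MyFlinkJob");
    }
}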

From https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java:

/**
 * A special {@link StreamExecutionEnvironment} that is used in the web frontend when generating
 * a user-inspectable graph of a streaming job.
 */
@PublicEvolving
public class StreamPlanEnvironment extends StreamExecutionEnvironment {


I am using https://archive.apache.org/dist/flink/flink-1.7.1/flink-1.7.1-bin-scala_2.11.tgz
(I have also tried 1.6.3 and 1.7.0, no difference in behavior).

* docker --version -> Docker version 1.13.1
* uname -a -> Linux SEOSTL0069.SEC.INTRA 4.20.4-200.fc29.x86_64 #1 SMP Wed Jan 23 16:11:28
UTC 2019 x86_64 x86_64 x86_64 GNU/Linux

Thank you,
Thomas Eckestad

