flink-user mailing list archives

From Wouter Zorgdrager <W.D.Zorgdra...@tudelft.nl>
Subject Re: Challenges using Flink REST API
Date Wed, 13 Mar 2019 12:18:55 GMT
Hey Chesnay,

I was mistaken when I said that the JobManager logs contained the full
stack trace. What I actually got there was the following:
2019-03-13 11:55:13,906 ERROR
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Exception
occurred in REST handler:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error.

After some googling I came across this Jira issue [1], which seems to fix my
problem in 1.8.0. I was still puzzled about why this ever worked for me in
1.4.2, but after checking some binaries I found out that the REST API was
reworked for 1.5.0 [2], which removed the full stack trace.

Is there an (official) Docker image available yet for running Flink 1.8?


[1]: https://jira.apache.org/jira/browse/FLINK-11423
[2]: https://jira.apache.org/jira/browse/FLINK-7715
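
For anyone who wants to keep the exception-based listing once the full stack trace is available again (in the REST response or in the JobManager log), here is a minimal Python sketch of the parsing step. It assumes the stage names appear as a JSON array right after the PipelineListException class name, as in the trace quoted below. The function name extract_stage_names and the quote-prefix handling are illustrative assumptions, not part of the actual script.

```python
import json
import re

# Exception class name taken from the stack trace quoted in this thread.
LIST_EXCEPTION = "org.codefeedr.pipeline.PipelineListException"


def extract_stage_names(trace_text, exception_name=LIST_EXCEPTION):
    """Return the stage names found in the exception message, or [] if absent."""
    # Logs pasted into mail may be wrapped and carry "> " quote prefixes,
    # so strip leading quote markers and search across line breaks.
    cleaned = re.sub(r"^[> ]+", "", trace_text, flags=re.MULTILINE)
    pattern = re.escape(exception_name) + r":\s*(\[.*?\])"
    match = re.search(pattern, cleaned, flags=re.DOTALL)
    if match is None:
        return []
    try:
        names = json.loads(match.group(1))
    except json.JSONDecodeError:
        return []
    # Keep only string entries; anything else is not a stage name.
    return [name for name in names if isinstance(name, str)]
```

When the response only contains the shortened ProgramInvocationException message, the function returns an empty list, which the calling script can treat as "stage list not available".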

On Wed, 13 Mar 2019 at 12:18, Chesnay Schepler <chesnay@apache.org> wrote:

> Can you give me the stacktrace that is logged in the JobManager logs?
> On 13.03.2019 10:57, Wouter Zorgdrager wrote:
> Hi Chesnay,
> Unfortunately this is not true when I run the Flink 1.7.2 docker images.
> The response is still:
> {
>     "errors": [
>         "org.apache.flink.client.program.ProgramInvocationException: The
> main method caused an error."
>     ]
> }
> Regards,
> Wouter Zorgdrager
> On Wed, 13 Mar 2019 at 10:42, Chesnay Schepler <chesnay@apache.org> wrote:
>> You should get the full stacktrace if you upgrade to 1.7.2 .
>> On 13.03.2019 09:55, Wouter Zorgdrager wrote:
>> Hey all!
>> I'm looking for some advice on the following; I'm working on an
>> abstraction on top of Apache Flink to 'pipeline' Flink applications using
>> Kafka. For deployment this means that all these Flink jobs are embedded
>> into one jar and each job is started using a program argument (e.g.
>> "--stage 'FirstFlinkJob'"). To ease deploying a set of interconnected Flink
>> jobs onto a cluster I wrote a Python script which basically communicates
>> with the REST client of the JobManager. So you can do things like "pipeline
>> start --jar 'JarWithThePipeline.jar'" and this would deploy every Flink
>> application separately.
>> However, this script was written a while ago against Flink version
>> "1.4.2". This week I tried to upgrade it to the latest Flink version but I
>> noticed a change in the REST responses. In order to get the "pipeline
>> start" command working, we need to know all the Flink jobs that are in the
>> jar (we call these Flink jobs 'stages') because we need to know the stage
>> names as argument for the jar. For the 1.4.2 version we used a dirty trick;
>> we ran the jar with '--list --asException' as program arguments which
>> basically runs the jar file and immediately throws an exception with the
>> stage names. These are then parsed and used to start every stage
>> separately. The error message that Flink threw looked something like this:
>> java.util.concurrent.CompletionException:
>> org.apache.flink.util.FlinkException: Could not run the jar.
>> at
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
>> at
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.FlinkException: Could not run the jar.
>> ... 9 more
>> Caused by: org.apache.flink.client.program.ProgramInvocationException:
>> The main method caused an error.
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
>> at
>> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>> at
>> org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
>> at
>> org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:87)
>> at
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
>> ... 8 more
>> Caused by: org.codefeedr.pipeline.PipelineListException:
>> ["org.codefeedr.plugin.twitter.stages.TwitterStatusInput","mongo_tweets","elasticsearch_tweets"]
>> at org.codefeedr.pipeline.Pipeline.showList(Pipeline.scala:114)
>> at org.codefeedr.pipeline.Pipeline.start(Pipeline.scala:100)
>> at nl.wouterr.Main$.main(Main.scala:23)
>> at nl.wouterr.Main.main(Main.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
>> However, for 1.7.0 this trick doesn't work anymore because instead of
>> returning the full stack trace, it only returns the following:
>> org.apache.flink.client.program.ProgramInvocationException: The program
>> caused an error:
>> In the console of the JobManager it does give the full stack trace
>> though. So first of all I'm wondering if there might be a way to enable
>> more detailed stack traces for Flink 1.7 in the REST responses. If not, do
>> you have any suggestions on how to tackle this problem? I know that in the
>> end this isn't really a Flink problem, but you might know a workaround in
>> the Flink REST client to achieve the same.
>> Some solutions I already considered:
>> - Running the jar with the "--list --asException" locally through the
>> Python script; however Flink and Scala are not provided in the jar.
>> Technically I could add them both to the classpath, but this would require
>> users to have the Flink jar locally (and also Scala somewhere, but I assume
>> most have).
>> - Let users provide a list of stage names for all their (interconnected)
>> Flink jobs. This is not really an option, because the (main) idea behind
>> this framework is to reduce the boilerplate and the hassle of setting up
>> complex stream processing architectures.
>> Any help is appreciated. Thanks in advance!
>> Kind regards,
>> Wouter Zorgdrager

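For reference, the "--list --asException" trick described in the original message amounts to a run request against an already uploaded jar. The sketch below only builds the request URL and sends nothing. It assumes the jar run endpoint POST /jars/:jarid/run with a "program-args" query parameter, which should be checked against the REST API documentation for the Flink version in use. The helper name build_run_url, the base URL and the jar id are hypothetical.

```python
from urllib.parse import quote, urlencode


def build_run_url(base_url, jar_id, program_args):
    """Build the URL for a POST to /jars/<jar_id>/run with program arguments."""
    # Assumption: program arguments are passed via the "program-args"
    # query parameter; verify this for the Flink version you target.
    query = urlencode({"program-args": program_args})
    return f"{base_url.rstrip('/')}/jars/{quote(jar_id, safe='')}/run?{query}"


# Hypothetical JobManager address and jar id, for illustration only.
url = build_run_url("http://localhost:8081", "abc_pipeline.jar", "--list --asException")
```

The script would then POST to this URL and read the "errors" field of the JSON response, as shown in the replies above.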