flink-user mailing list archives

From Biao Liu <mmyy1...@gmail.com>
Subject Re: Recommended approach to debug this
Date Tue, 24 Sep 2019 10:55:44 GMT
The key point of this case is in `PackagedProgram#callMainMethod`.
The `ProgramAbortException` is expected when executing the main method
here. When it is thrown, it is wrapped in an `InvocationTargetException`
by the Java reflection layer [1]. There is a piece of code that handles
the `InvocationTargetException`:

try {
  mainMethod.invoke(null, (Object) args);
}
catch (...
catch (InvocationTargetException e) {
  Throwable exceptionInMethod = e.getTargetException();
  if (exceptionInMethod instanceof Error) {
    // ------> The `ProgramAbortException` is expected to be matched and
    // rethrown here.
    throw (Error) exceptionInMethod;
  } else if (exceptionInMethod instanceof ProgramParametrizationException) {
    throw (ProgramParametrizationException) exceptionInMethod;
  } else if (exceptionInMethod instanceof ProgramInvocationException) {
    throw (ProgramInvocationException) exceptionInMethod;
  } else {
    // ------> If I'm right, the wrapping (Boxed Error or something else)
    // changes the exception type, so it is caught here instead.
    throw new ProgramInvocationException("The main method caused an error: "
        + exceptionInMethod.getMessage(), exceptionInMethod);
  }
}
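
To make the two branches concrete, here is a minimal, self-contained sketch of the same dispatch. It is not Flink code: `AbortSignal`, `DirectProgram`, `BoxingProgram` and `classify` are hypothetical names, and `AbortSignal` only stands in for `ProgramAbortException` (which is an `Error`). The second program boxes the error in an `ExecutionException`; that is my assumption about what the Scala promise does, based on the "Boxed Error" line in the reported stack trace.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.ExecutionException;

// Hypothetical stand-ins for illustration; none of these names exist in Flink.
class ReflectionWrapDemo {

    // Stands in for ProgramAbortException, which is an Error subclass.
    public static final class AbortSignal extends Error {}

    // A "main method" that lets the abort signal propagate untouched.
    public static final class DirectProgram {
        public static void main(String[] args) {
            throw new AbortSignal();
        }
    }

    // A "main method" that catches the signal and rethrows it boxed,
    // mimicking the "Boxed Error" ExecutionException from the stack trace.
    public static final class BoxingProgram {
        public static void main(String[] args) throws Exception {
            try {
                throw new AbortSignal();
            } catch (Throwable t) {
                throw new ExecutionException("Boxed Error", t);
            }
        }
    }

    // Invokes main reflectively and reports which branch the unwrapped
    // target exception would take.
    static String classify(Class<?> program) {
        try {
            Method mainMethod = program.getMethod("main", String[].class);
            mainMethod.invoke(null, (Object) new String[0]);
            return "no exception";
        } catch (InvocationTargetException e) {
            Throwable exceptionInMethod = e.getTargetException();
            if (exceptionInMethod instanceof Error) {
                return "Error branch: " + exceptionInMethod.getClass().getSimpleName();
            }
            return "else branch: " + exceptionInMethod.getClass().getSimpleName();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(DirectProgram.class)); // prints "Error branch: AbortSignal"
        System.out.println(classify(BoxingProgram.class)); // prints "else branch: ExecutionException"
    }
}
```

In the boxed case the original error is still reachable through `getCause()`, but the `instanceof Error` check only looks at the outermost exception, so the abort signal is no longer recognized.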

The `ProgramInvocationException` is handled specially in
`OptimizerPlanEnvironment`.

try {
  prog.invokeInteractiveModeForExecution();
}
catch (ProgramInvocationException e) {
  // ------> In this case the submission fails here.
  throw e;
}
catch (Throwable t) {
  // the invocation gets aborted with the preview plan
  if (optimizerPlan != null) {
    // ------> Normally execution should end up here.
    return optimizerPlan;
  } else {
    throw new ProgramInvocationException("The program caused an error: ", t);
  } ...
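
A rough, non-Flink sketch of this control flow follows. All names (`AbortSignal`, `InvocationFailure`, `executeInPlanMode`, `getPlan`, `outcome`) are hypothetical, and `InvocationFailure` is an unchecked exception only to keep the example short. It shows why a plan that was already captured is still lost once the abort signal arrives re-wrapped as an invocation failure.

```java
// Hypothetical stand-ins for illustration; none of these names exist in Flink.
class PlanExtractionDemo {

    // Stands in for ProgramAbortException.
    static final class AbortSignal extends Error {}

    // Stands in for ProgramInvocationException (unchecked here for brevity).
    static final class InvocationFailure extends RuntimeException {
        InvocationFailure(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Plan recorded by the "environment" just before it aborts the program.
    static String capturedPlan;

    // Stand-in for execute() in plan mode: record the plan, then abort.
    static void executeInPlanMode() {
        capturedPlan = "job-graph";
        throw new AbortSignal();
    }

    // The abort signal reaches the caller untouched.
    static void directProgram() {
        executeInPlanMode();
    }

    // The abort signal is caught on the way up and re-wrapped.
    static void rewrappingProgram() {
        try {
            executeInPlanMode();
        } catch (Throwable t) {
            throw new InvocationFailure("The main method caused an error", t);
        }
    }

    // Same shape as the snippet above: invocation failures are rethrown,
    // any other Throwable is treated as the expected abort.
    static String getPlan(Runnable program) {
        capturedPlan = null;
        try {
            program.run();
        } catch (InvocationFailure e) {
            throw e;
        } catch (Throwable t) {
            if (capturedPlan != null) {
                return capturedPlan;
            }
            throw new InvocationFailure("The program caused an error", t);
        }
        throw new InvocationFailure("The program never called execute()", null);
    }

    static String outcome(Runnable program) {
        try {
            return "plan: " + getPlan(program);
        } catch (InvocationFailure e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(PlanExtractionDemo::directProgram));     // prints "plan: job-graph"
        System.out.println(outcome(PlanExtractionDemo::rewrappingProgram)); // prints "failed: The main method caused an error"
    }
}
```

In both runs the plan has been captured; only the type of the exception that reaches `getPlan` decides whether it is returned.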

[1] https://stackoverflow.com/questions/6020719/what-could-cause-java-lang-reflect-invocationtargetexception

Thanks,
Biao /'bɪ.aʊ/



On Tue, 24 Sep 2019 at 17:35, Debasish Ghosh <ghosh.debasish@gmail.com>
wrote:

> Well, I think I got the solution though I am not yet sure of the problem
> .. The original code looked like this ..
>
> Try {
>   // from a parent class called Runner which runs a streamlet
>   // run returns an abstraction which completes a Promise depending on
>   // whether the Job was successful or not
>   val streamletExecution =
>     loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>
>   // the runner waits for the execution to complete
>   // In normal circumstances it will run forever for streaming data source
>   // unless being stopped forcibly or any of the queries faces an exception
>   Await.result(streamletExecution.completed, Duration.Inf)
> } match { //..
>
> and then the streamlet.run(..) in turn finally invoked the following ..
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> // creates datastreams and read from / writes to Kafka
> // I pasted the body of this earlier in the thread
> buildExecutionGraph()
>
> env.execute(..)
>
> This DID NOT run and failed with the exception I reported earlier. But
> when I change the code to get the run statement out of the Try block,
> things run fine .. like this ..
>
> // from a parent class called Runner which runs a streamlet
> // run returns an abstraction which completes a Promise depending on
> // whether the Job was successful or not
> val streamletExecution =
>   loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>
> Try {
>   // the runner waits for the execution to complete
>   // In normal circumstances it will run forever for streaming data source
>   // unless being stopped forcibly or any of the queries faces an exception
>   Await.result(streamletExecution.completed, Duration.Inf)
> } match { //..
>
> Apparently the exception that I was facing earlier leaked out of the
> Flink engine, Try caught it, and it got logged. Moving the run call out
> of the Try now lets Flink catch the exception itself and follow the
> course that it should. But I am not sure whether this is a cogent
> explanation, and I am looking forward to a more accurate one from the
> experts. Note that there is no asynchrony or concurrency going on here.
> The Runner code may look a bit over-engineered, but there is a context
> to this: it handles not only Flink but also other types of streaming
> engines, like Spark and Akka Streams.
>
> regards.
>
>
> On Tue, Sep 24, 2019 at 10:17 AM Biao Liu <mmyy1110@gmail.com> wrote:
>
>> Hi Zili,
>>
>> Thanks for pointing that out.
>> I didn't realize that it's a REST API based case. Debasish's case has
>> been discussed not only in this thread...
>>
>> It's really hard to analyze the case without the full picture.
>>
>> I think the reason why the `ProgramAbortException` is not caught is that
>> he did something outside `env.execute`, such as executing this piece of
>> code inside a Scala future.
>>
>> I guess the scenario is that he is submitting the job through the REST
>> API, but in the main method he wraps `env.execute` in a Scala future
>> instead of executing it directly.
>> The env has been set to `StreamPlanEnvironment` because
>> `JarHandlerUtils` retrieves the job graph through it.
>> The `ProgramAbortException` is not thrown out, because the Scala
>> future handles this exception.
>> So retrieving the job graph fails due to an unrecognized exception
>> (Boxed Error).
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Tue, 24 Sep 2019 at 10:44, Zili Chen <wander4096@gmail.com> wrote:
>>
>>> Hi Biao,
>>>
>>> The log below already implies that the job was submitted via the REST
>>> API, and I don't think it matters.
>>>
>>> at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>>>
>>> What I don't understand is that Flink DOES catch the exception at the
>>> point where it is reported thrown...
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> Biao Liu <mmyy1110@gmail.com> wrote on Tue, Sep 24, 2019 at 10:34 AM:
>>>
>>>>
>>>> > We submit the code through Kubernetes Flink Operator which uses the
>>>> > REST API to submit the job to the Job Manager
>>>>
>>>> So you are submitting the job through the REST API, not the Flink
>>>> client? Could you explain more about this?
>>>>
>>>> Thanks,
>>>> Biao /'bɪ.aʊ/
>>>>
>>>>
>>>>
>>>> On Tue, 24 Sep 2019 at 03:44, Debasish Ghosh <ghosh.debasish@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Dian -
>>>>>
>>>>> We submit one job through the operator. We just use the following to
>>>>> complete a promise when the job completes ..
>>>>>
>>>>>       Try {
>>>>>         createLogic.executeStreamingQueries(ctx.env)
>>>>>       }.fold(
>>>>>         th ⇒ completionPromise.tryFailure(th),
>>>>>         _ ⇒ completionPromise.trySuccess(Dun)
>>>>>       )
>>>>>
>>>>> If we totally do away with the promise and future stuff then we don't
>>>>> get the boxed error - only the exception reported in Caused By.
>>>>>
>>>>> regards.
>>>>>
>>>>> On Mon, Sep 23, 2019 at 10:20 PM Dian Fu <dian0511.fu@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Debasish,
>>>>>>
>>>>>> In which case will the exception occur? Does it occur when you submit
>>>>>> one job at a time or when multiple jobs are submitted at the same
>>>>>> time? I'm asking this because I noticed that you used a Future to
>>>>>> execute the job in a non-blocking way. I guess ThreadLocal doesn't
>>>>>> work well in this case.
>>>>>>
>>>>>> Regards,
>>>>>> Dian
>>>>>>
>>>>>> On Sep 23, 2019, at 11:57 PM, Debasish Ghosh <ghosh.debasish@gmail.com> wrote:
>>>>>>
>>>>>> Hi tison -
>>>>>>
>>>>>> Please find my response below in >>.
>>>>>>
>>>>>> regards.
>>>>>>
>>>>>> On Mon, Sep 23, 2019 at 6:20 PM Zili Chen <wander4096@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Debasish,
>>>>>>>
>>>>>>> The OptimizerPlanEnvironment.ProgramAbortException should be caught
>>>>>>> at OptimizerPlanEnvironment#getOptimizedPlan
>>>>>>> in its catch (Throwable t) branch.
>>>>>>>
>>>>>>
>>>>>> >> true but what I get is a StreamPlanEnvironment. From my code I am
>>>>>> only doing val env =
>>>>>> StreamExecutionEnvironment.getExecutionEnvironment.
>>>>>>
>>>>>>>
>>>>>>> It should always throw a ProgramInvocationException instead of
>>>>>>> OptimizerPlanEnvironment.ProgramAbortException if any
>>>>>>> exception is thrown in the main method of your code.
>>>>>>>
>>>>>>> Another important problem is how the code is executed (setting the
>>>>>>> context environment should be another Flink-internal operation),
>>>>>>> but given that you submit the job via the Flink k8s operator, it might
>>>>>>> take time to look at the k8s operator implementation.
>>>>>>>
>>>>>>
>>>>>> >> We submit the code through Kubernetes Flink Operator which uses
>>>>>> the REST API to submit the job to the Job Manager
>>>>>>
>>>>>>>
>>>>>>> However, given that we catch Throwable at the place where this
>>>>>>> exception is thrown, I highly doubt that it is executed by an
>>>>>>> official Flink release.
>>>>>>>
>>>>>>
>>>>>> >> It is an official Flink release 1.9.0
>>>>>>
>>>>>>>
>>>>>>> A complete version of the code and the submission process would be
>>>>>>> helpful. Besides, what is the return type of buildExecutionGraph?
>>>>>>> I think it is not ExecutionGraph in Flink...
>>>>>>>
>>>>>>
>>>>>> >> buildExecutionGraph is our function which returns a Unit. It's not
>>>>>> ExecutionGraph. It builds the DataStreams by reading from Kafka and
>>>>>> then finally writes to Kafka.
>>>>>>
>>>>>>>
>>>>>>> Best,
>>>>>>> tison.
>>>>>>>
>>>>>>>
>>>>>>> Debasish Ghosh <ghosh.debasish@gmail.com> wrote on Mon, Sep 23, 2019 at 8:21 PM:
>>>>>>>
>>>>>>>> This is the complete stack trace which we get from execution on
>>>>>>>> Kubernetes using the Flink Kubernetes operator .. The boxed error
>>>>>>>> comes from the fact that we complete a Promise with Success when it
>>>>>>>> returns a JobExecutionResult and with Failure when we get an
>>>>>>>> exception. And here we are getting an exception. So the real stack
>>>>>>>> trace we have is the one below in Caused By.
>>>>>>>>
>>>>>>>> java.util.concurrent.ExecutionException: Boxed Error
>>>>>>>> at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
>>>>>>>> at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
>>>>>>>> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>>>>>>>> at scala.concurrent.Promise.tryFailure(Promise.scala:112)
>>>>>>>> at scala.concurrent.Promise.tryFailure$(Promise.scala:112)
>>>>>>>> at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187)
>>>>>>>> at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3(FlinkStreamlet.scala:186)
>>>>>>>> at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3$adapted(FlinkStreamlet.scala:186)
>>>>>>>> at scala.util.Failure.fold(Try.scala:240)
>>>>>>>> at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:187)
>>>>>>>> at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:153)
>>>>>>>> at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
>>>>>>>> at scala.util.Try$.apply(Try.scala:213)
>>>>>>>> at pipelines.runner.Runner$.run(Runner.scala:43)
>>>>>>>> at pipelines.runner.Runner$.main(Runner.scala:30)
>>>>>>>> at pipelines.runner.Runner.main(Runner.scala)
>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>>>>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>>>>>>> at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>>>>>>> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>>>>>>>> at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>>>>>>> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>>>>>>>> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>>>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>> at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>> at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:320)
>>>>>>>> at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$2(FlinkStreamlet.scala:184)
>>>>>>>> at scala.util.Try$.apply(Try.scala:213)
>>>>>>>> at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:184)
>>>>>>>> ... 20 more
>>>>>>>>
>>>>>>>> regards.
>>>>>>>>
>>>>>>>> On Mon, Sep 23, 2019 at 5:36 PM Dian Fu <dian0511.fu@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Regarding the code you pasted, personally I think nothing is
>>>>>>>>> wrong. The problem is how it's executed. As you can see from the
>>>>>>>>> implementation of StreamExecutionEnvironment.getExecutionEnvironment,
>>>>>>>>> it may create different StreamExecutionEnvironment implementations
>>>>>>>>> under different scenarios. Could you paste the full exception stack
>>>>>>>>> if it exists? It's difficult to figure out what's wrong with the
>>>>>>>>> current stack trace.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Dian
>>>>>>>>>
>>>>>>>>> On Sep 23, 2019, at 6:55 PM, Debasish Ghosh <ghosh.debasish@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Can it be the case that the threadLocal stuff in
>>>>>>>>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609
>>>>>>>>> does not behave deterministically when we submit the job through a
>>>>>>>>> Kubernetes Flink operator? Utils also selects the factory to create
>>>>>>>>> the context based on either thread-local storage or a static mutable
>>>>>>>>> variable.
>>>>>>>>>
>>>>>>>>> Can these be a source of problems in our case?
>>>>>>>>>
>>>>>>>>> regards.
>>>>>>>>>
>>>>>>>>> On Mon, Sep 23, 2019 at 3:58 PM Debasish Ghosh <
>>>>>>>>> ghosh.debasish@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> ah .. Ok .. I get the Throwable part. I am using
>>>>>>>>>>
>>>>>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>
>>>>>>>>>> How can this lead to a wrong StreamExecutionEnvironment? Any
>>>>>>>>>> suggestion?
>>>>>>>>>>
>>>>>>>>>> regards.
>>>>>>>>>>
>>>>>>>>>> On Mon, Sep 23, 2019 at 3:53 PM Dian Fu <dian0511.fu@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Debasish,
>>>>>>>>>>>
>>>>>>>>>>> As I said before, the exception is caught in [1]. It catches
>>>>>>>>>>> Throwable, so it also catches
>>>>>>>>>>> "OptimizerPlanEnvironment.ProgramAbortException". Regarding the
>>>>>>>>>>> cause of this exception, I have the same feeling as Tison: I also
>>>>>>>>>>> think that the wrong StreamExecutionEnvironment is used.
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Dian
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>>>>>>>>
>>>>>>>>>>> On Sep 23, 2019, at 6:08 PM, Debasish Ghosh <ghosh.debasish@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi Tison -
>>>>>>>>>>>
>>>>>>>>>>> This is the code that builds the computation graph. readStream
>>>>>>>>>>> reads from Kafka and writeStream writes to Kafka.
>>>>>>>>>>>
>>>>>>>>>>>     override def buildExecutionGraph = {
>>>>>>>>>>>       val rides: DataStream[TaxiRide] =
>>>>>>>>>>>         readStream(inTaxiRide)
>>>>>>>>>>>           .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>>>>>>>>>           .keyBy("rideId")
>>>>>>>>>>>
>>>>>>>>>>>       val fares: DataStream[TaxiFare] =
>>>>>>>>>>>         readStream(inTaxiFare)
>>>>>>>>>>>           .keyBy("rideId")
>>>>>>>>>>>
>>>>>>>>>>>       val processed: DataStream[TaxiRideFare] =
>>>>>>>>>>>         rides
>>>>>>>>>>>           .connect(fares)
>>>>>>>>>>>           .flatMap(new EnrichmentFunction)
>>>>>>>>>>>
>>>>>>>>>>>       writeStream(out, processed)
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>> I also checked that my code enters this function
>>>>>>>>>>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>>>>>>>>>> and then the exception is thrown. I tried to do a grep on the Flink
>>>>>>>>>>> code base to see where this exception is caught. If I leave out the
>>>>>>>>>>> tests, I don't see any catch of this exception ..
>>>>>>>>>>>
>>>>>>>>>>> $ find . -name "*.java" | xargs grep "OptimizerPlanEnvironment.ProgramAbortException"
>>>>>>>>>>> ./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>> ./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java: @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 30_000)
>>>>>>>>>>>
>>>>>>>>>>> What am I missing here ?
>>>>>>>>>>>
>>>>>>>>>>> regards.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <wander4096@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Debasish,
>>>>>>>>>>>>
>>>>>>>>>>>> As mentioned by Dian, it is an internal exception that should
>>>>>>>>>>>> always be caught by Flink internally. I would suggest you share
>>>>>>>>>>>> the job (abstractly). Generally it happens because you use
>>>>>>>>>>>> StreamPlanEnvironment/OptimizerPlanEnvironment directly.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> tison.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Austin Cawley-Edwards <austin.cawley@gmail.com> wrote on Mon, Sep 23, 2019 at 5:09 AM:
>>>>>>>>>>>>
>>>>>>>>>>>>> Have you reached out to the FlinkK8sOperator team on Slack?
>>>>>>>>>>>>> They’re usually pretty active on there.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Here’s the link:
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://join.slack.com/t/flinkk8soperator/shared_invite/enQtNzIxMjc5NDYxODkxLTEwMThmN2I0M2QwYjM3ZDljYTFhMGRiNDUzM2FjZGYzNTRjYWNmYTE1NzNlNWM2YWM5NzNiNGFhMTkxZjA4OGU
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Austin
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <ghosh.debasish@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> The problem is I am submitting Flink jobs to a Kubernetes
>>>>>>>>>>>>>> cluster using a Flink Operator. Hence it's difficult to debug
>>>>>>>>>>>>>> in the traditional sense of the term. And all I get is the
>>>>>>>>>>>>>> exception that I reported ..
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>>>>>>>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am thinking that this exception must be caused by some other
>>>>>>>>>>>>>> exception, which is not reported BTW. I expected a Caused By
>>>>>>>>>>>>>> portion in the stack trace. Any clue as to which area I should
>>>>>>>>>>>>>> look into to debug this?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <ghosh.debasish@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for the pointer .. I will try debugging. I am getting
>>>>>>>>>>>>>>> this exception running my application on Kubernetes using the
>>>>>>>>>>>>>>> Flink operator from Lyft.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <dian0511.fu@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This exception is used internally to get the plan of a job
>>>>>>>>>>>>>>>> before submitting it for execution. It's thrown for a special
>>>>>>>>>>>>>>>> purpose, will be caught internally in [1], and usually will not
>>>>>>>>>>>>>>>> be thrown to end users.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> You could check the following places to find the cause of
>>>>>>>>>>>>>>>> this problem:
>>>>>>>>>>>>>>>> 1. Check the execution environment you used.
>>>>>>>>>>>>>>>> 2. If you can debug, set a breakpoint at [2] to see if the
>>>>>>>>>>>>>>>> type of the env wrapped in StreamPlanEnvironment is
>>>>>>>>>>>>>>>> OptimizerPlanEnvironment. Usually it should be.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>> https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>>> https://github.com/apache/flink/blob/15a7f978a2e591451c194c41408a12e64d04e9ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sep 21, 2019, at 4:14 AM, Debasish Ghosh <ghosh.debasish@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi -
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> When you get an exception stack trace like this ..
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>>>>>>>>>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> what is the recommended approach to debugging? I mean, what
>>>>>>>>>>>>>>>> kind of errors can potentially lead to such a stack trace? In
>>>>>>>>>>>>>>>> my case it starts from env.execute(..) but does not give any
>>>>>>>>>>>>>>>> information as to what can go wrong.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Any help will be appreciated.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Debasish Ghosh
>>>>>>>>>>>>>>>> http://manning.com/ghosh2
>>>>>>>>>>>>>>>> http://manning.com/ghosh
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Twttr: @debasishg
>>>>>>>>>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>>>>>>>>>> Code: http://github.com/debasishg
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
