flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: [Discuss] Add JobListener (hook) in flink job lifecycle
Date Thu, 09 May 2019 08:13:28 GMT
Hi everybody,
any news on this? For us would be VERY helpful to have such a feature
because we need to execute a call to a REST service once a job ends.
Right now we do this after the env.execute() but this works only if the job
is submitted via the CLI client, the REST client doesn't execute anything
after env.execute().


On Thu, Apr 25, 2019 at 3:12 PM Jeff Zhang <zjffdu@gmail.com> wrote:

> Hi  Beckett,
> Thanks for your feedback, See my comments inline
> >>>  How do user specify the listener? *
> What I proposal is to register JobListener in ExecutionEnvironment. I
> don't think we should make ClusterClient as public api.
> >>> Where should the listener run? *
> I don't think it is proper to run listener in JobMaster. The listener is
> user code, and usually it is depends on user's other component. So running
> it in client side make more sense to me.
> >>> What should be reported to the Listener? *
> I am open to add other api in this JobListener. But for now, I am afraid
> the ExecutionEnvironment is not aware of failover, so it is not possible to
> report failover event.
> >>> What can the listeners do on notifications? *
> Do you mean to pass JobGraph to these methods ? like following ( I am
> afraid JobGraph is not a public and stable api, we should not expose it to
> users)
> public interface JobListener {
> void onJobSubmitted(JobGraph graph, JobID jobId);
> void onJobExecuted(JobGraph graph, JobExecutionResult jobResult);
> void onJobCanceled(JobGraph graph, JobID jobId, String savepointPath);
> }
> Becket Qin <becket.qin@gmail.com> 于2019年4月25日周四 下午7:40写道:
>> Thanks for the proposal, Jeff. Adding a listener to allow users handle
>> events during the job lifecycle makes a lot of sense to me.
>> Here are my two cents.
>> * How do user specify the listener? *
>> It is not quite clear to me whether we consider ClusterClient as a public
>> interface? From what I understand ClusterClient is not a public interface
>> right now. In contrast, ExecutionEnvironment is the de facto interface for
>> administrative work. After job submission, it is essentially bound to a job
>> as an administrative handle. Given this current state, personally I feel
>> acceptable to have the listener registered to the ExecutionEnvironment.
>> * Where should the listener run? *
>> If the listener runs on the client side, the client have to be always
>> connected to the Flink cluster. This does not quite work if the Job is a
>> streaming job. Should we provide the option to run the listener in
>> JobMaster as well?
>> * What should be reported to the Listener? *
>> Besides the proposed APIs, does it make sense to also report events such
>> as failover?
>> * What can the listeners do on notifications? *
>> If the listeners are expected to do anything on the job, should some
>> helper class to manipulate the jobs be passed to the listener method?
>> Otherwise users may not be able to easily take action.
>> Thanks,
>> Jiangjie (Becket) Qin
>> On Wed, Apr 24, 2019 at 2:43 PM Jeff Zhang <zjffdu@gmail.com> wrote:
>>> Hi Till,
>>> IMHO, allow adding hooks involves 2 steps.
>>> 1. Provide hook interface, and call these hook in flink (ClusterClient)
>>> at the right place. This should be done by framework (flink)
>>> 2. Implement new hook implementation and add/register them into
>>> framework(flink)
>>> What I am doing is step 1 which should be done by flink, step 2 is done
>>> by users. But IIUC, your suggestion of using custom ClusterClient seems
>>> mixing these 2 steps together. Say I'd like to add new hooks, I have to
>>> implement a new custom ClusterClient, add new hooks and call them in the
>>> custom ClusterClient at the right place.
>>> This doesn't make sense to me. For a user who want to add hooks, he is
>>> not supposed to understand the mechanism of ClusterClient, and should not
>>> touch ClusterClient. What do you think ?
>>> Till Rohrmann <trohrmann@apache.org> 于2019年4月23日周二 下午4:24写道:
>>>> I think we should not expose the ClusterClient configuration via the
>>>> ExecutionEnvironment (env.getClusterClient().addJobListener) because this
>>>> is effectively the same as exposing the JobListener interface directly on
>>>> the ExecutionEnvironment. Instead I think it could be possible to provide
>>>> ClusterClient factory which is picked up from the Configuration or some
>>>> other mechanism for example. That way it would not need to be exposed via
>>>> the ExecutionEnvironment at all.
>>>> Cheers,
>>>> Till
>>>> On Fri, Apr 19, 2019 at 11:12 AM Jeff Zhang <zjffdu@gmail.com> wrote:
>>>>> >>>  The ExecutionEnvironment is usually used by the user who
>>>>> the code and this person (I assume) would not be really interested in
>>>>> callbacks.
>>>>> Usually ExecutionEnvironment is used by the user who write the code,
>>>>> but it doesn't needs to be created and configured by this person. e.g.
>>>>> Zeppelin notebook, ExecutionEnvironment is created by Zeppelin, user
>>>>> use ExecutionEnvironment to write flink program.  You are right that
>>>>> end user would not be interested in these callback, but the third party
>>>>> library that integrate with zeppelin would be interested in these callbacks.
>>>>> >>> In your case, it could be sufficient to offer some hooks
for the
>>>>> ClusterClient or being able to provide a custom ClusterClient.
>>>>> Actually in my initial PR (https://github.com/apache/flink/pull/8190),
>>>>> I do pass JobListener to ClusterClient and invoke it there.
>>>>> But IMHO, ClusterClient is not supposed be a public api for users.
>>>>> Instead JobClient is the public api that user should use to control job.
>>>>> adding hooks to ClusterClient directly and provide a custom ClusterClient
>>>>> doesn't make sense to me. IIUC, you are suggesting the following approach
>>>>>      env.getClusterClient().addJobListener(jobListener)
>>>>> but I don't see its benefit compared to this.
>>>>>      env.addJobListener(jobListener)
>>>>> Overall, I think adding hooks is orthogonal with fine grained job
>>>>> control. And I agree that we should refactor the flink client component,
>>>>> but I don't think it would affect the JobListener interface. What do
>>>>> think ?
>>>>> Till Rohrmann <trohrmann@apache.org> 于2019年4月18日周四
>>>>>> Thanks for starting this discussion Jeff. I can see the need for
>>>>>> additional hooks for third party integrations.
>>>>>> The thing I'm wondering is whether we really need/want to expose
>>>>>> JobListener via the ExecutionEnvironment. The ExecutionEnvironment
>>>>>> usually used by the user who writes the code and this person (I assume)
>>>>>> would not be really interested in these callbacks. If he would, then
>>>>>> should rather think about a better programmatic job control where
>>>>>> `ExecutionEnvironment#execute` call returns a `JobClient` instance.
>>>>>> Moreover, we would effectively make this part of the public API and
>>>>>> implementation would need to offer it.
>>>>>> In your case, it could be sufficient to offer some hooks for the
>>>>>> ClusterClient or being able to provide a custom ClusterClient. The
>>>>>> ClusterClient is the component responsible for the job submission
>>>>>> retrieval of the job result and, hence, would be able to signal when
a job
>>>>>> has been submitted or completed.
>>>>>> Cheers,
>>>>>> Till
>>>>>> On Thu, Apr 18, 2019 at 8:57 AM vino yang <yanghua1127@gmail.com>
>>>>>> wrote:
>>>>>>> Hi Jeff,
>>>>>>> I personally like this proposal. From the perspective of
>>>>>>> programmability, the JobListener can make the third program more
>>>>>>> appreciable.
>>>>>>> The scene where I need the listener is the Flink cube engine
>>>>>>> Apache Kylin. In the case, the Flink job program is embedded
into the
>>>>>>> Kylin's executable context.
>>>>>>> If we could have this listener, it would be easier to integrate
>>>>>>> Kylin.
>>>>>>> Best,
>>>>>>> Vino
>>>>>>> Jeff Zhang <zjffdu@gmail.com> 于2019年4月18日周四
>>>>>>>> Hi All,
>>>>>>>> I created FLINK-12214
>>>>>>>> <https://issues.apache.org/jira/browse/FLINK-12214>
for adding
>>>>>>>> JobListener (hook) in flink job lifecycle. Since this is
a new public api
>>>>>>>> for flink, so I'd like to discuss it more widely in community
to get more
>>>>>>>> feedback.
>>>>>>>> The background and motivation is that I am integrating flink
into apache
>>>>>>>> zeppelin <http://zeppelin.apache.org/>(which is a notebook
in case
>>>>>>>> you don't know). And I'd like to capture some job context
(like jobId) in
>>>>>>>> the lifecycle of flink job (submission, executed, cancelled)
so that I can
>>>>>>>> manipulate job in more fined grained control (e.g. I can
capture the jobId
>>>>>>>> when job is submitted, and then associate it with one paragraph,
and when
>>>>>>>> user click the cancel button, I can call the flink cancel
api to cancel
>>>>>>>> this job)
>>>>>>>> I believe other projects which integrate flink would need
>>>>>>>> mechanism. I plan to add api addJobListener in
>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment so that user
can add
>>>>>>>> customized hook in flink job lifecycle.
>>>>>>>> Here's draft interface JobListener.
>>>>>>>> public interface JobListener {
>>>>>>>> void onJobSubmitted(JobID jobId);
>>>>>>>> void onJobExecuted(JobExecutionResult jobResult);
>>>>>>>> void onJobCanceled(JobID jobId, String savepointPath);
>>>>>>>> }
>>>>>>>> Let me know your comment and concern, thanks.
>>>>>>>> --
>>>>>>>> Best Regards
>>>>>>>> Jeff Zhang
>>>>> --
>>>>> Best Regards
>>>>> Jeff Zhang
>>> --
>>> Best Regards
>>> Jeff Zhang
> --
> Best Regards
> Jeff Zhang

View raw message