From: tison
Date: Wed, 30 Oct 2019 15:29:58 +0800
Subject: [DISCUSS] Semantic and implementation of per-job mode
To: dev, user

(CCing the user list because I think users may have ideas on what per-job
mode should look like.)

Hi all,

In the discussion about Flink on k8s [1] we ran into the problem that
opinions diverge on how the so-called per-job mode works. This thread is
aimed at starting a dedicated discussion about per-job semantics and how
to implement them.

**The AS IS per-job mode**

* In standalone deployment, we bundle the user jar with the Flink jar,
retrieve the JobGraph, which is the very first JobGraph from the user
program on the classpath, and then start a Dispatcher with this JobGraph
preconfigured, which launches it as a "recovered" job.

* In YARN deployment, we accept the submission via CliFrontend, extract
the JobGraph, which is the very first JobGraph from the submitted user
program, serialize the JobGraph and upload it to YARN as a resource. When
the AM starts, it retrieves the JobGraph as a resource and starts a
Dispatcher with this JobGraph preconfigured; the rest is the same as in
the standalone case.

Specifically, in order to support multi-part jobs, if the YARN deployment
is configured as "attached", it starts a SessionCluster, proceeds with the
execution, and shuts down the cluster when the job has finished.

**Motivation**

The implementation mentioned above, however, suffers from problems. The
two major ones are: 1. it only respects the very first JobGraph from the
user program; 2. it compiles the job on the client side.

1. Only respect the very first JobGraph from user program

There is already an issue about this topic [2]. As we extract the JobGraph
from the user program by hijacking Environment#execute, we actually abort
any execution after the first call to #execute. It has surprised users
many times that logic they write in the program may never be executed.
The problem here is the semantics of "job" from Flink's perspective. I'd
say that in the current implementation "per-job" is actually
"per-job-graph". However, in practice, since we support jar submission,
it is "per-program" semantics that users want.
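
To make the effect concrete, here is a minimal sketch of the "hijacked
execute" behaviour. It is not Flink code: CapturingEnvironment,
ProgramAbortException and the string standing in for a JobGraph are all
hypothetical names chosen for illustration. It only shows that aborting on
the first #execute call skips everything the user wrote after it.

```java
import java.util.ArrayList;
import java.util.List;

public class FirstJobGraphCapture {

    /** Hypothetical exception used to stop the user program once a plan is captured. */
    static class ProgramAbortException extends RuntimeException {}

    /** Hypothetical stand-in for an environment that is only used to extract the plan. */
    static class CapturingEnvironment {
        String capturedJob; // stands in for the extracted JobGraph

        void execute(String jobName) {
            capturedJob = jobName;
            throw new ProgramAbortException(); // nothing after this call runs
        }
    }

    /** Records which statements of the user program actually ran. */
    static final List<String> trace = new ArrayList<>();

    /** A user program with two jobs and some logic in between. */
    static void userMain(CapturingEnvironment env) {
        trace.add("before first execute");
        env.execute("job-1");
        trace.add("between jobs"); // never reached
        env.execute("job-2");      // never reached
    }

    /** Runs the user program only far enough to capture the first job. */
    static String extractFirstJob() {
        trace.clear();
        CapturingEnvironment env = new CapturingEnvironment();
        try {
            userMain(env);
        } catch (ProgramAbortException expected) {
            // plan captured; the rest of the user program is skipped
        }
        return env.capturedJob;
    }

    public static void main(String[] args) {
        System.out.println("captured: " + extractFirstJob());
        System.out.println("trace: " + trace);
    }
}
```

In this sketch only "job-1" is captured and the trace contains a single
entry, which mirrors the "per-job-graph" behaviour described above.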

2. Compile job on client side

Well, standalone deployment is not affected. But in YARN deployment, we
compile the job and get the JobGraph on the client side, and then upload
it to YARN. This approach, however, somewhat breaks isolation. We have
observed user programs containing exception handling logic that calls
System.exit in the main method, which causes the compilation of the job
to exit the whole client at once. This is a critical problem if we manage
multiple Flink jobs on a single platform: in this case, it shuts down the
whole service.
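
The isolation problem can be sketched as follows. This is an illustration
under assumptions, not the actual client code: UserProgram and
compileInClient are hypothetical names, and the only point is that the
user's main method is invoked inside the calling JVM, so a System.exit in
it would take the caller down too.

```java
import java.lang.reflect.Method;

public class ClientSideCompilation {

    /** Hypothetical user program whose error handling calls System.exit. */
    public static class UserProgram {
        static boolean ranInClientJvm = false;

        public static void main(String[] args) {
            ranInClientJvm = true; // visible to the caller: same JVM, same statics
            try {
                if (args.length > 0 && args[0].equals("--fail")) {
                    throw new IllegalStateException("bad input");
                }
            } catch (IllegalStateException e) {
                System.exit(1); // terminates the whole JVM, including the caller
            }
        }
    }

    /** Invokes the user's main method reflectively, inside the calling JVM. */
    static void compileInClient(Class<?> programClass, String... args) {
        try {
            Method main = programClass.getMethod("main", String[].class);
            main.invoke(null, (Object) args);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        compileInClient(UserProgram.class); // returns normally
        System.out.println("client still alive: " + UserProgram.ranInClientJvm);
        // compileInClient(UserProgram.class, "--fail"); // would kill this JVM
    }
}
```

Running the user program on the cluster side instead, as proposed below,
would confine such an exit to the dedicated cluster process.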

Besides, I have been asked many times why per-job mode doesn't run
"just like" session mode but with a dedicated cluster. This might imply
that the current implementation does not match users' demands.

**Proposal**

In order to provide a "per-program" semantic mode which acts "just like"
session mode but with a dedicated cluster, I propose the workflow below.
It acts like starting a driver on the cluster but is not a general driver
solution as proposed here [3]; the main purpose of the workflow below is
to provide a "per-program" semantic mode.

*From CliFrontend*

1. CliFrontend receives the submission, gathers all configuration and
starts a corresponding ClusterDescriptor.

2. ClusterDescriptor deploys a cluster with main class
ProgramClusterEntrypoint while shipping resources including the user
program.

3. ProgramClusterEntrypoint#main contains the logic for starting
components including the Standalone Dispatcher, configuring the user
program to start a RpcClusterClient, and then invoking the main method of
the user program.

4. RpcClusterClient acts like MiniClusterClient, which is able to submit
the JobGraph after a leader has been elected, so that we don't fall back
to round-robin or fail the submission due to no leader.

5. Whether or not to deliver the job result depends on the user program
logic, since we can already get a JobClient from execute.
ProgramClusterEntrypoint exits once the user program has exited and all
submitted jobs have globally terminated.
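
Steps 3 to 5 can be sketched roughly as below. This is a simplified,
self-contained model and not a proposed implementation: the Dispatcher,
RpcClusterClient and UserProgram types here are hypothetical stand-ins
(jobs are plain strings that "finish" asynchronously), and leader election
and resource shipping are left out entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ProgramEntrypointSketch {

    /** Hypothetical in-cluster dispatcher: accepts jobs and completes them asynchronously. */
    static class Dispatcher {
        final List<CompletableFuture<String>> jobs = new ArrayList<>();

        CompletableFuture<String> submit(String jobName) {
            CompletableFuture<String> result =
                    CompletableFuture.supplyAsync(() -> jobName + " FINISHED");
            jobs.add(result);
            return result;
        }
    }

    /** Hypothetical client handed to the user program; talks to the local dispatcher. */
    static class RpcClusterClient {
        private final Dispatcher dispatcher;

        RpcClusterClient(Dispatcher dispatcher) {
            this.dispatcher = dispatcher;
        }

        CompletableFuture<String> execute(String jobName) {
            return dispatcher.submit(jobName);
        }
    }

    /** User program: may submit several jobs and decides itself whether to wait for results. */
    interface UserProgram {
        void main(RpcClusterClient client);
    }

    /** Start components, run the user's main, then wait until all submitted jobs terminate. */
    static List<String> runEntrypoint(UserProgram program) {
        Dispatcher dispatcher = new Dispatcher();                   // step 3: start components
        RpcClusterClient client = new RpcClusterClient(dispatcher); // step 3: configure the client
        program.main(client);                                       // step 3: invoke user main
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> job : dispatcher.jobs) {     // step 5: wait for all jobs
            results.add(job.join());
        }
        return results; // a real entrypoint would shut the cluster down here
    }

    public static void main(String[] args) {
        List<String> results = runEntrypoint(client -> {
            client.execute("job-1").join(); // the program chooses to wait for this result
            client.execute("job-2");        // fire and forget
        });
        System.out.println(results);
    }
}
```

Note that both jobs of the user program are submitted, in contrast to the
current behaviour where only the first JobGraph is respected.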

This approach fits the direction of FLIP-73 because the strategy of
starting a RpcClusterClient can be regarded as a special Executor. After
ProgramClusterEntrypoint#main starts a cluster, it generates and passes
configuration to the user program so that when the Executor is created,
it knows to use a RpcClusterClient for submission and knows the address
of the Dispatcher.
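
As a rough illustration of this hand-over, the sketch below passes a plain
map as the configuration and picks an executor from it. The keys
"sketch.executor" and "sketch.dispatcher-address", the value
"rpc-cluster-client" and the Executor interface are all made up for this
example; they are not Flink options or the FLIP-73 interfaces.

```java
import java.util.HashMap;
import java.util.Map;

public class ExecutorSelectionSketch {

    // Hypothetical configuration keys, invented for this sketch only.
    static final String EXECUTOR_KIND = "sketch.executor";
    static final String DISPATCHER_ADDRESS = "sketch.dispatcher-address";

    /** Hypothetical executor abstraction: submits a job and reports what it did. */
    interface Executor {
        String submit(String jobName);
    }

    /** What the entrypoint would generate after it has started the cluster. */
    static Map<String, String> entrypointConfiguration(String dispatcherAddress) {
        Map<String, String> conf = new HashMap<>();
        conf.put(EXECUTOR_KIND, "rpc-cluster-client");
        conf.put(DISPATCHER_ADDRESS, dispatcherAddress);
        return conf;
    }

    /** What the user program's environment would do when it needs an executor. */
    static Executor createExecutor(Map<String, String> conf) {
        String kind = conf.get(EXECUTOR_KIND);
        if ("rpc-cluster-client".equals(kind)) {
            String address = conf.get(DISPATCHER_ADDRESS);
            return jobName -> "submitted " + jobName + " to " + address;
        }
        throw new IllegalArgumentException("unknown executor: " + kind);
    }

    public static void main(String[] args) {
        Map<String, String> conf = entrypointConfiguration("localhost:6123");
        System.out.println(createExecutor(conf).submit("job-1"));
    }
}
```

The user program itself stays unchanged in this model; only the
configuration it is started with decides how jobs are submitted.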

**Compatibility**

In my opinion this mode can be purely an add-on to the current codebase.
We don't actually replace the current per-job mode with the so-called
"per-program" mode. It may turn out that the current per-job mode becomes
useless once we have such a "per-program" mode, so that we may deprecate
it in favor of the other.

I'm glad to discuss this in more detail if you're interested, but I'd say
we'd better first reach a consensus on the overall design :-)

Looking forward to your reply!

Best,
tison.

[1] https://issues.apache.org/jira/browse/FLINK-9953
[2] https://issues.apache.org/jira/browse/FLINK-10879
[3] https://docs.google.com/document/d/1dJnDOgRae9FzoBzXcsGoutGB1YjTi1iqG6d1Ht61EnY/edit#