flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3667) Generalize client<->cluster communication
Date Wed, 18 May 2016 10:45:13 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15288767#comment-15288767 ]

ASF GitHub Bot commented on FLINK-3667:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1978#discussion_r63681528
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---
    @@ -211,14 +191,41 @@ public void run() {
     		Runtime.getRuntime().addShutdownHook(clientShutdownHook);
     
     		isConnected = true;
    +
    +		logAndSysout("Waiting until all TaskManagers have connected");
    +
    +		while(true) {
    +			GetClusterStatusResponse status = getClusterStatus();
    +			if (status != null) {
    +				if (status.numRegisteredTaskManagers() < clusterDescriptor.getTaskManagerCount()) {
    +					logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/"
    +						+ clusterDescriptor.getTaskManagerCount() + ")");
    +				} else {
    +					logAndSysout("All TaskManagers are connected");
    +					break;
    +				}
    +			} else {
    +				logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
    +			}
    +
    +			try {
    +				Thread.sleep(500);
    +			}
    +			catch (InterruptedException e) {
    +				LOG.error("Interrupted while waiting for TaskManagers");
    +				System.err.println("Thread is interrupted");
    +				Thread.currentThread().interrupt();
    --- End diff --
    
    I guess I wrote this code. If I'm not mistaken, this means that the while(true) loop is not interruptible, right?
    I think we should change that and break out of the loop if the thread has been interrupted.
    
    What do you think?
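A minimal sketch of the suggested change, under stated assumptions: a hypothetical `Supplier<Integer>` stands in for `getClusterStatus().numRegisteredTaskManagers()` (returning `null` while no status has arrived), and `System.out`/`System.err` stand in for `logAndSysout` and `LOG`. If the loop simply continues after the catch block, the restored interrupt flag makes every subsequent `Thread.sleep(500)` throw immediately, so the loop spins without pausing instead of stopping. The sketch restores the flag and then leaves the loop. This is not the code from the pull request.

```java
import java.util.function.Supplier;

public class TaskManagerWaiter {

    /**
     * Polls until the expected number of TaskManagers has registered.
     * Hypothetical sketch: registeredTaskManagers replaces getClusterStatus()
     * and returns null while no status update has been received yet.
     *
     * @return true if all TaskManagers connected, false if interrupted first
     */
    public static boolean waitForTaskManagers(Supplier<Integer> registeredTaskManagers,
                                              int expectedTaskManagers,
                                              long pollIntervalMillis) {
        while (true) {
            Integer registered = registeredTaskManagers.get();
            if (registered == null) {
                System.out.println("No status updates from the YARN cluster received so far. Waiting ...");
            } else if (registered < expectedTaskManagers) {
                System.out.println("TaskManager status (" + registered + "/" + expectedTaskManagers + ")");
            } else {
                System.out.println("All TaskManagers are connected");
                return true;
            }

            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag for callers further up the stack,
                // then leave the loop instead of polling forever.
                Thread.currentThread().interrupt();
                System.err.println("Interrupted while waiting for TaskManagers");
                return false;
            }
        }
    }
}
```

Returning `false` leaves it to the caller to decide how to handle an interrupted startup; throwing an exception would be an equally reasonable design.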


> Generalize client<->cluster communication
> -----------------------------------------
>
>                 Key: FLINK-3667
>                 URL: https://issues.apache.org/jira/browse/FLINK-3667
>             Project: Flink
>          Issue Type: Improvement
>          Components: YARN Client
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>
> Here are some notes I took when inspecting the client<->cluster classes with regard to future integration of other resource management frameworks in addition to Yarn (e.g. Mesos).
> {noformat}
> 1 Cluster Client Abstraction
> ════════════════════════════
> 1.1 Status Quo
> ──────────────
> 1.1.1 FlinkYarnClient
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
>   • Holds the cluster configuration (Flink-specific and Yarn-specific)
>   • Contains the deploy() method to deploy the cluster
>   • Creates the Hadoop Yarn client
>   • Receives the initial job manager address
>   • Bootstraps the FlinkYarnCluster
> 1.1.2 FlinkYarnCluster
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
>   • Wrapper around the Hadoop Yarn client
>   • Queries cluster for status updates
>   • Lifetime methods to start and shut down the cluster
>   • Flink-specific features like shutdown after job completion
> 1.1.3 ApplicationClient
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
>   • Acts as a middle-man for asynchronous cluster communication
>   • Designed to communicate with Yarn, not used in Standalone mode
> 1.1.4 CliFrontend
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
>   • Deeply integrated with FlinkYarnClient and FlinkYarnCluster
>   • Constantly distinguishes between Yarn and Standalone mode
>   • Would be nice to have a general abstraction in place
> 1.1.5 Client
> ╌╌╌╌╌╌╌╌╌╌╌╌
>   • Job submission and job-related actions, agnostic of the resource management framework
> 1.2 Proposal
> ────────────
> 1.2.1 ClusterConfig (before: AbstractFlinkYarnClient)
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
>   • Extensible cluster-agnostic config
>   • May be extended by specific cluster, e.g. YarnClusterConfig
> 1.2.2 ClusterClient (before: AbstractFlinkYarnClient)
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
>   • Deals with cluster-specific (RM) communication
>   • Exposes framework-agnostic information
>   • YarnClusterClient, MesosClusterClient, StandaloneClusterClient
> 1.2.3 FlinkCluster (before: AbstractFlinkYarnCluster)
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
>   • Basic interface to communicate with a running cluster
>   • Receives the ClusterClient for cluster-specific communication
>   • Should not have to care about the specific implementations of the
>     client
> 1.2.4 ApplicationClient
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
>   • Can be changed to work in a cluster-agnostic way (first steps already in
>     FLINK-3543)
> 1.2.5 CliFrontend
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
>   • CliFrontend never has to differentiate between cluster types once
>     it has determined which cluster class to load.
>   • Base class handles framework-agnostic command line arguments
>   • Pluggable modules for Yarn and Mesos handle framework-specific commands
> {noformat}
> I would like to create/refactor the affected classes to set us up for a more flexible client-side resource management abstraction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
