Date: Thu, 2 Nov 2017 09:13:00 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: issues@flink.apache.org
Subject: [jira] [Commented] (FLINK-7153) Eager Scheduling can't allocate source for ExecutionGraph correctly

    [ https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16235434#comment-16235434 ]

ASF GitHub Bot commented on FLINK-7153:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148474427

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
    @@ -1065,6 +1177,46 @@ private void sendUpdatePartitionInfoRpcCall(
         // Miscellaneous
         // --------------------------------------------------------------------------------------------
     
    +    /**
    +     * Calculates the preferred locations based on the location preference constraint.
    +     *
    +     * @param locationPreferenceConstraint constraint for the location preference
    +     * @return Future containing the collection of preferred locations. This might not be completed if not all inputs
    +     *         have been assigned a resource.
    +     */
    +    @VisibleForTesting
    +    public CompletableFuture> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
    +        final Collection> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
    +        final CompletableFuture> preferredLocationsFuture;
    +
    +        switch(locationPreferenceConstraint) {
    +            case ALL:
    +                preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
    +                break;
    +            case ANY:
    +                final ArrayList completedTaskManagerLocations = new ArrayList<>(1);
    --- End diff --

    No, the intention is to return all currently known locations here. Usually there will only be one input because `ANY` is used by lazy scheduling. Initializing the list with a capacity of 1 is a compromise between size and resizing costs. We could also initialize it with the number of inputs (since this is a small number), but in most cases not all inputs will be known for `ANY`.


> Eager Scheduling can't allocate source for ExecutionGraph correctly
> -------------------------------------------------------------------
>
>                 Key: FLINK-7153
>                 URL: https://issues.apache.org/jira/browse/FLINK-7153
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.3.1
>            Reporter: Sihua Zhou
>            Assignee: Till Rohrmann
>            Priority: Critical
>             Fix For: 1.4.0, 1.3.3
>
>
> The ExecutionGraph.scheduleEager() function allocates resources for the ExecutionJobVertices one by one by calling ExecutionJobVertex.allocateResourcesForAll(). There are two problems with this:
> 1. ExecutionVertex.getPreferredLocationsBasedOnInputs() always returns an empty result, because `sourceSlot` remains null until the `ExecutionVertex` has been deployed via `Execution.deployToSlot()`. As a result, allocating resources based on preferred locations cannot work correctly. Should we set the slot info on the `Execution` as soon as Execution.allocateSlotForExecution() has been called successfully?
> 2. The current allocation strategy does not allocate slots optimally.
> Here is the test case:
> {code}
> import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
> import org.apache.flink.runtime.jobgraph.DistributionPattern;
> import org.apache.flink.runtime.jobgraph.JobVertex;
> import org.apache.flink.runtime.jobgraph.JobVertexID;
> import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
> import org.apache.flink.runtime.operators.BatchTask;
>
> JobVertexID jid1 = new JobVertexID();
> JobVertexID jid2 = new JobVertexID();
> JobVertex v1 = new JobVertex("v1", jid1);
> JobVertex v2 = new JobVertex("v2", jid2);
> SlotSharingGroup group = new SlotSharingGroup();
> v1.setSlotSharingGroup(group);
> v2.setSlotSharingGroup(group);
> v1.setParallelism(2);
> v2.setParallelism(4);
> v1.setInvokableClass(BatchTask.class);
> v2.setInvokableClass(BatchTask.class);
> // POINTWISE with parallelism 2 -> 4: each v1 subtask feeds two v2 subtasks
> v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
> {code}
> Currently, after allocating slots for v1 and v2, we get one local partition and three remote partitions. It should actually be two local partitions and two remote partitions.
> The cause of the above problems is that the current strategy allocates resources for the executions one by one (if an execution can get a slot from the SlotSharingGroup, it takes it; otherwise it requests a new one).
> Changing the allocation strategy to two steps would solve this problem. Below is the pseudocode:
> {code}
> for (ExecutionJobVertex ejv : getVerticesTopologically()) {
>     // step 1: for each execution, try to allocate from the SlotSharingGroup based on its inputs (allocation by location only).
>     // step 2: allocate slots for the remaining executions.
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
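The partition counts in the issue can be made concrete with a small toy model of the test topology. This is a hypothetical sketch, not Flink's scheduler, and it rests on several assumptions. Each `v1` subtask `p` is assumed to already occupy shared slot `p`. Under `POINTWISE` with parallelism 2 -> 4, `v2` subtask `i` reads from `v1` subtask `i / 2`. A shared slot can host at most one `v2` subtask. The location-unaware `oneByOne` method is only an illustrative stand-in for the current behavior. The `twoStep` method follows the two-step pseudocode proposed in the issue. All class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Toy model of the FLINK-7153 test topology (hypothetical, not Flink's scheduler).
 * Assumptions: v1 subtask p already occupies shared slot p; v2 subtask i reads from
 * v1 subtask i / (consumers / producers) under POINTWISE; a shared slot hosts at most
 * one v2 subtask; consumers is a multiple of producers.
 */
class TwoStepAllocationSketch {

    /** Index of the v1 subtask (and therefore the slot) that v2 subtask i reads from. */
    static int inputOf(int consumer, int producers, int consumers) {
        return consumer / (consumers / producers);
    }

    /** Takes the first slot without a v2 subtask, opening a new slot if none is free. */
    static int takeAnySlot(List<Boolean> slotHasConsumer) {
        int slot = slotHasConsumer.indexOf(false);
        if (slot < 0) {
            slotHasConsumer.add(false);
            slot = slotHasConsumer.size() - 1;
        }
        slotHasConsumer.set(slot, true);
        return slot;
    }

    /** Location-unaware, one execution at a time: each v2 subtask takes any free slot. */
    static int[] oneByOne(int producers, int consumers) {
        List<Boolean> slotHasConsumer = new ArrayList<>(Collections.nCopies(producers, false));
        int[] slotOf = new int[consumers];
        for (int c = 0; c < consumers; c++) {
            slotOf[c] = takeAnySlot(slotHasConsumer);
        }
        return slotOf;
    }

    /** Step 1 places executions next to their input; step 2 places the rest anywhere. */
    static int[] twoStep(int producers, int consumers) {
        List<Boolean> slotHasConsumer = new ArrayList<>(Collections.nCopies(producers, false));
        int[] slotOf = new int[consumers];
        Arrays.fill(slotOf, -1);
        for (int c = 0; c < consumers; c++) {          // step 1: allocation by location only
            int preferred = inputOf(c, producers, consumers);
            if (!slotHasConsumer.get(preferred)) {
                slotHasConsumer.set(preferred, true);
                slotOf[c] = preferred;
            }
        }
        for (int c = 0; c < consumers; c++) {          // step 2: the remaining executions
            if (slotOf[c] < 0) {
                slotOf[c] = takeAnySlot(slotHasConsumer);
            }
        }
        return slotOf;
    }

    /** Counts v2 subtasks that share a slot with their input, i.e. local partitions. */
    static int countLocal(int[] slotOf, int producers) {
        int local = 0;
        for (int c = 0; c < slotOf.length; c++) {
            if (slotOf[c] == inputOf(c, producers, slotOf.length)) {
                local++;
            }
        }
        return local;
    }

    public static void main(String[] args) {
        int producers = 2, consumers = 4;   // v1 and v2 parallelism from the test case
        int naiveLocal = countLocal(oneByOne(producers, consumers), producers);
        int twoStepLocal = countLocal(twoStep(producers, consumers), producers);
        System.out.println("one-by-one: " + naiveLocal + " local, " + (consumers - naiveLocal) + " remote");
        System.out.println("two-step:   " + twoStepLocal + " local, " + (consumers - twoStepLocal) + " remote");
    }
}
```

In this toy model the one-by-one variant ends up with 1 local and 3 remote partitions. The two-step variant ends up with 2 local and 2 remote. These are the same counts the issue describes. Step 1 succeeds for the first consumer of each producer, and step 2 only opens new slots for consumers whose input slot is already taken.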
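The review comment depends on how the two `LocationPreferenceConstraint` modes in the diff differ. `ALL` combines the futures of all inputs, so it completes only once every input location is known. `ANY` returns whatever locations are known at call time, so a small initial `ArrayList` capacity is usually enough. The following is a minimal stand-alone sketch of that contract using only `java.util.concurrent.CompletableFuture`. It is not the Flink implementation. The `Constraint` enum, the `preferredLocations` method, and the use of plain strings as TaskManager locations are hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical sketch of the ALL/ANY contract discussed in the review (not Flink code).
 * Locations are plain strings; Constraint stands in for LocationPreferenceConstraint.
 */
class PreferredLocationsSketch {

    enum Constraint { ALL, ANY }

    static CompletableFuture<Collection<String>> preferredLocations(
            Collection<CompletableFuture<String>> inputLocations, Constraint constraint) {
        switch (constraint) {
            case ALL: {
                // Completes only once the location of every input is known.
                CompletableFuture<Void> allKnown =
                        CompletableFuture.allOf(inputLocations.toArray(new CompletableFuture<?>[0]));
                return allKnown.thenApply(ignored -> {
                    Collection<String> locations = new ArrayList<>(inputLocations.size());
                    for (CompletableFuture<String> location : inputLocations) {
                        locations.add(location.join());
                    }
                    return locations;
                });
            }
            case ANY: {
                // Completes immediately with the locations that are known right now.
                // Initial capacity 1: usually only one input is known at this point.
                Collection<String> known = new ArrayList<>(1);
                for (CompletableFuture<String> location : inputLocations) {
                    if (location.isDone() && !location.isCompletedExceptionally()) {
                        known.add(location.join());
                    }
                }
                return CompletableFuture.completedFuture(known);
            }
            default:
                throw new IllegalArgumentException("Unknown constraint: " + constraint);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> knownInput = CompletableFuture.completedFuture("tm-1");
        CompletableFuture<String> pendingInput = new CompletableFuture<>();
        List<CompletableFuture<String>> inputs = Arrays.asList(knownInput, pendingInput);

        CompletableFuture<Collection<String>> all = preferredLocations(inputs, Constraint.ALL);
        CompletableFuture<Collection<String>> any = preferredLocations(inputs, Constraint.ANY);
        System.out.println("ALL done before second input: " + all.isDone()); // false
        System.out.println("ANY result: " + any.join());                     // [tm-1]

        pendingInput.complete("tm-2");
        System.out.println("ALL result: " + all.join());                     // [tm-1, tm-2]
    }
}
```

With one input still pending, the `ALL` future stays incomplete, while `ANY` immediately yields the single known location. This matches the comment's point that, for `ANY`, usually not all inputs are known yet.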