Date: Wed, 25 Oct 2017 19:13:33 -0700 (MST)
From: David Dreyfus
To: user@flink.apache.org
Message-ID: <1508984013915-0.post@n4.nabble.com>
Subject: Tasks, slots, and partitioned joins

Hello -

I have a large number of pairs of files. For the purposes of discussion:
/source1/{1..10000} and /source2/{1..10000}.

I want to join the files pair-wise: /source1/1 joined to /source2/1,
/source1/2 joined to /source2/2, and so on. I then want to union the results
of the pair-wise joins and perform an aggregate.

I created a simple Flink job that has four sources, two joins, and two sinks
producing intermediate results. This represents two unrelated chains. I
notice that when I run this job with parallelism = 1 on a standalone machine
with one task manager and 3 slots, only one slot gets used. My concern is
that when I scale up to a YARN cluster, Flink will continue to use one slot
on one machine instead of using all slots on all machines.
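Stripped down, the job looks roughly like the sketch below. This is only an
illustration - the paths, field types, and join keys are placeholders rather
than my real code, and I assume CSV-like inputs.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class PairwiseJoinSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Chain 1: /source1/1 joined with /source2/1, written to an
        // intermediate sink. Paths and key positions are made up.
        DataSet<Tuple2<String, String>> left1 =
                env.readCsvFile("/source1/1").types(String.class, String.class);
        DataSet<Tuple2<String, String>> right1 =
                env.readCsvFile("/source2/1").types(String.class, String.class);
        left1.join(right1).where(0).equalTo(0)
             .writeAsText("/intermediate/1");

        // Chain 2: /source1/2 joined with /source2/2 - shares nothing with
        // chain 1, so the job graph contains two unrelated chains.
        DataSet<Tuple2<String, String>> left2 =
                env.readCsvFile("/source1/2").types(String.class, String.class);
        DataSet<Tuple2<String, String>> right2 =
                env.readCsvFile("/source2/2").types(String.class, String.class);
        left2.join(right2).where(0).equalTo(0)
             .writeAsText("/intermediate/2");

        env.execute("two unrelated chains");
    }
}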
Prior reading suggests that all of the data source subtasks are added to a
default resource group, and that downstream tasks (the joins and sinks) want
to be co-located with the data sources. The result is that all of my tasks
are executed in one slot.

The DataStream API offers the slotSharingGroup() function, but it doesn't
seem to be available to the DataSet user (a small DataStream sketch of what
I mean is at the very bottom of this mail).

*Q1:* How do I force Flink to distribute work evenly across task managers
and the slots allocated to them? If this shouldn't be a concern, please
elaborate.

When I scale up the number of unrelated chains, I notice that Flink seems to
start all of them at the same time, which results in thrashing and errors -
lots of IO and errors regarding hash buffers.

*Q2:* Is there any method for controlling the scheduling of tasks so that
some finish before others start? My workaround is to execute multiple,
sequential batch jobs with the results going into an intermediate directory,
followed by a final job that aggregates the results. I would certainly
prefer a single job that avoids the intermediate write.

If I treat /source1 as one data source and /source2 as the second, and then
join the two, Flink will shuffle and partition the files on the join key.
The /source1 and /source2 files already represent this partitioning: they
are reused multiple times, so I shuffle once and save the results, which is
how /source1 and /source2 are created.

*Q3:* Does Flink have a method by which I can mark individual files (or
directories) as belonging to a particular partition, so that when I join
them the unnecessary shuffle and repartition is avoided?

Thank you,
David

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
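P.S. For reference, the kind of thing I can do on the DataStream side today.
This is only a sketch - the paths and group names are made up - but it shows
the slotSharingGroup() call I am missing in the DataSet API.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingGroupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Each chain gets its own slot sharing group, so operators from
        // different chains cannot be packed into the same slot. Downstream
        // operators inherit the group of their input by default.
        env.readTextFile("/source1/1")
           .slotSharingGroup("chain-1")
           .writeAsText("/sink/1");

        env.readTextFile("/source1/2")
           .slotSharingGroup("chain-2")
           .writeAsText("/sink/2");

        env.execute("slot sharing group sketch");
    }
}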