From: Niklas Semmler
Subject: Re: How do network transmissions in Flink work?
To: dev@flink.apache.org
Date: Mon, 20 Jul 2015 19:00:46 +0200

Hello Ufuk,

thank you very much for the answer. You helped me to bring a great deal of context into the problem :).

I have one final question: What is a good indicator that the transfer of data contained in a single ResultPartition is finished? Is there any? Or can the amount of retrieved data be retrieved from the consumer?

So far the only indicator I could come up with is the release of the ResultPartition or the state change to "FINISHED" of the task. However, as far as I understand the assigned resources will only be released after all ResultPartitions are transferred and the task is finished, so that seems to be a rather impractical indicator for the end of the data exchange.

Sorry for the late reply.

Cheers,
Niklas

On 13.07.2015 15:04, Ufuk Celebi wrote:
> Hey Niklas,
>
> there is also this Wiki entry: https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks
>
> On 09 Jul 2015, at 21:32, Niklas Semmler wrote:
>
>> 1. What does the number of ResultSubpartition instances in the ResultPartition correspond to? Is one assigned to each consuming task? If so, how can I find for each ResultSubpartition the corresponding Task, Slot or similar? If not how is decided which piece of the data is routed to which consuming task?
>
> Yes, for each consuming task.
> The wiring depends on the DistributionPattern and the parallelism of the producing and consuming operator. You can look into the ExecutionGraph to see how the wiring works (see connect* methods in ExecutionVertex class). Each subpartition corresponds to an ExecutionEdge, which connects two ExecutionVertex instances, which are an abstraction for tasks at runtime. This is essentially also where the routing is set.
>
> Currently there is no way to get from the subpartition to the corresponding task. You would have to look into the places where the instances are created and pass the reference. The RuntimeEnvironment or Task class creates these instances when a new task is submitted to a task manager.
>
>> 2. What defines the number of Buffer instances per ResultSubpartition? Does one Buffer correspond to exactly one serialized Record? Is a Record the single output of an operator, are there multiple records per operator, or does it differ depending on the operator?
>
> The number of produced buffers depends on the data the corresponding operator/user function produces. Each produced record is serialized into a buffer. It can span multiple buffers depending on the record size.
>
> There can be zero or more records per produced partition. (There will always be at least a single buffer containing an end-of-partition event per partition though.)
>
>> 3. Or are the Buffers defined in a completely different manner? In that case, could you give me a pointer to understand how Buffer instances are used?
>
> A buffer is a wrapper for a MemorySegment with a reference to a buffer pool, which owns the buffer. Buffers are recycled after they have been consumed (e.g. after being written to the TCP channel or by the user code).
>
> Feel free to ask further questions or give feedback if you encounter anything you find weird. :-)
>
> – Ufuk

--
PhD Student / Research Assistant
INET, TU Berlin
Room 4.029
Marchstr 23
10587 Berlin
Tel: +49 30 314 78752
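
To make the buffer discussion in the quoted reply a bit more concrete, here is a small toy model in Python. It is not Flink code and uses none of Flink's classes: the names `Buffer`, `produce`, `consume`, `BUFFER_SIZE` and the 4-byte length prefix are all assumptions made up for illustration. It only sketches the two points from the reply: a serialized record may span several fixed-size buffers, and every partition ends with one buffer carrying an end-of-partition event, which a consumer could treat as the "transfer finished" signal.

```python
from dataclasses import dataclass
from typing import Iterable, List, Tuple

BUFFER_SIZE = 8  # hypothetical buffer capacity in bytes, chosen small on purpose


@dataclass
class Buffer:
    """Toy stand-in for a network buffer (not Flink's Buffer class)."""
    data: bytes
    is_event: bool = False  # True only for the end-of-partition marker


END_OF_PARTITION = Buffer(data=b"", is_event=True)


def produce(records: Iterable[bytes], buffer_size: int = BUFFER_SIZE) -> List[Buffer]:
    """Serialize length-prefixed records and cut the byte stream into buffers."""
    stream = bytearray()
    for record in records:
        # Assumed wire format: 4-byte big-endian length, then the payload.
        stream += len(record).to_bytes(4, "big") + record
    buffers = [
        Buffer(bytes(stream[i:i + buffer_size]))
        for i in range(0, len(stream), buffer_size)
    ]
    # Even an empty partition ends with exactly one event buffer.
    buffers.append(END_OF_PARTITION)
    return buffers


def consume(buffers: Iterable[Buffer]) -> Tuple[List[bytes], bool]:
    """Reassemble records; report whether the end-of-partition event was seen."""
    pending = bytearray()
    records: List[bytes] = []
    finished = False
    for buf in buffers:
        if buf.is_event:
            finished = True
            break
        pending += buf.data
        # Emit every record that is now completely available.
        while len(pending) >= 4:
            length = int.from_bytes(pending[:4], "big")
            if len(pending) < 4 + length:
                break  # record continues in the next buffer
            records.append(bytes(pending[4:4 + length]))
            del pending[:4 + length]
    return records, finished
```

In this sketch, `consume(produce([b"a", b"a-record-longer-than-one-buffer"]))` returns both records together with `True`, while dropping the last buffer yields the same records with `False`, i.e. the consumer cannot yet tell that the partition is complete. Whether the real runtime exposes the event in a comparable way is a question for the code or the wiki page linked above.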