Subject: Re: open/close ports and active/inactive streams
From: Vlad Rozov
Organization: DataTorrent
To: dev@apex.apache.org
Date: Sat, 1 Apr 2017 09:30:31 -0700

Please see my comments inline.

Thank you,

Vlad

On 4/1/17 09:13, Sanjay Pujare wrote:
> Sounds like a good idea, +1
>
> Couple of questions/comments:
> - "...re-open the port and wait till the stream becomes active again...".
> How is the operator informed that the stream is active again or is it just
> a property of the output port that the operator needs to check?

There are multiple options available, and we will need to see which one gives the most flexibility and the best performance. One of them is to allow an operator to register a callback in the open port call. Until the stream is active, emitting on the port should raise an exception.

> - Say an operator has 2 input ports one of which becomes inactive as per
> this scenario. It cannot be undeployed because of the other port but the
> operator needs to behave differently because it's a different DAG now. Will
> a control tuple inform it that the input port is closing?

Again, the details will need to be fleshed out. It may be an eos (end of stream) control tuple. An operator with 2 input ports may not even care that one of its input ports is connected to an inactive stream.
It should behave as if the upstream operator does not emit anything on that port.

>
>
> On Sat, Apr 1, 2017 at 8:12 AM, Vlad Rozov wrote:
>
>> All,
>>
>> Currently Apex assumes that an operator can emit on any defined output
>> port and that all streams defined by a DAG are active. I'd like to propose
>> an ability for an operator to open and close output ports. By default, all
>> ports defined by an operator will be open. If an operator, for any reason,
>> decides that it will not emit tuples on an output port, it may close it.
>> This will make the stream inactive, and the application master may
>> undeploy the downstream (for that input stream) operators. If this leaves
>> containers that don't have any active operators, those containers may be
>> undeployed as well, leading to better cluster resource utilization and
>> better Apex elasticity. Later, the operator may be in a state where it
>> needs to emit tuples on the closed port. In this case, it needs to re-open
>> the port and wait till the stream becomes active again before emitting
>> tuples on that port. Making an inactive stream active again requires the
>> application master to re-allocate containers and re-deploy the downstream
>> operators.
>>
>> It should also be possible for an application designer to mark streams as
>> inactive when an application starts. This will allow the application
>> master to avoid reserving all containers when the application starts.
>> Later, the port can be opened and the inactive stream can become active.
>>
>> Thank you,
>>
>> Vlad
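The port lifecycle proposed in this thread can be pictured as a small state machine: open and active, closed, and re-opened but still waiting for the application master to re-deploy the downstream operators. The sketch below is hypothetical. `ClosablePort`, `open(Runnable)`, `close()`, `onStreamActivated()` and the `startClosed` flag are illustrative names and are not part of the Apex API. A plain `Consumer` stands in for the downstream stream, and container allocation and undeployment are not modeled.

```java
import java.util.function.Consumer;

/**
 * HYPOTHETICAL sketch of an output port that can be closed and re-opened.
 * Not an Apex API; all names here are illustrative assumptions.
 */
class ClosablePort<T> {
  enum State { OPEN_ACTIVE, CLOSED, OPEN_PENDING }

  private final Consumer<T> sink; // stands in for the downstream stream
  private State state;
  private Runnable onActive;      // callback registered in the open() call

  /** startClosed models a stream the application designer marks inactive at launch. */
  ClosablePort(Consumer<T> sink, boolean startClosed) {
    this.sink = sink;
    this.state = startClosed ? State.CLOSED : State.OPEN_ACTIVE;
  }

  /** The operator closes the port; the stream becomes inactive. */
  void close() {
    state = State.CLOSED;
    onActive = null;
  }

  /** The operator re-opens the port and registers a callback for when the stream is active. */
  void open(Runnable callback) {
    if (state == State.OPEN_ACTIVE) {
      // Already active: in this sketch the callback simply runs immediately.
      if (callback != null) {
        callback.run();
      }
      return;
    }
    onActive = callback;
    state = State.OPEN_PENDING; // waiting for downstream re-deployment
  }

  /** Invoked (in this sketch) once downstream operators have been re-deployed. */
  void onStreamActivated() {
    if (state != State.OPEN_PENDING) {
      return;
    }
    state = State.OPEN_ACTIVE;
    Runnable callback = onActive;
    onActive = null;
    if (callback != null) {
      callback.run();
    }
  }

  /** Polling alternative to the callback. */
  boolean isActive() {
    return state == State.OPEN_ACTIVE;
  }

  /** Emitting while the stream is not active raises an exception. */
  void emit(T tuple) {
    if (state != State.OPEN_ACTIVE) {
      throw new IllegalStateException("stream is not active: " + state);
    }
    sink.accept(tuple);
  }
}
```

The sketch supports both answers to Sanjay's first question. An operator can register a callback in `open(...)` or poll `isActive()`, and any `emit` before activation fails with an `IllegalStateException`, as suggested in the reply.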