Subject: Re: open/close ports and active/inactive streams
From: Vlad Rozov
Organization: DataTorrent
To: dev@apex.apache.org
Date: Thu, 6 Apr 2017 09:21:14 -0700

It is not about a use case difference. My proposal and APEXCORE-408
address the same use case: how to re-allocate resources for batch
applications or applications where processing happens in stages. The
difference between APEXCORE-408 and the proposal is a shift of complexity
from the application logic to the platform. IMO, supporting batch
applications using APEXCORE-408 will require more coding on the
application side.

Thank you,

Vlad

On 4/5/17 21:57, Thomas Weise wrote:
> I think this needs more input on a use case level. The ability to
> dynamically alter the DAG internally will also address the resource
> allocation for operators:
>
> https://issues.apache.org/jira/browse/APEXCORE-408
>
> It can be used to implement stages of a batch pipeline and is very flexible
> in general. Considering the likely implementation complexity of the
> proposed feature, I would like to understand what benefits it provides to
> the user (use cases that cannot be addressed otherwise).
>
> Thanks,
> Thomas
>
> On Sat, Apr 1, 2017 at 12:23 PM, Vlad Rozov wrote:
>
>> Correct, a stateful downstream operator can only be undeployed at a
>> checkpoint window after it consumes all data emitted by the upstream
>> operator on the closed port.
>>
>> It will be necessary to distinguish between a closed port and an
>> inactive stream. After a port is closed, the stream may still be active,
>> and after a port is opened, the stream may still be inactive (not yet
>> ready).
>>
>> The more contributors participate in the discussion and implementation,
>> the more solid the feature will be.
>>
>> Thank you,
>> Vlad
>>
>> Sent from iPhone
>>
>>> On Apr 1, 2017, at 11:03, Pramod Immaneni wrote:
>>> Generally a good idea. Care should be taken around fault tolerance and
>>> idempotency. Closing a stream would need to stop accepting new data, but
>>> it still can't actually close all the streams and un-deploy operators
>>> until committed. Idempotency might require the stream close to take
>>> effect at the end of the window. What would it then mean for re-opening
>>> streams within a window? Also, it looks like a larger undertaking. As Ram
>>> suggested, it would be good to understand the use cases, and I also
>>> suggest that multiple folks participate in the implementation effort to
>>> ensure that we are able to address all the scenarios and minimize the
>>> chances of regression in existing behavior.
>>>
>>> Thanks
>>>
>>>> On Sat, Apr 1, 2017 at 8:12 AM, Vlad Rozov wrote:
>>>> All,
>>>>
>>>> Currently Apex assumes that an operator can emit on any defined output
>>>> port and that all streams defined by a DAG are active. I'd like to
>>>> propose the ability for an operator to open and close output ports. By
>>>> default, all ports defined by an operator will be open. If an operator,
>>>> for any reason, decides that it will not emit tuples on an output port,
>>>> it may close it. This will make the stream inactive, and the application
>>>> master may undeploy the operators downstream of that stream. If this
>>>> leads to containers that don't have any active operators, those
>>>> containers may be undeployed as well, leading to better cluster resource
>>>> utilization and better Apex elasticity.
>>>> Later, the operator may be in a state where it needs to emit tuples on
>>>> the closed port. In this case, it needs to re-open the port and wait
>>>> until the stream becomes active again before emitting tuples on that
>>>> port. Making an inactive stream active again requires the application
>>>> master to re-allocate containers and re-deploy the downstream operators.
>>>>
>>>> It should also be possible for an application designer to mark streams
>>>> as inactive when an application starts. This will allow the application
>>>> master to avoid reserving all containers when the application starts.
>>>> Later, the port can be opened and the inactive stream can become active.
>>>>
>>>> Thank you,
>>>>
>>>> Vlad
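To make the distinction between a closed port and an inactive stream concrete, here is a minimal Python sketch of the proposed life cycle. It is not Apex API: `OutputPort`, `AppMasterSketch`, `reconcile` and the two enums are hypothetical names chosen for illustration. The sketch also deliberately skips the fault-tolerance constraints raised in the thread (waiting until the downstream operator has consumed all data and the checkpoint is committed before undeploying).

```python
from enum import Enum


class PortState(Enum):
    OPEN = "open"
    CLOSED = "closed"


class StreamState(Enum):
    ACTIVE = "active"      # downstream operators are deployed and ready
    INACTIVE = "inactive"  # downstream operators may be undeployed


class OutputPort:
    """Hypothetical model: an output port plus the stream it feeds."""

    def __init__(self, name, initially_active=True):
        self.name = name
        # Default: port open, stream active. A designer may instead mark the
        # stream inactive at launch, in which case the port starts closed.
        self.port = PortState.OPEN if initially_active else PortState.CLOSED
        self.stream = StreamState.ACTIVE if initially_active else StreamState.INACTIVE

    def close(self):
        # The operator declares it will not emit; the stream is NOT yet inactive.
        self.port = PortState.CLOSED

    def open(self):
        # Re-opening does NOT make the stream active; the master must redeploy first.
        self.port = PortState.OPEN

    def can_emit(self):
        # Emitting requires both an open port and an active stream.
        return self.port is PortState.OPEN and self.stream is StreamState.ACTIVE


class AppMasterSketch:
    """Hypothetical stand-in for the application master's reconcile step."""

    def reconcile(self, ports):
        for p in ports:
            if p.port is PortState.CLOSED and p.stream is StreamState.ACTIVE:
                # Real platform: undeploy downstream operators (and empty
                # containers) once it is safe; this sketch does it immediately.
                p.stream = StreamState.INACTIVE
            elif p.port is PortState.OPEN and p.stream is StreamState.INACTIVE:
                # Real platform: re-allocate containers, redeploy downstream.
                p.stream = StreamState.ACTIVE


port = OutputPort("out")
am = AppMasterSketch()
port.close()
print(port.port, port.stream)  # closed port, stream still active
am.reconcile([port])
port.open()
print(port.can_emit())         # False: stream not active again yet
am.reconcile([port])
print(port.can_emit())         # True
```

Keeping port state (owned by the operator) separate from stream state (owned by the application master) is what lets "closed but still active" and "open but not yet active" both be represented.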
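The undeploy timing discussed in the thread can be written as a small rule. A stateful downstream operator may be undeployed only at a checkpoint window taken after it has consumed everything emitted on the closed port, and not before that checkpoint is committed. This is a hypothetical Python sketch, not Apex code. The function name, the use of small integers for window ids, and the assumption that a close takes effect at the end of a window (as Pramod suggested) are all illustrative.

```python
from typing import Optional, Sequence


def undeploy_window(close_window: int,
                    checkpoints: Sequence[int],
                    committed_window: int) -> Optional[int]:
    """Hypothetical rule: earliest window at which a stateful downstream
    operator of a closed port could be safely undeployed, or None.

    close_window: window at whose end the port close takes effect, i.e. the
        last window that may still carry tuples on the closed port.
    checkpoints: window ids at which the downstream operator checkpointed.
    committed_window: highest window id committed across the DAG.
    """
    eligible = [
        cp for cp in checkpoints
        # cp >= close_window: checkpointed state already reflects every tuple
        # emitted on the closed port.
        # cp <= committed_window: recovery will never roll back before cp,
        # so the operator will not be needed again for replay.
        if close_window <= cp <= committed_window
    ]
    return min(eligible) if eligible else None


print(undeploy_window(10, [5, 8, 12, 16], committed_window=12))  # 12
print(undeploy_window(10, [5, 8, 12, 16], committed_window=11))  # None
```

Returning the earliest eligible checkpoint frees resources as soon as it is safe; returning None means the master has to keep the operator deployed and check again after the next commit.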