From: Pramod Immaneni
Date: Thu, 11 Feb 2016 17:46:41 -0800
Subject: Re: load based stream partitioning
To: dev@apex.incubator.apache.org

Inline

On Thu, Feb 11, 2016 at 4:32 PM, Timothy Farkas wrote:

> Comments inline
>
> +1 overall as well, provided Apex-339 is implemented first and it is
> documented that the mechanism should not be used with some stateful
> operators.
>
> On Thu, Feb 11, 2016 at 4:20 PM, Pramod Immaneni wrote:
>
> > Comments inline
> >
> > On Thu, Feb 11, 2016 at 4:13 PM, Timothy Farkas wrote:
> >
> > > Hey Pramod,
> > >
> > > I agree if APEX-339 is in place then it would work without redeploying
> > > containers for operators that are stateless, or a subset of stateful
> > > operators.
> > >
> > > Addressing your previous questions.
> > >
> > > - The StatsListener can be used to see how far behind operators are.
> > > You could determine what window the operator is on, the number of
> > > tuples it has processed so far, or how long it takes it to complete a
> > > window.
> >
> > What if tuples are different sizes and the number of tuples processed
> > doesn't reflect how far ahead or behind a downstream partition is? How is
> > the information from the StatsListener made available to the upstream
> > partition codecs?
>
> What information can the BufferServer provide that the StatsListener
> cannot?

The stats information would have to be relayed down to the upstream
operators. It's possible.

> The StatsListener can trigger a repartition. The information in the
> StatsListener can be shared with the partitioner by setting the same
> object for both in populateDAG. The partitioner can then compute the new
> StreamCodec. The mechanism by which the upstream would be updated with the
> new StreamCodec would have to be implemented, as it's currently not there.
>
> > > - Some examples of stateful operators that require repartitioning of
> > > state are the following:
> > >   - Deduper
> > >     In this case, after updating the stream codec, the operator may
> > >     allow a previously seen value to pass because the partition didn't
> > >     receive that value with the previous stream codec.
> > >   - A key-value store that holds aggregations for each key.
> > >     In this case multiple partitions would hold partial aggregations
> > >     for a key when they are expected to hold the complete aggregation.
> >
> > Agreed for the deduper. For the second case a unifier is a better
> > approach so that you are not affected by key skew in general.
>
> This is not always possible. We can discuss this offline, since it won't
> add much to the discussion here to go into the details.

Yes, not always.

> > > Tim
> > >
> > > On Thu, Feb 11, 2016 at 4:04 PM, Pramod Immaneni
> > > <pramod@datatorrent.com> wrote:
> > >
> > > > Additionally it can be treated as a non-idempotent stream for
> > > > recovery. Look at APEXCORE-339. In cases where the downstream
> > > > partitions require some key-based partitioning, what you are
> > > > suggesting would be a good approach, but it will require more complex
> > > > logic in the StreamCodec to do both key- and load-based partitioning.
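As a rough illustration of what such a codec could look like, here is a
sketch that keeps a key-based mapping but skews it by per-partition weights.
It assumes four physical partitions with the default lower-bit partition mask
and the Malhar KryoSerializableStreamCodec base class (so only getPartition
needs overriding). The KeyedTuple type and the updateWeights() hook are made
up for the example; nothing in the engine currently relays BufferServer or
StatsListener information to an upstream codec.

import com.datatorrent.lib.codec.KryoSerializableStreamCodec;

public class LoadAwareKeyCodec extends KryoSerializableStreamCodec<LoadAwareKeyCodec.KeyedTuple>
{
  // Cumulative weight table, one entry per downstream partition. Partitions
  // that should receive more data own a larger slice of the [0, 100) range.
  // With four partitions and the default 0x03 mask, the returned slot index
  // maps directly to a physical partition.
  private volatile int[] cumulativeWeights = {25, 50, 75, 100};

  @Override
  public int getPartition(KeyedTuple tuple)
  {
    // Key-based component: the same key always hashes to the same point.
    String key = tuple.getKey() == null ? "" : tuple.getKey();
    int point = (key.hashCode() & Integer.MAX_VALUE) % 100;
    // Load-based component: slot boundaries move as the weights are updated.
    int[] weights = cumulativeWeights;
    for (int slot = 0; slot < weights.length; slot++) {
      if (point < weights[slot]) {
        return slot;
      }
    }
    return weights.length - 1;
  }

  // Hypothetical hook through which refreshed load information would arrive;
  // today nothing in the engine calls this.
  public void updateWeights(int[] newCumulativeWeights)
  {
    this.cumulativeWeights = newCumulativeWeights;
  }

  public static class KeyedTuple
  {
    private String key;

    public String getKey()
    {
      return key;
    }

    public void setKey(String key)
    {
      this.key = key;
    }
  }
}

Note that when the weights change, keys near a slot boundary move to a
different partition, which is exactly the stateful-operator concern (deduper,
partial aggregations) raised above.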
> > > > On Thu, Feb 11, 2016 at 3:49 PM, Pramod Immaneni
> > > > <pramod@datatorrent.com> wrote:
> > > >
> > > > > How would you know how far behind partitions are without
> > > > > interacting with the BufferServer like you were mentioning in the
> > > > > earlier email? Secondly, why would changing where the data is sent
> > > > > mandate re-partitioning if the downstream partitions can handle
> > > > > data with different keys?
> > > > >
> > > > > On Thu, Feb 11, 2016 at 3:43 PM, Timothy Farkas
> > > > > <tim@datatorrent.com> wrote:
> > > > >
> > > > > > Hey Pramod,
> > > > > >
> > > > > > I think in general, and for recovery, the existing partitioning
> > > > > > machinery can be reused to update the StreamCodec. The reason is
> > > > > > that if the operator is stateful and changes are made to the
> > > > > > StreamCodec, the state of the partitions will also have to be
> > > > > > repartitioned. In this case the number of partitions will remain
> > > > > > the same, just the state of the partitions is reshuffled. The
> > > > > > implementation for this state reshuffling in a fault-tolerant way
> > > > > > is already handled by the dynamic partitioning logic, so it could
> > > > > > be extended to update the StreamCodec as well.
> > > > > >
> > > > > > If the operator is stateless, it may be possible to do this
> > > > > > without redeploying any containers. But with the way I am
> > > > > > envisioning it, I think there would be a lot of
> > > > > > difficult-to-handle corner cases for recovery.
> > > > > >
> > > > > > Tim
> > > > > >
> > > > > > On Thu, Feb 11, 2016 at 3:07 PM, Pramod Immaneni
> > > > > > <pramod@datatorrent.com> wrote:
> > > > > >
> > > > > > > Comment inline.
> > > > > > >
> > > > > > > On Thu, Feb 11, 2016 at 12:21 PM, Timothy Farkas
> > > > > > > <tim@datatorrent.com> wrote:
> > > > > > >
> > > > > > > > +1 for the idea.
> > > > > > > >
> > > > > > > > Gaurav, this could be done idempotently in the same way that
> > > > > > > > dynamic repartitioning is done idempotently. All the
> > > > > > > > partitions are rolled back to a common checkpoint and the new
> > > > > > > > StreamCodec is applied starting then. The statistics that the
> > > > > > > > StreamCodec is given are the statistics for the windows
> > > > > > > > computed before the common checkpoint that the partitions are
> > > > > > > > rolled back to.
> > > > > > > >
> > > > > > > > In fact I think this feature could be added easily by
> > > > > > > > avoiding the buffer server entirely and by allowing the
> > > > > > > > Partitioner to redefine the StreamCodec for the operator when
> > > > > > > > definePartitions is called.
> > > > > > >
> > > > > > > Are you saying this in the context of recovery or in general?
> > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Tim
> > > > > > > >
> > > > > > > > On Thu, Feb 11, 2016 at 12:07 PM, Amol Kekre
> > > > > > > > <amol@datatorrent.com> wrote:
> > > > > > > >
> > > > > > > > > Gaurav,
> > > > > > > > > It would not be idempotent per partition, but it will be
> > > > > > > > > across all partitions combined. In this case the user would
> > > > > > > > > have explicitly asked for such a pattern.
> > > > > > > > >
> > > > > > > > > Thks,
> > > > > > > > > Amol
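For the StatsListener-driven route discussed above, a minimal sketch of an
object registered as both StatsListener and Partitioner could look like the
following. The 2x skew threshold is arbitrary, and the step that would hand a
recomputed StreamCodec or new partition keys back to the upstream deployment
is left as a comment, since that mechanism does not exist in the engine
today.

import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StatsListener;

public class LoadSkewListener<T extends Operator>
    implements StatsListener, Partitioner<T>, Serializable
{
  private static final long serialVersionUID = 1L;
  private static final double SKEW_THRESHOLD = 2.0;

  // Last observed processed-tuples-per-second moving average per physical partition.
  private final Map<Integer, Long> processedRates = new HashMap<>();

  @Override
  public Response processStats(BatchedOperatorStats stats)
  {
    processedRates.put(stats.getOperatorId(), stats.getTuplesProcessedPSMA());
    long min = Collections.min(processedRates.values());
    long max = Collections.max(processedRates.values());
    Response response = new Response();
    // Ask the engine to call definePartitions when the fastest partition is
    // processing more than SKEW_THRESHOLD times what the slowest one is.
    response.repartitionRequired = min > 0 && (double)max / min > SKEW_THRESHOLD;
    return response;
  }

  @Override
  public Collection<Partition<T>> definePartitions(Collection<Partition<T>> partitions,
      PartitioningContext context)
  {
    // Keep the same number of partitions. This is where new partition keys or
    // a new StreamCodec would be computed from processedRates and,
    // hypothetically, pushed to the upstream operator; that last step is the
    // missing piece noted in the thread.
    return partitions;
  }

  @Override
  public void partitioned(Map<Integer, Partition<T>> partitions)
  {
    // Nothing to do in this sketch.
  }
}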
> > > > > > > > > On Thu, Feb 11, 2016 at 12:04 PM, Gaurav Gupta
> > > > > > > > > <gaurav.gopi123@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Pramod,
> > > > > > > > > >
> > > > > > > > > > How would it work with recovery? There could be cases
> > > > > > > > > > where a tuple went to P1 and post recovery it can go to
> > > > > > > > > > P2.
> > > > > > > > > >
> > > > > > > > > > Gaurav
> > > > > > > > > >
> > > > > > > > > > On Thu, Feb 11, 2016 at 11:56 AM, Pramod Immaneni
> > > > > > > > > > <pramod@datatorrent.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi,
> > > > > > > > > > >
> > > > > > > > > > > There are scenarios where the downstream partitions of
> > > > > > > > > > > an upstream operator are not performing uniformly,
> > > > > > > > > > > resulting in an overall sub-optimal performance dictated
> > > > > > > > > > > by the slowest partitions. The reasons could be data
> > > > > > > > > > > related, such as some partitions receiving more data to
> > > > > > > > > > > process than the others, or could be environment
> > > > > > > > > > > related, such as some partitions running slower than
> > > > > > > > > > > others because they are on heavily loaded nodes.
> > > > > > > > > > >
> > > > > > > > > > > A solution based on currently available functionality
> > > > > > > > > > > in the engine would be to write a StreamCodec
> > > > > > > > > > > implementation that distributes data among the
> > > > > > > > > > > partitions such that each partition receives a similar
> > > > > > > > > > > amount of data to process. We should consider adding
> > > > > > > > > > > StreamCodecs like these to the library, but they do not
> > > > > > > > > > > solve the problem when it is environment related.
> > > > > > > > > > >
> > > > > > > > > > > For that, a better and more comprehensive approach
> > > > > > > > > > > would be to look at how data is being consumed by the
> > > > > > > > > > > downstream partitions from the BufferServer and use
> > > > > > > > > > > that information to make decisions on how to send
> > > > > > > > > > > future data. If some partitions are behind others in
> > > > > > > > > > > consuming data then data can be directed to the other
> > > > > > > > > > > partitions. One way to do this would be to relay this
> > > > > > > > > > > type of statistical and positional information from the
> > > > > > > > > > > BufferServer to the upstream publishers. The publishers
> > > > > > > > > > > can use this information in ways such as making it
> > > > > > > > > > > available to StreamCodecs to affect the destination of
> > > > > > > > > > > future data.
> > > > > > > > > > >
> > > > > > > > > > > What do you think?
> > > > > > > > > > >
> > > > > > > > > > > Thanks
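For reference, here is a sketch of how the pieces discussed in this thread
might be wired together with today's API, assuming the Apex 3.x attributes
PortContext.STREAM_CODEC, OperatorContext.STATS_LISTENERS and
OperatorContext.PARTITIONER. Producer, Consumer, LoadAwareKeyCodec and
LoadSkewListener are the hypothetical classes from the earlier sketches, not
library operators, so treat this as an outline rather than a tested
implementation.

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StreamingApplication;

public class LoadBalancedApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Producer and Consumer stand in for the real operators of an application.
    Producer producer = dag.addOperator("producer", new Producer());
    Consumer consumer = dag.addOperator("consumer", new Consumer());

    // The codec set on the receiving input port decides which downstream
    // partition each tuple goes to.
    dag.setInputPortAttribute(consumer.input, Context.PortContext.STREAM_CODEC,
        new LoadAwareKeyCodec());

    // The same object is registered as StatsListener and Partitioner, so the
    // partition stats it sees can drive the repartitioning decision.
    LoadSkewListener<Consumer> listener = new LoadSkewListener<>();
    dag.setAttribute(consumer, Context.OperatorContext.STATS_LISTENERS,
        Arrays.<StatsListener>asList(listener));
    dag.setAttribute(consumer, Context.OperatorContext.PARTITIONER, listener);

    dag.addStream("data", producer.output, consumer.input);
  }
}

As noted in the thread, the remaining gap is the path by which the recomputed
codec or weights would reach the upstream operator's output.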