From: Sanjay Pujare
Date: Wed, 12 Oct 2016 11:07:13 -0700
Subject: Re: can operators emit on a different from the operator itself thread?
To: dev@apex.apache.org

A JIRA has been created for adding this thread affinity check:
https://issues.apache.org/jira/browse/APEXCORE-510

I have made this enhancement in a branch,
https://github.com/sanjaypujare/apex-core/tree/malhar-510.thread_affinity
and I have been benchmarking the performance with this change. I will be
publishing the results in the above JIRA, where we can discuss them and
hopefully agree on merging this change.

On Thu, Aug 11, 2016 at 1:41 PM, Sanjay Pujare wrote:

> You are right, I was subconsciously thinking about the THREAD_LOCAL case
> with a single container and a simple DAG, and in that case Vlad’s
> assumption might not be valid, but maybe it is.
>
> On 8/11/16, 11:47 AM, "Munagala Ramanath" wrote:
>
>     If I understand Vlad correctly, what he is saying is that each
>     operator saves currentThread in its own setup() and checks it in its
>     own output methods. The threads in different operators are running
>     potentially on different nodes and/or processes, and there will be no
>     connection between them.
>
>     Ram
>
>     On Thu, Aug 11, 2016 at 11:41 AM, Sanjay Pujare
>     <sanjay@datatorrent.com> wrote:
>
> > Name check is expensive, agreed, but there isn’t anything else
> > currently.
> > Ideally the stram engine (considering that it is an engine providing
> > resources like threads etc.) should use a ThreadFactory or a
> > ThreadGroup to create operator threads so identification and adding
> > functionality is easier.
> >
> > The idea of checking for the same thread between setup() and emit()
> > won’t work because the emit() check will have to be in the Sink
> > hierarchy, and AFAIK a Sink object doesn’t have access to the
> > corresponding operator, right? Another, more fundamental problem
> > probably is that these threads don’t have to match. The emit() for any
> > operator (or rather a Sink related to an operator) is ultimately
> > triggered by an emitTuple() on the topmost input operator in that path,
> > which happens in that input operator’s thread, which doesn’t have to
> > match the thread calling setup() in the downstream operators, right?
> >
> > On 8/11/16, 10:59 AM, "Vlad Rozov" wrote:
> >
> >     Name verification is too expensive; it will be sufficient to store
> >     currentThread during setup() and verify that it is the same during
> >     emit. Checks should be supported not only for DefaultOutputPort, so
> >     we may have it implemented in various Sinks.
> >
> >     Vlad
> >
> >     On 8/11/16 10:21, Sanjay Pujare wrote:
> >
> > > Thinking more about this – all of the “operator” threads are created
> > > by the Stram engine with appropriate names. So we can put checks in
> > > the DefaultOutputPort.emit() or in the various implementations of
> > > Sink.put() that the current thread is one created by the Stram engine
> > > (by verifying the name).
> > >
> > > We can even use a special Thread object for operator threads so the
> > > above detection is easier.
> > >
> > > On 8/10/16, 6:11 PM, "Amol Kekre" wrote:
> > >
> > >     +1 on debug proposal. Even if tuples land within the window, it
> > >     breaks all guarantees.
> > >     A rerun (after restart from a checkpoint) can have tuples in
> > >     different windows from this thread. A separate thread simply
> > >     exposes users to unwarranted risks.
> > >
> > >     Thks
> > >     Amol
> > >
> > >     On Wed, Aug 10, 2016 at 6:05 PM, Vlad Rozov
> > >     <v.rozov@datatorrent.com> wrote:
> > >
> > > > Tuples emitted between end and begin windows is only one of the
> > > > possible behaviors that emitting tuples on a thread separate from
> > > > the operator thread may introduce. It will be good to have both
> > > > checks in place at run-time, and if checking for the operator thread
> > > > for every emitted tuple is too expensive, we may have it enabled
> > > > only in DEBUG or a mode with more checks in place.
> > > >
> > > > Vlad
> > > >
> > > >> Sanjay just reminded me of my typo -> I meant between end_window
> > > >> and start_window :)
> > > >>
> > > >> Thks
> > > >> Amol
> > > >>
> > > >> On Wed, Aug 10, 2016 at 2:36 PM, Sanjay Pujare
> > > >> <sanjay@datatorrent.com> wrote:
> > > >>
> > > >>> If the goal is to do this validation through static analysis of
> > > >>> operator code, I guess it is possible but is going to be
> > > >>> non-trivial. And there could be false positives and false
> > > >>> negatives.
> > > >>>
> > > >>> Also I suppose this discussion applies to processor operators
> > > >>> (those having both in and out ports) so Ram’s example of
> > > >>> JdbcPollInputOperator may not be applicable here?
> > > >>>
> > > >>> On 8/10/16, 2:04 PM, "Ashwin Chandra Putta"
> > > >>> <ashwinchandrap@gmail.com> wrote:
> > > >>>
> > > >>>     In a separate thread I mean.
> > > >>>
> > > >>>     Regards,
> > > >>>     Ashwin.
> > > >>>
> > > >>>     On Wed, Aug 10, 2016 at 2:01 PM, Ashwin Chandra Putta
> > > >>>     <ashwinchandrap@gmail.com> wrote:
> > > >>>
> > > >>> > + dev@apex.apache.org
> > > >>> > - users@apex.apache.org
> > > >>> >
> > > >>> > This is one of those best practices that we learn by experience
> > > >>> > during operator development. It will save a lot of time during
> > > >>> > operator development if we can catch and throw a validation
> > > >>> > error when someone emits tuples in a non separate thread.
> > > >>> >
> > > >>> > Regards,
> > > >>> > Ashwin
> > > >>> >
> > > >>> > On Wed, Aug 10, 2016 at 1:57 PM, Munagala Ramanath
> > > >>> > <ram@datatorrent.com> wrote:
> > > >>> >
> > > >>> >> For cases where use of a different thread is needed, it can
> > > >>> >> write tuples to a queue from where the operator thread pulls
> > > >>> >> them -- JdbcPollInputOperator in Malhar has an example.
> > > >>> >>
> > > >>> >> Ram
> > > >>> >>
> > > >>> >> On Wed, Aug 10, 2016 at 1:50 PM, hsy541@gmail.com
> > > >>> >> <hsy541@gmail.com> wrote:
> > > >>> >>
> > > >>> >>> Hey Vlad,
> > > >>> >>>
> > > >>> >>> Thanks for bringing this up. Is there an easy way to detect
> > > >>> >>> unexpected use of the emit method without hurting
> > > >>> >>> performance? Or at least can we detect this in debug mode?
> > > >>> >>>
> > > >>> >>> Regards,
> > > >>> >>> Siyuan
> > > >>> >>>
> > > >>> >>> On Wed, Aug 10, 2016 at 11:27 AM, Vlad Rozov
> > > >>> >>> <v.rozov@datatorrent.com> wrote:
> > > >>> >>>
> > > >>> >>>> The short answer is no; creating a worker thread to emit
> > > >>> >>>> tuples is not supported by Apex and will lead to undefined
> > > >>> >>>> behavior.
> > > >>> >>>> Operators in Apex have strong thread affinity and all
> > > >>> >>>> interaction with the platform must happen on the operator
> > > >>> >>>> thread.
> > > >>> >>>>
> > > >>> >>>> Vlad
> > > >>> >
> > > >>> > --
> > > >>> >
> > > >>> > Regards,
> > > >>> > Ashwin.
> > > >>>
> > > >>> --
> > > >>>
> > > >>> Regards,
> > > >>> Ashwin.
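
For illustration only, the two ideas discussed in this thread can be sketched in plain Java. This is not Apex code and not the change proposed in APEXCORE-510. It assumes a hypothetical CheckedPort that records the calling thread in setup() and compares it by reference in emit() (the check Vlad suggests), and a hypothetical QueueingOperator whose worker threads only enqueue tuples for the operator thread to drain (the hand-off pattern Ram describes). Every class and method name below is invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Illustrative sketch; none of these types are Apex APIs. */
final class ThreadAffinitySketch {

    /** Toy output port that remembers the thread that called setup(). */
    static final class CheckedPort<T> {
        private final List<T> emitted = new ArrayList<>();
        private Thread owner; // written in setup(), compared in emit()

        void setup() {
            owner = Thread.currentThread();
        }

        void emit(T tuple) {
            // Reference comparison only; no thread-name lookup is involved.
            if (Thread.currentThread() != owner) {
                String expected = (owner == null) ? "<setup() not called>" : owner.getName();
                throw new IllegalStateException("emit() called on "
                        + Thread.currentThread().getName() + ", expected " + expected);
            }
            emitted.add(tuple);
        }

        List<T> emitted() {
            return emitted;
        }
    }

    /** Worker threads enqueue tuples; only the owning thread drains and emits. */
    static final class QueueingOperator {
        final CheckedPort<String> output = new CheckedPort<>();
        private final ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<>();

        void setup() {
            output.setup();
        }

        /** Safe to call from any thread. */
        void offerFromWorker(String tuple) {
            pending.add(tuple);
        }

        /** Intended to be called on the thread that ran setup(). */
        void drain() {
            String tuple;
            while ((tuple = pending.poll()) != null) {
                output.emit(tuple);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        QueueingOperator op = new QueueingOperator();
        op.setup();

        final boolean[] rejected = {false};
        Thread worker = new Thread(() -> {
            op.offerFromWorker("a");     // allowed: hand-off through the queue
            try {
                op.output.emit("b");     // rejected: not the owning thread
            } catch (IllegalStateException e) {
                rejected[0] = true;
            }
        }, "worker");
        worker.start();
        worker.join(); // join() makes the worker's writes visible here

        op.drain();
        System.out.println("emitted=" + op.output.emitted() + " rejected=" + rejected[0]);
    }
}
```

In this sketch the per-tuple cost of the check is one Thread.currentThread() call and one reference comparison. Whether that is cheap enough to leave enabled outside a debug mode is the question the benchmarking mentioned at the top of this message is meant to answer.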