apex-dev mailing list archives

From Vlad Rozov <v.ro...@datatorrent.com>
Subject Re: can operators emit on a different from the operator itself thread?
Date Thu, 13 Oct 2016 02:02:01 GMT
I ran a JMH test, and the check takes 1 ns on my MacBook Pro and on the lab
machine. This corresponds to a 3% degradation at 30 million events/second.
I think we can move forward with the check ON by default. Do we need the
ability to turn the check OFF for a specific operator and/or port? My
thought is that such an ability is not necessary and that it should be OK to
disable the check for all output ports in an application.

Vlad
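The 3% figure is simple arithmetic: at 30 million events/second, a 1 ns check per event adds 30 ms of work to every second of processing. A minimal Java sketch of that calculation (the class and method names are illustrative only):

```java
public class CheckOverhead {
    // Share of each second spent on the check:
    // (ns per check) * (events per second) / (1e9 ns per second).
    static double overheadFraction(double nsPerCheck, double eventsPerSecond) {
        return nsPerCheck * eventsPerSecond / 1_000_000_000.0;
    }

    public static void main(String[] args) {
        double fraction = overheadFraction(1.0, 30_000_000.0);
        System.out.printf("overhead = %.1f%%%n", fraction * 100);
    }
}
```

The estimate assumes the check's cost simply adds to the per-event processing time of a saturated operator thread.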

On 10/12/16 11:56, Amol Kekre wrote:
> In case there turns out to be a penalty, we can introduce a "check for
> thread affinity" mode that triggers this check. My initial thought is to
> make this check ON by default. We should wait till benchmarks are available
> before discussing adding this check.
>
> Thks
> Amol
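One way to express a "check for thread affinity" mode that is ON by default, and that can be switched off for a whole application as Vlad suggests above, is a single global flag. A minimal sketch; the property name `apex.threadAffinityCheck` is hypothetical and not an actual Apex setting:

```java
public final class AffinityCheckConfig {
    // Hypothetical property name, used only for illustration.
    static final String PROPERTY = "apex.threadAffinityCheck";

    private AffinityCheckConfig() {}

    // The check is ON unless the property is explicitly set to "false".
    static boolean isEnabled() {
        return !"false".equalsIgnoreCase(System.getProperty(PROPERTY, "true"));
    }

    public static void main(String[] args) {
        System.out.println("thread affinity check enabled: " + isEnabled());
    }
}
```

In an engine the flag would likely be read once into a `static final` field, so the JIT can treat it as a constant and drop the check entirely when it is disabled; the sketch reads the property on each call only to keep it easy to test.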
>
>
> On Wed, Oct 12, 2016 at 11:07 AM, Sanjay Pujare <sanjay@datatorrent.com>
> wrote:
>
>> A JIRA has been created for adding this thread affinity check
>> https://issues.apache.org/jira/browse/APEXCORE-510 . I have made this
>> enhancement in a branch
>> https://github.com/sanjaypujare/apex-core/tree/malhar-510.thread_affinity
>> and I have been benchmarking the performance with this change. I will be
>> publishing the results in the above JIRA where we can discuss them and
>> hopefully agree on merging this change.
>>
>> On Thu, Aug 11, 2016 at 1:41 PM, Sanjay Pujare <sanjay@datatorrent.com>
>> wrote:
>>
>>> You are right. I was subconsciously thinking about the THREAD_LOCAL case
>>> with a single container and a simple DAG; in that case Vlad’s assumption
>>> might not be valid, but maybe it is.
>>>
>>> On 8/11/16, 11:47 AM, "Munagala Ramanath" <ram@datatorrent.com> wrote:
>>>
>>>      If I understand Vlad correctly, what he is saying is that each operator
>>>      saves currentThread in its own setup() and checks it in its own output
>>>      methods. The threads in different operators are running potentially on
>>>      different nodes and/or processes and there will be no connection between
>>>      them.
>>>
>>>      Ram
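The per-operator scheme Ram describes can be sketched as an output port that remembers the thread that ran setup() and compares it by reference on every emit. Each port holds its own reference, so operators on different threads, processes, or nodes never compare against each other. `CheckedOutputPort` is a hypothetical class, not Apex's DefaultOutputPort:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical output port with a thread-affinity check; not the Apex DefaultOutputPort.
class CheckedOutputPort<T> {
    private Thread owner;                  // the thread that ran setup()
    final List<T> emitted = new ArrayList<>();

    void setup() {
        owner = Thread.currentThread();
    }

    void emit(T tuple) {
        // Reference comparison only: no name lookup or string work per tuple.
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException("emit() called on thread "
                + Thread.currentThread().getName() + ", not on the operator thread");
        }
        emitted.add(tuple);
    }
}

public class PerOperatorCheckDemo {
    public static void main(String[] args) throws InterruptedException {
        // Two operators, each calling setup() and emit() on its own thread.
        CheckedOutputPort<String> a = new CheckedOutputPort<>();
        CheckedOutputPort<String> b = new CheckedOutputPort<>();
        Thread ta = new Thread(() -> { a.setup(); a.emit("from A"); }, "operator-A");
        Thread tb = new Thread(() -> { b.setup(); b.emit("from B"); }, "operator-B");
        ta.start();
        tb.start();
        ta.join();
        tb.join();
        System.out.println(a.emitted + " " + b.emitted);

        // Emitting on A's port from another thread (here, main) is rejected.
        try {
            a.emit("from main");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```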
>>>
>>>      On Thu, Aug 11, 2016 at 11:41 AM, Sanjay Pujare <sanjay@datatorrent.com>
>>>      wrote:
>>>
>>>      > Name check is expensive, agreed, but there isn’t anything else currently.
>>>      > Ideally the Stram engine (considering that it is an engine providing
>>>      > resources like threads, etc.) should use a ThreadFactory or a ThreadGroup
>>>      > to create operator threads, so that identification and adding
>>>      > functionality is easier.
>>>      >
>>>      > The idea of checking for the same thread between setup() and emit() won’t
>>>      > work because the emit() check will have to be in the Sink hierarchy, and
>>>      > AFAIK a Sink object doesn’t have access to the corresponding operator,
>>>      > right? Another, probably more fundamental, problem is that these threads
>>>      > don’t have to match. The emit() for any operator (or rather a Sink related
>>>      > to an operator) is ultimately triggered by an emitTuple() on the topmost
>>>      > input operator in that path, which happens in that input operator’s
>>>      > thread, which doesn’t have to match the thread calling setup() in the
>>>      > downstream operators, right?
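A minimal sketch of the ThreadFactory idea, assuming the engine hands a custom factory to the executors that run operators. `OperatorThread` and `OperatorThreadFactory` are hypothetical names. The sketch marks engine threads with a subclass rather than a ThreadGroup: by default, `new Thread(...)` places the new thread in its creator's group, so a worker spawned from an operator thread would still pass a group-membership check.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical marker type for threads the engine creates to run operators.
final class OperatorThread extends Thread {
    OperatorThread(Runnable task, String name) {
        super(task, name);
    }
}

// Hypothetical factory the engine would pass to the executors that run operators.
final class OperatorThreadFactory implements ThreadFactory {
    private final AtomicInteger count = new AtomicInteger();

    @Override
    public Thread newThread(Runnable task) {
        return new OperatorThread(task, "operator-" + count.incrementAndGet());
    }

    // Type check instead of a name check; a plain Thread created by user code fails it.
    static boolean onOperatorThread() {
        return Thread.currentThread() instanceof OperatorThread;
    }
}

public class ThreadFactoryDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService engine = Executors.newSingleThreadExecutor(new OperatorThreadFactory());

        Callable<Boolean> probe = OperatorThreadFactory::onOperatorThread;
        System.out.println("engine thread: " + engine.submit(probe).get());

        // A worker that "operator code" starts from inside the engine thread.
        Callable<Boolean> spawnWorker = () -> {
            boolean[] result = new boolean[1];
            Thread worker = new Thread(() -> result[0] = OperatorThreadFactory.onOperatorThread());
            worker.start();
            worker.join();
            return result[0];
        };
        System.out.println("worker thread: " + engine.submit(spawnWorker).get());
        engine.shutdown();
    }
}
```

This only establishes that the caller is some operator thread, not the thread of the operator that owns the port; Vlad's store-and-compare approach is stricter in that respect.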
>>>      >
>>>      >
>>>      > On 8/11/16, 10:59 AM, "Vlad Rozov" <v.rozov@datatorrent.com> wrote:
>>>      >
>>>      >     Name verification is too expensive; it will be sufficient to store
>>>      >     currentThread during setup() and verify that it is the same during
>>>      >     emit. Checks should be supported not only for DefaultOutputPort, so
>>>      >     we may have them implemented in various Sinks.
>>>      >
>>>      >     Vlad
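Vlad's proposal can be applied to any Sink by wrapping it in a decorator that is bound to the operator thread during setup(). The `Sink` interface below is a hypothetical one-method stand-in, not the Apex interface. Because of Sanjay's point above that a Sink may not know its operator, the sketch binds the thread explicitly rather than looking it up:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a sink; not the Apex Sink interface.
interface Sink<T> {
    void put(T tuple);
}

// Decorator that adds the thread-affinity check to any Sink implementation.
final class ThreadCheckingSink<T> implements Sink<T> {
    private final Sink<T> delegate;
    // Plain field: a foreign thread can only observe null or the owner,
    // and neither equals itself, so the check still fails for it.
    private Thread owner;

    ThreadCheckingSink(Sink<T> delegate) {
        this.delegate = delegate;
    }

    // Intended to be called from the operator's setup(), on the operator thread.
    void bindToCurrentThread() {
        owner = Thread.currentThread();
    }

    @Override
    public void put(T tuple) {
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException(
                "put() called on " + Thread.currentThread().getName() + ", not on the operator thread");
        }
        delegate.put(tuple);
    }
}

public class SinkDemo {
    public static void main(String[] args) throws InterruptedException {
        List<String> received = new ArrayList<>();
        ThreadCheckingSink<String> sink = new ThreadCheckingSink<>(received::add);
        sink.bindToCurrentThread();
        sink.put("ok");                       // same thread as bindToCurrentThread()

        Thread worker = new Thread(() -> {
            try {
                sink.put("not ok");
            } catch (IllegalStateException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }, "worker");
        worker.start();
        worker.join();
        System.out.println(received);
    }
}
```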
>>>      >
>>>      >     On 8/11/16 10:21, Sanjay Pujare wrote:
>>>      >     > Thinking more about this: all of the “operator” threads are created
>>>      >     > by the Stram engine with appropriate names. So we can put checks in
>>>      >     > DefaultOutputPort.emit() or in the various implementations of
>>>      >     > Sink.put() that verify (by name) that the current thread is one
>>>      >     > created by the Stram engine.
>>>      >     >
>>>      >     > We can even use a special Thread object for operator threads so the
>>>      >     > above detection is easier.
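A minimal sketch of the name-based check Sanjay describes, assuming a hypothetical convention that the engine names every operator thread with the prefix `operator-`. It performs a String comparison on every tuple, which is the cost Vlad objects to, and nothing stops user code from giving its own thread the same prefix, a gap that Vlad's store-and-compare approach does not have:

```java
// Hypothetical naming convention: the engine names every operator thread "operator-...".
final class ThreadNameCheck {
    static final String OPERATOR_THREAD_PREFIX = "operator-";

    private ThreadNameCheck() {}

    // String work on every call, which is the per-tuple cost being debated.
    static boolean onOperatorThread() {
        return Thread.currentThread().getName().startsWith(OPERATOR_THREAD_PREFIX);
    }
}

public class NameCheckDemo {
    public static void main(String[] args) throws InterruptedException {
        Runnable report = () -> System.out.println(
            Thread.currentThread().getName() + " -> " + ThreadNameCheck.onOperatorThread());
        Thread engineThread = new Thread(report, "operator-1");   // named by the "engine"
        Thread userThread = new Thread(report);                   // default "Thread-N" name
        engineThread.start();
        engineThread.join();
        userThread.start();
        userThread.join();
    }
}
```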
>>>      >     >
>>>      >     >
>>>      >     >
>>>      >     > On 8/10/16, 6:11 PM, "Amol Kekre" <amol@datatorrent.com> wrote:
>>>      >     >
>>>      >     >      +1 on the debug proposal. Even if the tuples land within the
>>>      >     >      window, it breaks all guarantees: a rerun (after a restart from a
>>>      >     >      checkpoint) can put tuples from this thread in different windows.
>>>      >     >      A separate thread simply exposes users to unwarranted risks.
>>>      >     >
>>>      >     >      Thks
>>>      >     >      Amol
>>>      >     >
>>>      >     >
>>>      >     >      On Wed, Aug 10, 2016 at 6:05 PM, Vlad Rozov <v.rozov@datatorrent.com> wrote:
>>>      >     >
>>>      >     >      > Tuples emitted between end and begin windows are only one of the
>>>      >     >      > possible behaviors that emitting tuples on a thread separate from
>>>      >     >      > the operator thread may introduce. It will be good to have both
>>>      >     >      > checks in place at run-time, and if checking for the operator
>>>      >     >      > thread for every emitted tuple is too expensive, we may have it
>>>      >     >      > enabled only in DEBUG mode or in a mode with more checks in place.
>>>      >     >      >
>>>      >     >      > Vlad
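Vlad mentions two run-time checks: that tuples are emitted on the operator thread, and that they are not emitted between endWindow() and the next beginWindow(). A minimal sketch combining both in one hypothetical port; the callback names mirror the operator's beginWindow()/endWindow() lifecycle, but the class is illustrative, not Apex code:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical port enforcing both checks; not an Apex class.
class WindowCheckedPort<T> {
    private Thread owner;          // operator thread, captured in setup()
    private boolean inWindow;      // true between beginWindow() and endWindow()
    final List<T> emitted = new ArrayList<>();

    void setup() { owner = Thread.currentThread(); }

    void beginWindow(long windowId) { inWindow = true; }

    void endWindow() { inWindow = false; }

    void emit(T tuple) {
        // Thread check first, so inWindow is only read on the thread that writes it.
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException("emit() called off the operator thread");
        }
        if (!inWindow) {
            throw new IllegalStateException("emit() called outside beginWindow()/endWindow()");
        }
        emitted.add(tuple);
    }
}

public class WindowCheckDemo {
    public static void main(String[] args) {
        WindowCheckedPort<String> port = new WindowCheckedPort<>();
        port.setup();
        port.beginWindow(1L);
        port.emit("inside window 1");
        port.endWindow();
        try {
            port.emit("between windows");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(port.emitted);
    }
}
```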
>>>      >     >      >
>>>      >     >      >
>>>      >     >      >> Sanjay just reminded me of my typo -> I meant between end_window and
>>>      >     >      >> start_window :)
>>>      >     >      >>
>>>      >     >      >> Thks
>>>      >     >      >> Amol
>>>      >     >      >>
>>>      >     >      >> On Wed, Aug 10, 2016 at 2:36 PM, Sanjay Pujare <sanjay@datatorrent.com>
>>>      >     >      >> wrote:
>>>      >     >      >>
>>>      >     >      >>> If the goal is to do this validation through static analysis of
>>>      >     >      >>> operator code, I guess it is possible, but it is going to be
>>>      >     >      >>> non-trivial, and there could be false positives and false negatives.
>>>      >     >      >>>
>>>      >     >      >>> Also, I suppose this discussion applies to processor operators (those
>>>      >     >      >>> having both input and output ports), so Ram’s example of
>>>      >     >      >>> JdbcPollInputOperator may not be applicable here?
>>>      >     >      >>>
>>>      >     >      >>> On 8/10/16, 2:04 PM, "Ashwin Chandra Putta" <ashwinchandrap@gmail.com>
>>>      >     >      >>> wrote:
>>>      >     >      >>>
>>>      >     >      >>>      In a separate thread I mean.
>>>      >     >      >>>
>>>      >     >      >>>      Regards,
>>>      >     >      >>>      Ashwin.
>>>      >     >      >>>
>>>      >     >      >>>      On Wed, Aug 10, 2016 at 2:01 PM, Ashwin Chandra Putta <
>>>      >     >      >>>      ashwinchandrap@gmail.com> wrote:
>>>      >     >      >>>
>>>      >     >      >>>      > + dev@apex.apache.org
>>>      >     >      >>>      > - users@apex.apache.org
>>>      >     >      >>>      >
>>>      >     >      >>>      > This is one of those best practices that we learn by experience
>>>      >     >      >>>      > during operator development. It will save a lot of time during
>>>      >     >      >>>      > operator development if we can catch and throw a validation error
>>>      >     >      >>>      > when someone emits tuples in a non separate thread.
>>>      >     >      >>>      >
>>>      >     >      >>>      > Regards,
>>>      >     >      >>>      > Ashwin
>>>      >     >      >>>      >
>>>      >     >      >>>      > On Wed, Aug 10, 2016 at 1:57 PM, Munagala Ramanath <ram@datatorrent.com>
>>>      >     >      >>>      > wrote:
>>>      >     >      >>>      >
>>>      >     >      >>>      >> For cases where the use of a different thread is needed, it can
>>>      >     >      >>>      >> write tuples to a queue from which the operator thread pulls them;
>>>      >     >      >>>      >> JdbcPollInputOperator in Malhar has an example.
>>>      >     >      >>>      >>
>>>      >     >      >>>      >> Ram
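A minimal sketch of the queue hand-off Ram describes: a background thread only enqueues, and the operator thread is the only one that emits. It is a generic illustration, not the JdbcPollInputOperator code; `QueueBackedInput` is hypothetical, a `Consumer` stands in for the output port, and the `activate()`/`emitTuples()`/`deactivate()` names only loosely follow the input-operator lifecycle:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical input operator: a worker thread fills a queue, the operator thread drains it.
class QueueBackedInput {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
    private final Consumer<String> output;  // stands in for an output port's emit()
    private final int maxPerCall;           // bounds the work done in one emitTuples() call
    private Thread worker;

    QueueBackedInput(Consumer<String> output, int maxPerCall) {
        this.output = output;
        this.maxPerCall = maxPerCall;
    }

    // Starts the background poller; it never touches the output directly.
    void activate(List<String> source) {
        worker = new Thread(() -> {
            try {
                for (String record : source) {
                    queue.put(record);      // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "poller");
        worker.start();
    }

    // Called repeatedly on the operator thread; never blocks.
    void emitTuples() {
        for (int i = 0; i < maxPerCall; i++) {
            String record = queue.poll();
            if (record == null) {
                return;
            }
            output.accept(record);
        }
    }

    void deactivate() {
        worker.interrupt();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

public class QueueHandoffDemo {
    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        QueueBackedInput op = new QueueBackedInput(out::add, 100);
        op.activate(Arrays.asList("r1", "r2", "r3"));
        long deadline = System.currentTimeMillis() + 5_000;
        while (out.size() < 3 && System.currentTimeMillis() < deadline) {
            op.emitTuples();                // an engine would drive this loop
        }
        op.deactivate();
        System.out.println(out);
    }
}
```

The bounded queue blocks the poller when it is full, so the operator thread sets the pace; capping each drain at `maxPerCall` keeps a single emitTuples() call short, presumably leaving the engine free to handle window boundaries on time.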
>>>      >     >      >>>      >>
>>>      >     >      >>>      >> On Wed, Aug 10, 2016 at 1:50 PM, hsy541@gmail.com <hsy541@gmail.com>
>>>      >     >      >>>      >> wrote:
>>>      >     >      >>>      >>
>>>      >     >      >>>      >>> Hey Vlad,
>>>      >     >      >>>      >>>
>>>      >     >      >>>      >>> Thanks for bringing this up. Is there an easy way to detect
>>>      >     >      >>>      >>> unexpected use of the emit method without hurting performance?
>>>      >     >      >>>      >>> Or, at least, can we detect this in debug mode?
>>>      >     >      >>>      >>>
>>>      >     >      >>>      >>> Regards,
>>>      >     >      >>>      >>> Siyuan
>>>      >     >      >>>      >>>
>>>      >     >      >>>      >>> On Wed, Aug 10, 2016 at 11:27 AM, Vlad Rozov <v.rozov@datatorrent.com>
>>>      >     >      >>>      >>> wrote:
>>>      >     >      >>>      >>>
>>>      >     >      >>>      >>>> The short answer is no: creating a worker thread to emit tuples is
>>>      >     >      >>>      >>>> not supported by Apex and will lead to undefined behavior.
>>>      >     >      >>>      >>>> Operators in Apex have strong thread affinity, and all interaction
>>>      >     >      >>>      >>>> with the platform must happen on the operator thread.
>>>      >     >      >>>      >>>>
>>>      >     >      >>>      >>>> Vlad
>>>      >     >      >>>      >>>>
>>>      >     >      >>>      >>>
>>>      >     >      >>>      >>>
>>>      >     >      >>>      >>
>>>      >     >      >>>      >
>>>      >     >      >>>      >
>>>      >     >      >>>      > --
>>>      >     >      >>>      >
>>>      >     >      >>>      > Regards,
>>>      >     >      >>>      > Ashwin.
>>>      >     >      >>>      >
>>>      >     >      >>>
>>>      >     >      >>>
>>>      >     >      >>>
>>>      >     >      >>>      --
>>>      >     >      >>>
>>>      >     >      >>>      Regards,
>>>      >     >      >>>      Ashwin.

