apex-dev mailing list archives

From Chetan Narsude <che...@datatorrent.com>
Subject Re: Thread and Container locality
Date Mon, 28 Sep 2015 19:58:33 GMT
Let me shed some light on THREAD_LOCAL and CONTAINER_LOCAL.

THREAD_LOCAL at the core is nothing but a function call. When an operator
does emit(tuple), it gets translated into the downstream port's
process(tuple) call, which is immediately invoked in the same thread. So
obviously the performance is going to be a lot faster. The only thing
happening in between is setting up the stack frame and invoking the
function.
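
In rough pseudo-Java (a sketch with illustrative names, not the actual
engine classes), the wiring amounts to this:

    // THREAD_LOCAL in a nutshell: emit() is a plain method call into the
    // downstream operator's input port, executed on the producer's own stack.
    interface Sink<T> {
      void process(T tuple);
    }

    class InlineCall<T> {
      private final Sink<T> downstreamPort; // downstream operator's input port

      InlineCall(Sink<T> downstreamPort) {
        this.downstreamPort = downstreamPort;
      }

      void emit(T tuple) {
        downstreamPort.process(tuple); // no queue, no handoff, no second thread
      }
    }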

With CONTAINER_LOCAL there is a producer thread and a consumer thread
involved. The producer produces (emit(tuple)) and the consumer consumes
(process(tuple)). This scheme is optimal when the rate at which the
producer produces equals the rate at which the consumer consumes. Often
that's not the case, so we put a bounded in-memory buffer in between (the
implementation is CircularBuffer). So in addition to everything
THREAD_LOCAL does, the CONTAINER_LOCAL pattern has to manage the circular
buffer *and* pay for thread context switches. The most expensive part of a
context switch is memory synchronization. Since you all have already
pointed out how expensive it is to use volatile, I need not get into
details of how expensive memory synchronization can get.
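
A stripped-down sketch of that producer/consumer pattern - a
single-producer/single-consumer ring with volatile cursors; the real
implementation is com.datatorrent.netlet.util.CircularBuffer and differs
in detail:

    // Bounded SPSC ring buffer. The volatile cursors are what make the
    // handoff between the two threads visible - and what make it costly.
    class SpscRingBuffer<T> {
      private final Object[] buffer;
      private final int mask;
      private volatile long head; // next slot to read, advanced by consumer
      private volatile long tail; // next slot to write, advanced by producer

      SpscRingBuffer(int capacityPowerOfTwo) {
        buffer = new Object[capacityPowerOfTwo];
        mask = capacityPowerOfTwo - 1;
      }

      boolean offer(T tuple) {          // producer thread only
        if (tail - head > mask) {
          return false;                 // full: caller spins or sleeps
        }
        buffer[(int) (tail & mask)] = tuple;
        tail++;                         // volatile write publishes the tuple
        return true;
      }

      @SuppressWarnings("unchecked")
      T poll() {                        // consumer thread only
        if (head == tail) {
          return null;                  // empty
        }
        T tuple = (T) buffer[(int) (head & mask)];
        buffer[(int) (head & mask)] = null; // let the tuple become garbage
        head++;                         // volatile write frees the slot
        return tuple;
      }
    }

Every offer/poll pair pays for at least one volatile write and read, which
is exactly the memory synchronization cost discussed below.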

Long story short: no matter which pattern you use, as soon as you use more
than one thread there are certain memory synchronization penalties which
are unavoidable and slow things down considerably. In 2012 I benchmarked
atomic, volatile, and synchronized (I think there are unit tests for that
benchmark), and I found volatile to be the least expensive at the time.
Synchronized was not far behind (it's very efficient when the contention
is likely to be among a single-digit number of threads). I am not sure how
those benchmarks would look today, but you get the idea.
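
The shape of such a comparison, for anyone who wants to rerun it (a naive
sketch - use a harness like JMH for numbers you can trust, since plain
loops are at the mercy of JIT optimizations):

    import java.util.concurrent.atomic.AtomicLong;

    public class CounterBench {
      static long plain;
      static volatile long vol;
      static final AtomicLong atomic = new AtomicLong();
      static long locked;
      static final Object lock = new Object();

      public static void main(String[] args) {
        final long n = Integer.MAX_VALUE; // same count used in Vlad's test below
        long t = System.nanoTime();
        for (long i = 0; i < n; i++) plain++;
        System.out.println("plain:        " + (System.nanoTime() - t) / 1e9 + " s");
        t = System.nanoTime();
        for (long i = 0; i < n; i++) vol++;
        System.out.println("volatile:     " + (System.nanoTime() - t) / 1e9 + " s");
        t = System.nanoTime();
        for (long i = 0; i < n; i++) atomic.incrementAndGet();
        System.out.println("atomic:       " + (System.nanoTime() - t) / 1e9 + " s");
        t = System.nanoTime();
        for (long i = 0; i < n; i++) { synchronized (lock) { locked++; } }
        System.out.println("synchronized: " + (System.nanoTime() - t) / 1e9 + " s");
      }
    }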

In a data-intensive app, most of the time is spent in IO and there is a
lot of CPU idling at individual operators, so you will not see the
difference when you change CONTAINER_LOCAL to THREAD_LOCAL. You will,
however, see some memory savings, since you are taking away the
intermediate in-memory buffer *and* the delayed garbage collection of the
objects held by that buffer.

Recommendation: do not bother with these micro-optimizations unless you
notice a problem. Use THREAD_LOCAL for processing low-throughput/infrequent
streams. Use CONTAINER_LOCAL to avoid serialization/deserialization of
objects. Leave the rest to the platform; I expect that as it matures it
will make most of these decisions automatically.

HTH.

--
Chetan

On Mon, Sep 28, 2015 at 11:44 AM, Vlad Rozov <v.rozov@datatorrent.com>
wrote:

> Hi Tim,
>
> I use the benchmark application that is part of the Apache Malhar project.
> Please let me know if you need help with compiling or running the
> application.
>
> Thank you,
>
> Vlad
>
>
> On 9/28/15 11:09, Timothy Farkas wrote:
>
>> Also sharing a diff
>>
>>
>> https://github.com/DataTorrent/Netlet/compare/master...ilooner:condVarBuffer
>>
>> Thanks,
>> Tim
>>
>> On Mon, Sep 28, 2015 at 10:07 AM, Timothy Farkas <tim@datatorrent.com>
>> wrote:
>>
>>> Hi Vlad,
>>>
>>> Could you share your benchmarking applications? I'd like to test a change
>>> I made to the CircularBuffer:
>>>
>>>
>>>
>>> https://github.com/ilooner/Netlet/blob/condVarBuffer/src/main/java/com/datatorrent/netlet/util/CircularBuffer.java
>>>
>>> Thanks,
>>> Tim
>>>
>>> On Mon, Sep 28, 2015 at 9:56 AM, Pramod Immaneni <pramod@datatorrent.com>
>>> wrote:
>>>
>>>> Vlad what was your mode of interaction/ordering between the two threads
>>>> for the 3rd test?
>>>>
>>>> On Mon, Sep 28, 2015 at 10:51 AM, Vlad Rozov <v.rozov@datatorrent.com>
>>>> wrote:
>>>>
>>>>> I created a simple test to check how quickly Java can count to
>>>>> Integer.MAX_VALUE. The result that I see is consistent with
>>>>> CONTAINER_LOCAL behavior:
>>>>>
>>>>> counting long in a single thread: 0.9 sec
>>>>> counting volatile long in a single thread: 17.7 sec
>>>>> counting volatile long shared between two threads: 186.3 sec
>>>>>
>>>>> I suggest that we look into
>>>>> https://qconsf.com/sf2012/dl/qcon-sanfran-2012/slides/MartinThompson_LockFreeAlgorithmsForUltimatePerformanceMOVEDTOBALLROOMA.pdf
>>>>> or a similar algorithm.
>>>>>
>>>>> Thank you,
>>>>>
>>>>> Vlad
>>>>>
>>>>>
>>>>>
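
The third measurement above (two threads sharing one volatile long) can be
reproduced with a sketch like the following (an assumed reconstruction, not
Vlad's actual test code): every write forces the cache line holding the
counter to migrate between cores.

    public class SharedVolatileCount {
      static volatile long count;

      public static void main(String[] args) throws InterruptedException {
        // Each thread contributes half the increments up to Integer.MAX_VALUE.
        Runnable half = () -> {
          for (long i = 0; i < Integer.MAX_VALUE / 2L; i++) {
            count++; // volatile read-modify-write; not atomic, but the lost
                     // updates don't matter here - only the cost does
          }
        };
        Thread t1 = new Thread(half);
        Thread t2 = new Thread(half);
        long start = System.nanoTime();
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println((System.nanoTime() - start) / 1e9 + " s");
      }
    }
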
>>>>> On 9/28/15 08:19, Vlad Rozov wrote:
>>>>>
>>>>>> Ram,
>>>>>>
>>>>>> The stream between operators in the case of CONTAINER_LOCAL is
>>>>>> InlineStream. InlineStream extends DefaultReservoir, which extends
>>>>>> CircularBuffer. CircularBuffer does not use synchronized methods or
>>>>>> locks; it uses volatile. I guess that the use of volatile causes CPU
>>>>>> cache invalidation, and together with memory locality (in the thread
>>>>>> local case the tuple is always local to both threads, while in the
>>>>>> container local case the second operator's thread may see the data
>>>>>> significantly later than the first thread produced it) these two
>>>>>> factors negatively impact CONTAINER_LOCAL performance. It is still
>>>>>> quite surprising that the impact is so significant.
>>>>>>
>>>>>> Thank you,
>>>>>>
>>>>>> Vlad
>>>>>>
>>>>>> On 9/27/15 16:45, Munagala Ramanath wrote:
>>>>>>
>>>>>>> Vlad,
>>>>>>>
>>>>>>> That's a fascinating and counter-intuitive result. I wonder if some
>>>>>>> internal synchronization is happening (maybe the stream between them
>>>>>>> is a shared data structure that is lock protected) to slow down the
>>>>>>> 2 threads in the CONTAINER_LOCAL case. If they are both going as
>>>>>>> fast as possible it is likely that they will be frequently blocked
>>>>>>> by the lock. If that is indeed the case, some sort of lock striping
>>>>>>> or a near-lockless protocol for stream access should tilt the
>>>>>>> balance in favor of CONTAINER_LOCAL.
>>>>>>>
>>>>>>> In the thread-local case of course there is no need for such locking.
>>>>>>>
>>>>>>> Ram
>>>>>>>
>>>>>>> On Sun, Sep 27, 2015 at 12:17 PM, Vlad Rozov <v.rozov@datatorrent.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>      Changed subject to reflect the shift of discussion.
>>>>>>>
>>>>>>>      After I recompiled netlet and hardcoded a 0 wait time in the
>>>>>>>      CircularBuffer.put() method, I still see the same difference even
>>>>>>>      when I increased operator memory to 10 GB and set "-D
>>>>>>>      dt.application.*.operator.*.attr.SPIN_MILLIS=0 -D
>>>>>>>      dt.application.*.operator.*.attr.QUEUE_CAPACITY=1024000". CPU %
>>>>>>>      is close to 100% for both thread and container local locality
>>>>>>>      settings. Note that in thread local the two operators share 100%
>>>>>>>      CPU, while in container local each gets its own 100% load. It
>>>>>>>      sounds like container local will outperform thread local only
>>>>>>>      when the number of emitted tuples is (relatively) low, for
>>>>>>>      example when it is CPU costly to produce tuples (hash
>>>>>>>      computations, compression/decompression, aggregations, filtering
>>>>>>>      with complex expressions). In cases where an operator may emit 5
>>>>>>>      million or more tuples per second, thread local may outperform
>>>>>>>      container local even when both operators are CPU intensive.
>>>>>>>
>>>>>>>      Thank you,
>>>>>>>
>>>>>>>      Vlad
>>>>>>>
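
For reference, the two -D switches Vlad mentions map to ordinary
configuration properties; a dt-site.xml sketch of the same settings
(assuming the standard property-to-attribute mapping):

    <property>
      <name>dt.application.*.operator.*.attr.SPIN_MILLIS</name>
      <value>0</value>
    </property>
    <property>
      <name>dt.application.*.operator.*.attr.QUEUE_CAPACITY</name>
      <value>1024000</value>
    </property>
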
>>>>>>>      On 9/26/15 22:52, Timothy Farkas wrote:
>>>>>>>
>>>>>>>>      Hi Vlad,
>>>>>>>>
>>>>>>>>      I just took a look at the CircularBuffer. Why are threads
>>>>>>>>      polling the state of the buffer before doing operations?
>>>>>>>>      Couldn't polling be avoided entirely by using something like
>>>>>>>>      Condition variables to signal when the buffer is ready for an
>>>>>>>>      operation to be performed?
>>>>>>>>
>>>>>>>>      Tim
>>>>>>>>
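
What Tim is proposing, roughly, is the classic condition-variable bounded
buffer (a sketch using java.util.concurrent.locks, not his actual Netlet
change - see the diff he links above):

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    // Blocking put/take: threads park until signalled instead of
    // sleep-and-retry polling of the buffer state.
    public class BlockingRing<T> {
      private final Object[] buffer;
      private int head, tail, count;
      private final ReentrantLock lock = new ReentrantLock();
      private final Condition notFull = lock.newCondition();
      private final Condition notEmpty = lock.newCondition();

      public BlockingRing(int capacity) {
        buffer = new Object[capacity];
      }

      public void put(T tuple) throws InterruptedException {
        lock.lock();
        try {
          while (count == buffer.length) {
            notFull.await();          // park until a slot frees up
          }
          buffer[tail] = tuple;
          tail = (tail + 1) % buffer.length;
          count++;
          notEmpty.signal();          // wake a waiting consumer
        } finally {
          lock.unlock();
        }
      }

      @SuppressWarnings("unchecked")
      public T take() throws InterruptedException {
        lock.lock();
        try {
          while (count == 0) {
            notEmpty.await();         // park until a tuple arrives
          }
          T tuple = (T) buffer[head];
          buffer[head] = null;
          head = (head + 1) % buffer.length;
          count--;
          notFull.signal();           // wake a waiting producer
          return tuple;
        } finally {
          lock.unlock();
        }
      }
    }

Whether parking beats spinning depends on contention and tuple rates; the
preshing.com article linked later in the thread is a good discussion of
that tradeoff.
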
>>>>>>>>      On Sat, Sep 26, 2015 at 10:42 PM, Vlad Rozov
>>>>>>>>      <v.rozov@datatorrent.com> wrote:
>>>>>>>>
>>>>>>>>>      After looking at a few stack traces, I think that in the
>>>>>>>>>      benchmark application the operators compete for the circular
>>>>>>>>>      buffer that passes slices from the emitter output to the
>>>>>>>>>      consumer input, and the sleeps that avoid busy waiting are
>>>>>>>>>      too long for the benchmark operators. I don't see a stack
>>>>>>>>>      similar to the one below every time I take a thread dump,
>>>>>>>>>      but still often enough to suspect that the sleep is the root
>>>>>>>>>      cause. I'll recompile with a smaller sleep time and see how
>>>>>>>>>      this affects performance.
>>>>>>>>>
>>>>>>>>>      ----
>>>>>>>>>      "1/wordGenerator:RandomWordInputModule" prio=10 tid=0x00007f78c8b8c000
>>>>>>>>>      nid=0x780f waiting on condition [0x00007f78abb17000]
>>>>>>>>>         java.lang.Thread.State: TIMED_WAITING (sleeping)
>>>>>>>>>          at java.lang.Thread.sleep(Native Method)
>>>>>>>>>          at com.datatorrent.netlet.util.CircularBuffer.put(CircularBuffer.java:182)
>>>>>>>>>          at com.datatorrent.stram.stream.InlineStream.put(InlineStream.java:79)
>>>>>>>>>          at com.datatorrent.stram.stream.MuxStream.put(MuxStream.java:117)
>>>>>>>>>          at com.datatorrent.api.DefaultOutputPort.emit(DefaultOutputPort.java:48)
>>>>>>>>>          at com.datatorrent.benchmark.RandomWordInputModule.emitTuples(RandomWordInputModule.java:108)
>>>>>>>>>          at com.datatorrent.stram.engine.InputNode.run(InputNode.java:115)
>>>>>>>>>          at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1377)
>>>>>>>>>
>>>>>>>>>      "2/counter:WordCountOperator" prio=10 tid=0x00007f78c8c98800 nid=0x780d
>>>>>>>>>      waiting on condition [0x00007f78abc18000]
>>>>>>>>>         java.lang.Thread.State: TIMED_WAITING (sleeping)
>>>>>>>>>          at java.lang.Thread.sleep(Native Method)
>>>>>>>>>          at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:519)
>>>>>>>>>          at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1377)
>>>>>>>>>      ----
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>      On 9/26/15 20:59, Amol Kekre wrote:
>>>>>>>>>
>>>>>>>>>>      A good read -
>>>>>>>>>>      http://preshing.com/20111118/locks-arent-slow-lock-contention-is/
>>>>>>>>>>      Though it does not explain an order of magnitude difference.
>>>>>>>>>>
>>>>>>>>>>      Amol
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>      On Sat, Sep 26, 2015 at 4:25 PM, Vlad Rozov
>>>>>>>>>>      <v.rozov@datatorrent.com> wrote:
>>>>>>>>>>
>>>>>>>>>>>      In the benchmark test THREAD_LOCAL outperforms
>>>>>>>>>>>      CONTAINER_LOCAL by an order of magnitude and both operators
>>>>>>>>>>>      compete for CPU. I'll take a closer look why.
>>>>>>>>>>>
>>>>>>>>>>>      Thank you,
>>>>>>>>>>>
>>>>>>>>>>>      Vlad
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>      On 9/26/15 14:52, Thomas Weise wrote:
>>>>>>>>>>>
>>>>>>>>>>>>      THREAD_LOCAL - operators share a thread
>>>>>>>>>>>>      CONTAINER_LOCAL - each operator has its own thread
>>>>>>>>>>>>
>>>>>>>>>>>>      So as long as the operators utilize the CPU sufficiently
>>>>>>>>>>>>      (compete), the latter will perform better.
>>>>>>>>>>>>
>>>>>>>>>>>>      There will be cases where a single thread can accommodate
>>>>>>>>>>>>      multiple operators. For example, a socket reader (mostly
>>>>>>>>>>>>      waiting for IO) and a decompress (CPU hungry) can share a
>>>>>>>>>>>>      thread.
>>>>>>>>>>>>
>>>>>>>>>>>>      But to get back to the original question, stream locality
>>>>>>>>>>>>      generally does not reduce the total memory requirement. If
>>>>>>>>>>>>      you add multiple operators into one container, that
>>>>>>>>>>>>      container will also require more memory, and that's how
>>>>>>>>>>>>      the container size is calculated in the physical plan. You
>>>>>>>>>>>>      may get some extra mileage when multiple operators share
>>>>>>>>>>>>      the same heap, but the need to identify the memory
>>>>>>>>>>>>      requirement per operator does not go away.
>>>>>>>>>>>>
>>>>>>>>>>>>      Thomas
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>      On Sat, Sep 26, 2015 at 12:41 PM, Munagala Ramanath
>>>>>>>>>>>>      <ram@datatorrent.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>>      Would CONTAINER_LOCAL achieve the same thing and perform
>>>>>>>>>>>>>      a little better on a multi-core box?
>>>>>>>>>>>>>
>>>>>>>>>>>>>      Ram
>>>>>>>>>>>>>
>>>>>>>>>>>>>      On Sat, Sep 26, 2015 at 12:18 PM, Sandeep Deshmukh
>>>>>>>>>>>>>      <sandeep@datatorrent.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>>      Yes, with this approach only two containers are
>>>>>>>>>>>>>>      required: one for stram and another for all operators.
>>>>>>>>>>>>>>      You can easily fit around 10 operators in less than 1GB.
>>>>>>>>>>>>>>      On 27 Sep 2015 00:32, "Timothy Farkas"
>>>>>>>>>>>>>>      <tim@datatorrent.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>      Hi Ram,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>      You could make all the operators thread local. This
>>>>>>>>>>>>>>>      cuts down on the overhead of separate containers and
>>>>>>>>>>>>>>>      maximizes the memory available to each operator.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>      Tim
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>      On Sat, Sep 26, 2015 at 10:07 AM, Munagala Ramanath
>>>>>>>>>>>>>>>      <ram@datatorrent.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>      Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>      I was running into memory issues when deploying my app
>>>>>>>>>>>>>>>>      on the sandbox, where all the operators were stuck
>>>>>>>>>>>>>>>>      forever in the PENDING state because they were being
>>>>>>>>>>>>>>>>      continually aborted and restarted due to the limited
>>>>>>>>>>>>>>>>      memory on the sandbox. After some experimentation, I
>>>>>>>>>>>>>>>>      found that the following config values seem to work:
>>>>>>>>>>>>>>>>      ------------------------------------------
>>>>>>>>>>>>>>>>      <https://datatorrent.slack.com/archives/engineering/p1443263607000010>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>      <property>
>>>>>>>>>>>>>>>>        <name>dt.attr.MASTER_MEMORY_MB</name>
>>>>>>>>>>>>>>>>        <value>500</value>
>>>>>>>>>>>>>>>>      </property>
>>>>>>>>>>>>>>>>      <property>
>>>>>>>>>>>>>>>>        <name>dt.application.*.operator.*.attr.MEMORY_MB</name>
>>>>>>>>>>>>>>>>        <value>200</value>
>>>>>>>>>>>>>>>>      </property>
>>>>>>>>>>>>>>>>      <property>
>>>>>>>>>>>>>>>>        <name>dt.application.TopNWordsWithQueries.operator.fileWordCount.attr.MEMORY_MB</name>
>>>>>>>>>>>>>>>>        <value>512</value>
>>>>>>>>>>>>>>>>      </property>
>>>>>>>>>>>>>>>>      ------------------------------------------------
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>      Are these reasonable values? Is there a more
>>>>>>>>>>>>>>>>      systematic way of coming up with these values than
>>>>>>>>>>>>>>>>      trial-and-error? Most of my operators -- with the
>>>>>>>>>>>>>>>>      exception of fileWordCount -- need very little memory;
>>>>>>>>>>>>>>>>      is there a way to cut all values down to the bare
>>>>>>>>>>>>>>>>      minimum and maximize available memory for this one
>>>>>>>>>>>>>>>>      operator?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>      Thanks.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>      Ram
