Subject: Re: Thread and Container locality
From: Timothy Farkas
To: dev@apex.incubator.apache.org
Date: Mon, 28 Sep 2015 10:09:43 -0800

Also sharing a diff

https://github.com/DataTorrent/Netlet/compare/master...ilooner:condVarBuffer

Thanks,
Tim
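For anyone skimming the diff, here is a minimal sketch of the general technique the branch name suggests: a bounded buffer whose put/take block on java.util.concurrent Conditions instead of polling with sleeps. This is an illustration only, with hypothetical class and method names; the actual change in the linked branch may differ.

----
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical condition-variable-based bounded buffer (illustration only).
public class CondVarBuffer<T> {
  private final Object[] buffer;
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition notFull = lock.newCondition();
  private final Condition notEmpty = lock.newCondition();
  private int head, tail, count;

  public CondVarBuffer(int capacity) {
    buffer = new Object[capacity];
  }

  public void put(T item) throws InterruptedException {
    lock.lock();
    try {
      while (count == buffer.length) {
        notFull.await();              // block instead of sleep-and-retry
      }
      buffer[tail] = item;
      tail = (tail + 1) % buffer.length;
      count++;
      notEmpty.signal();              // wake a waiting consumer
    } finally {
      lock.unlock();
    }
  }

  @SuppressWarnings("unchecked")
  public T take() throws InterruptedException {
    lock.lock();
    try {
      while (count == 0) {
        notEmpty.await();             // block instead of polling the state
      }
      T item = (T) buffer[head];
      buffer[head] = null;
      head = (head + 1) % buffer.length;
      count--;
      notFull.signal();               // wake a waiting producer
      return item;
    } finally {
      lock.unlock();
    }
  }
}
----

Whether this beats the existing volatile-based spin depends on how often the buffer is actually full or empty, since the lock/Condition pair trades busy-waiting for context switches.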
On Mon, Sep 28, 2015 at 10:07 AM, Timothy Farkas wrote:

> Hi Vlad,
>
> Could you share your benchmarking applications? I'd like to test a change
> I made to the CircularBuffer:
>
> https://github.com/ilooner/Netlet/blob/condVarBuffer/src/main/java/com/datatorrent/netlet/util/CircularBuffer.java
>
> Thanks,
> Tim
>
> On Mon, Sep 28, 2015 at 9:56 AM, Pramod Immaneni wrote:
>
>> Vlad, what was your mode of interaction/ordering between the two threads
>> for the 3rd test?
>>
>> On Mon, Sep 28, 2015 at 10:51 AM, Vlad Rozov wrote:
>>
>>> I created a simple test to check how quickly Java can count to
>>> Integer.MAX_VALUE. The result that I see is consistent with the
>>> CONTAINER_LOCAL behavior:
>>>
>>> counting long in a single thread: 0.9 sec
>>> counting volatile long in a single thread: 17.7 sec
>>> counting volatile long shared between two threads: 186.3 sec
>>>
>>> I suggest that we look into
>>> https://qconsf.com/sf2012/dl/qcon-sanfran-2012/slides/MartinThompson_LockFreeAlgorithmsForUltimatePerformanceMOVEDTOBALLROOMA.pdf
>>> or a similar algorithm.
>>>
>>> Thank you,
>>>
>>> Vlad
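A rough reconstruction of the kind of counter test described above, for anyone who wants to reproduce the numbers locally. This is not the actual benchmark code; timings will vary by hardware, and the third case only exists to generate cross-core cache-coherence traffic on a shared volatile (the unsynchronized increment is not atomic).

----
// Hypothetical reconstruction: time three ways of counting up to Integer.MAX_VALUE.
public class CountingBenchmark {
  static long plainCounter;
  static volatile long volatileCounter;

  public static void main(String[] args) throws InterruptedException {
    long start = System.nanoTime();
    for (plainCounter = 0; plainCounter < Integer.MAX_VALUE; plainCounter++) {
    }
    report("counting long in a single thread", start);

    start = System.nanoTime();
    for (volatileCounter = 0; volatileCounter < Integer.MAX_VALUE; volatileCounter++) {
    }
    report("counting volatile long in a single thread", start);

    volatileCounter = 0;
    Runnable incrementer = new Runnable() {
      @Override
      public void run() {
        // Both threads write the same volatile; every write forces the other
        // core to re-read the cache line, which is what this case exercises.
        // The increment itself is not atomic -- that is fine here, the point
        // is only the cross-core coherence traffic.
        while (volatileCounter < Integer.MAX_VALUE) {
          volatileCounter++;
        }
      }
    };
    Thread t1 = new Thread(incrementer);
    Thread t2 = new Thread(incrementer);
    start = System.nanoTime();
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    report("counting volatile long shared between two threads", start);
  }

  static void report(String label, long startNanos) {
    System.out.printf("%s: %.1f sec%n", label, (System.nanoTime() - startNanos) / 1e9);
  }
}
----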
>>> On 9/28/15 08:19, Vlad Rozov wrote:
>>>
>>>> Ram,
>>>>
>>>> The stream between operators in the CONTAINER_LOCAL case is InlineStream.
>>>> InlineStream extends DefaultReservoir, which extends CircularBuffer.
>>>> CircularBuffer does not use synchronized methods or locks; it uses
>>>> volatile. I guess that the volatile accesses cause CPU cache invalidation,
>>>> and together with memory locality (in the thread-local case a tuple is
>>>> always local to both threads, while in the container-local case the second
>>>> operator's thread may see the data significantly later than when the first
>>>> thread produced it) these two factors negatively impact CONTAINER_LOCAL
>>>> performance. It is still quite surprising that the impact is so
>>>> significant.
>>>>
>>>> Thank you,
>>>>
>>>> Vlad
>>>>
>>>> On 9/27/15 16:45, Munagala Ramanath wrote:
>>>>
>>>>> Vlad,
>>>>>
>>>>> That's a fascinating and counter-intuitive result. I wonder if some
>>>>> internal synchronization is happening (maybe the stream between them is
>>>>> a shared data structure that is lock protected) to slow down the two
>>>>> threads in the CONTAINER_LOCAL case. If they are both going as fast as
>>>>> possible, it is likely that they will be frequently blocked by the lock.
>>>>> If that is indeed the case, some sort of lock striping or a near-lockless
>>>>> protocol for stream access should tilt the balance in favor of
>>>>> CONTAINER_LOCAL.
>>>>>
>>>>> In the thread-local case of course there is no need for such locking.
>>>>>
>>>>> Ram
>>>>>
>>>>> On Sun, Sep 27, 2015 at 12:17 PM, Vlad Rozov wrote:
>>>>>
>>>>>> Changed subject to reflect shift of discussion.
>>>>>>
>>>>>> After I recompiled netlet and hardcoded 0 wait time in the
>>>>>> CircularBuffer.put() method, I still see the same difference even when
>>>>>> I increased operator memory to 10 GB and set
>>>>>> "-D dt.application.*.operator.*.attr.SPIN_MILLIS=0
>>>>>> -D dt.application.*.operator.*.attr.QUEUE_CAPACITY=1024000". CPU % is
>>>>>> close to 100% for both the thread and container local locality settings.
>>>>>> Note that in thread local the two operators share 100% CPU, while in
>>>>>> container local each gets its own 100% load. It sounds like container
>>>>>> local will outperform thread local only when the number of emitted
>>>>>> tuples is (relatively) low, for example when it is CPU costly to produce
>>>>>> tuples (hash computations, compression/decompression, aggregations,
>>>>>> filtering with complex expressions). In cases where an operator may emit
>>>>>> 5 million or more tuples per second, thread local may outperform
>>>>>> container local even when both operators are CPU intensive.
>>>>>>
>>>>>> Thank you,
>>>>>>
>>>>>> Vlad
>>>>>>
>>>>>> On 9/26/15 22:52, Timothy Farkas wrote:
>>>>>>
>>>>>>> Hi Vlad,
>>>>>>>
>>>>>>> I just took a look at the CircularBuffer. Why are threads polling the
>>>>>>> state of the buffer before doing operations? Couldn't polling be
>>>>>>> avoided entirely by using something like Condition variables to signal
>>>>>>> when the buffer is ready for an operation to be performed?
>>>>>>>
>>>>>>> Tim
>>>>>>>
>>>>>>> On Sat, Sep 26, 2015 at 10:42 PM, Vlad Rozov wrote:
>>>>>>>
>>>>>>>> After looking at a few stack traces I think that in the benchmark
>>>>>>>> application the operators compete for the circular buffer that passes
>>>>>>>> slices from the emitter output to the consumer input, and the sleeps
>>>>>>>> that avoid busy waiting are too long for the benchmark operators. I
>>>>>>>> don't see a stack similar to the one below every time I take a thread
>>>>>>>> dump, but still often enough to suspect that the sleep is the root
>>>>>>>> cause. I'll recompile with a smaller sleep time and see how this
>>>>>>>> affects performance.
>>>>>>>>
>>>>>>>> ----
>>>>>>>> "1/wordGenerator:RandomWordInputModule" prio=10 tid=0x00007f78c8b8c000
>>>>>>>> nid=0x780f waiting on condition [0x00007f78abb17000]
>>>>>>>>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>>>>>>>>         at java.lang.Thread.sleep(Native Method)
>>>>>>>>         at com.datatorrent.netlet.util.CircularBuffer.put(CircularBuffer.java:182)
>>>>>>>>         at com.datatorrent.stram.stream.InlineStream.put(InlineStream.java:79)
>>>>>>>>         at com.datatorrent.stram.stream.MuxStream.put(MuxStream.java:117)
>>>>>>>>         at com.datatorrent.api.DefaultOutputPort.emit(DefaultOutputPort.java:48)
>>>>>>>>         at com.datatorrent.benchmark.RandomWordInputModule.emitTuples(RandomWordInputModule.java:108)
>>>>>>>>         at com.datatorrent.stram.engine.InputNode.run(InputNode.java:115)
>>>>>>>>         at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1377)
>>>>>>>>
>>>>>>>> "2/counter:WordCountOperator" prio=10 tid=0x00007f78c8c98800
>>>>>>>> nid=0x780d waiting on condition [0x00007f78abc18000]
>>>>>>>>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>>>>>>>>         at java.lang.Thread.sleep(Native Method)
>>>>>>>>         at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:519)
>>>>>>>>         at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1377)
>>>>>>>> ----
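The stack trace above shows the producer parked in Thread.sleep inside CircularBuffer.put. The pattern is roughly the following single-producer/single-consumer sketch with a sleep-based backoff (an illustration of the idea, not the actual Netlet source; field and method names are hypothetical). Every time the producer finds the buffer full it stalls for at least one sleep quantum, which caps throughput.

----
// Illustrative SPSC buffer with sleep backoff on a full buffer.
public class SleepBackoffBuffer<T> {
  private final Object[] slots;
  private final long spinMillis;
  private volatile int head;   // next slot the consumer reads
  private volatile int tail;   // next slot the producer writes

  public SleepBackoffBuffer(int capacity, long spinMillis) {
    this.slots = new Object[capacity];
    this.spinMillis = spinMillis;
  }

  // Single producer: retries with a sleep while the buffer is full. This is
  // the Thread.sleep frame visible in the "1/wordGenerator" stack above.
  public void put(T tuple) throws InterruptedException {
    while (tail - head >= slots.length) {
      Thread.sleep(spinMillis);
    }
    slots[tail % slots.length] = tuple;
    tail++;   // volatile write publishes the tuple to the consumer thread
  }

  // Single consumer: returns null when empty; the caller decides how to wait.
  @SuppressWarnings("unchecked")
  public T poll() {
    if (head >= tail) {
      return null;
    }
    T tuple = (T) slots[head % slots.length];
    head++;   // volatile write frees the slot for the producer
    return tuple;
  }
}
----

Whether the sleep or the volatile traffic dominates depends on how often the buffer is actually full; the follow-up test with the wait hardcoded to 0 (earlier in this thread) points at the volatile traffic rather than the sleep.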
>>>>>>>> On 9/26/15 20:59, Amol Kekre wrote:
>>>>>>>>
>>>>>>>>> A good read -
>>>>>>>>> http://preshing.com/20111118/locks-arent-slow-lock-contention-is/
>>>>>>>>>
>>>>>>>>> Though it does not explain an order-of-magnitude difference.
>>>>>>>>>
>>>>>>>>> Amol
>>>>>>>>>
>>>>>>>>> On Sat, Sep 26, 2015 at 4:25 PM, Vlad Rozov wrote:
>>>>>>>>>
>>>>>>>>>> In the benchmark test THREAD_LOCAL outperforms CONTAINER_LOCAL by an
>>>>>>>>>> order of magnitude and both operators compete for CPU. I'll take a
>>>>>>>>>> closer look at why.
>>>>>>>>>>
>>>>>>>>>> Thank you,
>>>>>>>>>>
>>>>>>>>>> Vlad
>>>>>>>>>>
>>>>>>>>>> On 9/26/15 14:52, Thomas Weise wrote:
>>>>>>>>>>
>>>>>>>>>>> THREAD_LOCAL - operators share a thread
>>>>>>>>>>> CONTAINER_LOCAL - each operator has its own thread
>>>>>>>>>>>
>>>>>>>>>>> So as long as the operators utilize the CPU sufficiently (compete),
>>>>>>>>>>> the latter will perform better.
>>>>>>>>>>>
>>>>>>>>>>> There will be cases where a single thread can accommodate multiple
>>>>>>>>>>> operators. For example, a socket reader (mostly waiting for IO) and
>>>>>>>>>>> a decompress (CPU hungry) can share a thread.
>>>>>>>>>>>
>>>>>>>>>>> But to get back to the original question, stream locality generally
>>>>>>>>>>> does not reduce the total memory requirement. If you add multiple
>>>>>>>>>>> operators into one container, that container will also require more
>>>>>>>>>>> memory, and that's how the container size is calculated in the
>>>>>>>>>>> physical plan. You may get some extra mileage when multiple
>>>>>>>>>>> operators share the same heap, but the need to identify the memory
>>>>>>>>>>> requirement per operator does not go away.
>>>>>>>>>>>
>>>>>>>>>>> Thomas
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Sep 26, 2015 at 12:41 PM, Munagala Ramanath wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Would CONTAINER_LOCAL achieve the same thing and perform a little
>>>>>>>>>>>> better on a multi-core box?
>>>>>>>>>>>>
>>>>>>>>>>>> Ram
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Sep 26, 2015 at 12:18 PM, Sandeep Deshmukh wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, with this approach only two containers are required: one for
>>>>>>>>>>>>> stram and another for all operators. You can easily fit around 10
>>>>>>>>>>>>> operators in less than 1 GB.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 27 Sep 2015 00:32, "Timothy Farkas" wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Ram,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> You could make all the operators thread local. This cuts down on
>>>>>>>>>>>>>> the overhead of separate containers and maximizes the memory
>>>>>>>>>>>>>> available to each operator.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Tim
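For anyone mapping the THREAD_LOCAL / CONTAINER_LOCAL discussion to code, locality is declared on the stream in the DAG, and the memory attributes mentioned below can also be set there. A minimal sketch, assuming the Apex/DataTorrent API of that era (package names, the attribute-setter signatures, and the placeholder operator classes are illustrative, not from the benchmark or Ram's application):

----
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.util.BaseOperator;

// Hypothetical two-operator application showing where stream locality and
// per-operator memory are declared.
public class LocalityDemo implements StreamingApplication {

  public static class Emitter extends BaseOperator implements InputOperator {
    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>();
    private long n;

    @Override
    public void emitTuples() {
      output.emit(n++);
    }
  }

  public static class Counter extends BaseOperator {
    private long count;
    public final transient DefaultInputPort<Long> input = new DefaultInputPort<Long>() {
      @Override
      public void process(Long tuple) {
        count++;
      }
    };
  }

  @Override
  public void populateDAG(DAG dag, Configuration conf) {
    Emitter emitter = dag.addOperator("emitter", new Emitter());
    Counter counter = dag.addOperator("counter", new Counter());

    // THREAD_LOCAL: both operators share one thread; tuples are handed over
    // without crossing an inter-thread queue.
    // CONTAINER_LOCAL: same container, one thread per operator; tuples cross
    // the volatile-based CircularBuffer discussed above.
    dag.addStream("numbers", emitter.output, counter.input)
       .setLocality(Locality.THREAD_LOCAL);   // or Locality.CONTAINER_LOCAL

    // Programmatic counterparts of the dt.attr.* memory properties discussed
    // below; the same values can be set in the XML configuration instead.
    dag.setAttribute(DAGContext.MASTER_MEMORY_MB, 500);
    dag.setAttribute(counter, OperatorContext.MEMORY_MB, 200);
  }
}
----

The sandbox memory question below is about exactly these attributes, expressed in the XML configuration rather than in code.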
>>>>>>>>>>>>>> On Sat, Sep 26, 2015 at 10:07 AM, Munagala Ramanath wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I was running into memory issues when deploying my app on the
>>>>>>>>>>>>>>> sandbox: all the operators were stuck forever in the PENDING
>>>>>>>>>>>>>>> state because they were being continually aborted and restarted
>>>>>>>>>>>>>>> due to the limited memory on the sandbox. After some
>>>>>>>>>>>>>>> experimentation, I found that the following config values seem
>>>>>>>>>>>>>>> to work:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ------------------------------------------
>>>>>>>>>>>>>>> https://datatorrent.slack.com/archives/engineering/p1443263607000010
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> dt.attr.MASTER_MEMORY_MB  500
>>>>>>>>>>>>>>> dt.application.<app-name>.operator.*.attr.MEMORY_MB  200
>>>>>>>>>>>>>>> dt.application.TopNWordsWithQueries.operator.fileWordCount.attr.MEMORY_MB  512
>>>>>>>>>>>>>>> ------------------------------------------
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Are these reasonable values? Is there a more systematic way of
>>>>>>>>>>>>>>> coming up with these values than trial-and-error? Most of my
>>>>>>>>>>>>>>> operators -- with the exception of fileWordCount -- need very
>>>>>>>>>>>>>>> little memory; is there a way to cut all values down to the bare
>>>>>>>>>>>>>>> minimum and maximize the available memory for this one operator?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Ram