flink-dev mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Python API - Weird Performance Issue
Date Tue, 09 Sep 2014 09:33:07 GMT

The UDP version is 25x slower? That's massive. Are you sending the records
through that as well, or just the coordination?

Regarding busy waiting loops: There has to be a better way to do that. It
will behave utterly unpredictably. Once the Python side does I/O, has a
separate process or thread, or goes asynchronously into a library
(scikit-learn, numpy), the loop cannot be expected to stay at 5%.

You have tested that with a job where both java and python side have some
work to do. In case of a job where one side waits for the other, the
waiting side will burn cycles like crazy. Then run it in parallel (#cores)
and you may get executions where little more happens than the busy waiting
loop burning cycles.
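To make the point about the waiting side burning cycles concrete, here is a small Python sketch (not Flink code). It measures, with `time.thread_time()`, how much CPU a thread consumes while it waits for the "other side" for the same wall-clock delay: once by spinning on a flag, once by blocking on a `threading.Event`. The function name and the 0.2 s delay are illustrative assumptions.

```python
import threading
import time


def cpu_cost_of_waiting(delay=0.2):
    """Return (spin_cpu, block_cpu): CPU seconds burned by a waiting thread
    while the other side (here: the main thread) needs `delay` seconds."""
    results = {}

    # Variant 1: busy loop that re-checks a flag as fast as it can.
    flag = [False]

    def spinner():
        start = time.thread_time()        # CPU time of this thread only
        while not flag[0]:
            pass                          # never gives up the core voluntarily
        results["spin"] = time.thread_time() - start

    def set_flag():
        flag[0] = True

    # Variant 2: blocking wait; the thread sleeps until it is woken up.
    event = threading.Event()

    def blocker():
        start = time.thread_time()
        event.wait()
        results["block"] = time.thread_time() - start

    for target, release in ((spinner, set_flag), (blocker, event.set)):
        t = threading.Thread(target=target)
        t.start()
        time.sleep(delay)                 # the other side is "busy"
        release()                         # the record becomes available
        t.join()

    return results["spin"], results["block"]
```

On a typical machine the spinning waiter's CPU time comes out close to `delay`, while the blocking waiter's stays near zero; exact numbers depend on the machine and, in CPython, on the GIL.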


On Mon, Sep 8, 2014 at 4:15 PM, Chesnay Schepler <
chesnay.schepler@fu-berlin.de> wrote:

> Sorry for the late answer.
> Today I did a quick hack to replace the synchronization completely with
> UDP. It's still synchronous and record-based, but 25x slower.
> Regarding busy loops, I would propose the following:
> 1. Leave the Python side as it is. It's doing most of the heavy lifting
>    anyway and will run at 100% regardless of the loops (the loops only
>    take up 5% of the total runtime).
> 2. Once we exchange buffers instead of single records, the I/O operations
>    and synchronization will take a fairly constant time. We could then
>    put the Java process to sleep manually for that time instead of
>    busy waiting. It may not be as good as a blocking operation, but it
>    should keep the CPU consumption down to some extent.
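Point 2 of the proposal (sleep for the roughly known exchange time instead of spinning) could look like the following minimal sketch. It is written in Python for consistency with the other examples, although the proposal concerns the Java side. `sleep_then_poll`, `expected`, `poll_interval` and `timeout` are hypothetical names; `expected` stands for the estimated time the other side needs per buffer.

```python
import time


def sleep_then_poll(is_ready, expected, poll_interval=0.0005, timeout=5.0):
    """Wait for is_ready() without a hot spin loop (hypothetical helper).

    Sleeps for `expected` seconds first (the part of the wait that is known
    to be wasted), then re-checks every `poll_interval` seconds until
    is_ready() returns True or `timeout` seconds have passed.
    Returns True on success, False on timeout.
    """
    deadline = time.monotonic() + timeout
    time.sleep(expected)
    while not is_ready():
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)   # give the core away between checks
    return True
```

The trade-off: if `expected` is overestimated, every exchange pays that extra latency; if it is underestimated, the loop falls back to short sleeps, which still wake up periodically but cost far less CPU than a spin loop.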
> On 1.9.2014 22:50, Ufuk Celebi wrote:
>> Hey Chesnay,
>> any progress on this today? Are you going for the UDP buffer availability
>> notifications Stephan proposed instead of the busy loop?
>> Ufuk
>> On Thu, Aug 28, 2014 at 1:06 AM, Chesnay Schepler <
>> chesnay.schepler@fu-berlin.de> wrote:
>>> The performance differences occur on the same system (16GB, 4 cores +
>>> HyperThreading) with a DOP of 1 for a plan consisting of a single
>>> operator.
>>> Plenty of resources :/
>>> On 28.8.2014 0:50, Stephan Ewen wrote:
>>>> Hey Chesnay!
>>>> Here are some thoughts:
>>>>   - The repeated checking for 1 or 0 is indeed a busy loop. These may
>>>>     behave very differently in different settings. If you run the code
>>>>     isolated, you have a spare core for the thread and it barely hurts.
>>>>     Run multiple parallel instances in a larger framework, and it eats
>>>>     away CPU cycles from the threads that do the work; it starts hurting
>>>>     badly.
>>>>   - You may get around a copy into the shared memory (ByteBuffer into
>>>>     MemoryMappedFile) by creating a corresponding DataOutputView, which
>>>>     saves one more data copy. That's the next step, though; first solve
>>>>     the other issue.
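The suggested DataOutputView is Flink's Java abstraction; as a rough Python analog (an assumption for illustration, not Flink's API), the difference between "serialize into a temporary buffer, then copy into shared memory" and "serialize directly into shared memory" looks like this. The record layout `<qd` (int64 id, float64 value) is hypothetical.

```python
import mmap
import struct

RECORD = struct.Struct("<qd")  # hypothetical record: int64 id, float64 value


def write_with_copy(shm, offset, rec_id, value):
    # Two steps: serialize into a temporary bytes object, then copy it into
    # the mapped region (the byte[] -> MappedByteBuffer pattern).
    tmp = RECORD.pack(rec_id, value)
    shm[offset:offset + len(tmp)] = tmp
    return offset + len(tmp)


def write_direct(shm, offset, rec_id, value):
    # One step: serialize straight into the mapped region, no temporary buffer.
    RECORD.pack_into(shm, offset, rec_id, value)
    return offset + RECORD.size
```

Both functions produce identical bytes in the mapping; the direct variant simply skips the intermediate object and the second copy.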
>>>> The last time I implemented such an inter-process data pipe between
>>>> languages, I had a similar issue: no support for system-wide semaphores
>>>> (or something similar) on both sides.
>>>> I used shared memory for the buffers, and a local network socket (UDP,
>>>> but I guess TCP would be fine as well) for notifications when buffers
>>>> are available. That worked pretty well and yielded high throughput,
>>>> because the big buffers were not copied (unlike in streams), and the UDP
>>>> notifications were very fast (fire-and-forget datagrams).
>>>> Stephan
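A minimal sketch of the scheme just described: shared memory carries the buffers, and small UDP datagrams on the loopback interface announce "buffer ready" and "buffer free again", so neither side spins; both block in `recv()`. For a self-contained example, the two ends are threads and an anonymous `mmap` stands in for the file both processes (JVM and Python) would map. Slot size, slot count and datagram layout are assumptions. UDP on loopback is fire-and-forget, so loss is unlikely but not impossible; as noted above, TCP would work as well.

```python
import mmap
import socket
import struct
import threading

SLOT_SIZE = 1024             # hypothetical buffer size in bytes
NUM_SLOTS = 2                # one buffer being filled, one being read
NOTE = struct.Struct("<ii")  # notification datagram: (slot index, length)


def producer(shm, sock, peer, payloads):
    free = list(range(NUM_SLOTS))
    for data in payloads:
        if len(data) > SLOT_SIZE:
            raise ValueError("payload larger than a slot")
        if not free:
            # No free buffer: block in recv() until the consumer hands one back.
            slot, _ = NOTE.unpack(sock.recv(NOTE.size))
            free.append(slot)
        slot = free.pop()
        start = slot * SLOT_SIZE
        shm[start:start + len(data)] = data            # bulk data: shared memory
        sock.sendto(NOTE.pack(slot, len(data)), peer)  # tiny "buffer ready" note
    sock.sendto(NOTE.pack(-1, 0), peer)                # end-of-stream marker


def consumer(shm, sock, peer, out):
    while True:
        slot, length = NOTE.unpack(sock.recv(NOTE.size))  # blocks in the kernel
        if slot < 0:
            return
        start = slot * SLOT_SIZE
        out.append(bytes(shm[start:start + length]))
        sock.sendto(NOTE.pack(slot, 0), peer)             # "buffer free again"


def run_demo(payloads):
    shm = mmap.mmap(-1, SLOT_SIZE * NUM_SLOTS)
    prod_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    cons_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    out = []
    try:
        for s in (prod_sock, cons_sock):
            s.bind(("127.0.0.1", 0))  # let the OS pick free loopback ports
            s.settimeout(5.0)         # fail instead of hanging on a lost datagram
        t = threading.Thread(target=consumer, daemon=True,
                             args=(shm, cons_sock, prod_sock.getsockname(), out))
        t.start()
        producer(shm, prod_sock, cons_sock.getsockname(), payloads)
        t.join(5.0)
    finally:
        prod_sock.close()
        cons_sock.close()
        shm.close()
    return out
```

Only 8-byte notices travel through the socket; the payload is written once into the mapping and read once from it.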
>>>> On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler <
>>>> chesnay.schepler@fu-berlin.de> wrote:
>>>>> Hey Stephan,
>>>>> I'd like to point out right away that the code related to your
>>>>> questions is shared by both programs.
>>>>> Regarding your first point: I have a byte[] into which I serialize the
>>>>> data first using a ByteBuffer, and then write that data to a
>>>>> MappedByteBuffer.
>>>>> Regarding synchronization: I couldn't find a way to use elaborate
>>>>> things like semaphores or similar that work between Python and Java
>>>>> alike.
>>>>> The data exchange is currently completely synchronous. Java writes a
>>>>> record, sets an "isWritten" bit and then repeatedly checks whether this
>>>>> bit is 0. Python repeatedly checks whether this bit is 1. Once that
>>>>> happens, it reads the record and sets the bit to 0, which tells Java
>>>>> that the record has been read and the next one can be written. This
>>>>> scheme works the same way the other way around.
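A minimal, single-direction sketch of the "isWritten" handshake just described, for readers who want to see it in code. Two threads and an anonymous `mmap` stand in for the Java and Python processes and the shared file; the byte layout (flag at byte 0, int32 length at bytes 1..4, payload after) is an assumption. Both sides spin, which is exactly the cost discussed in the replies. The sketch also does not address memory ordering between real processes, where the flag write must not become visible before the payload.

```python
import mmap
import struct
import threading

# Hypothetical slot layout: byte 0 = isWritten flag, bytes 1..4 = payload
# length (little-endian int32), payload starting at byte 5.
LEN = struct.Struct("<i")
PAYLOAD_OFFSET = 1 + LEN.size


def write_record(shm, payload):
    n = len(payload)
    shm[PAYLOAD_OFFSET:PAYLOAD_OFFSET + n] = payload
    LEN.pack_into(shm, 1, n)
    shm[0] = 1              # set isWritten only after the record is complete
    while shm[0] != 0:      # busy wait until the reader has consumed it
        pass


def read_record(shm):
    while shm[0] != 1:      # busy wait until a record is available
        pass
    (n,) = LEN.unpack_from(shm, 1)
    payload = bytes(shm[PAYLOAD_OFFSET:PAYLOAD_OFFSET + n])
    shm[0] = 0              # tell the writer the slot is free again
    return payload


def transfer(records, size=4096):
    shm = mmap.mmap(-1, size)  # stands in for the shared memory-mapped file
    received = []

    def reader():
        for _ in records:
            received.append(read_record(shm))

    t = threading.Thread(target=reader, daemon=True)
    t.start()
    for rec in records:
        write_record(shm, rec)
    t.join()
    shm.close()
    return received
```

Each record costs one full round trip through the flag, so throughput is bounded by how quickly each side notices the other's flip; exchanging whole buffers per handshake amortizes that cost.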
>>>>> *NOW,* this may seem ... inefficient, to put it mildly. It is (or
>>>>> rather should be...) way faster (5x) than what we had so far, though
>>>>> (asynchronous pipes).
>>>>> (I also tried different schemes that all had no effect, so I decided
>>>>> to stick with the easiest one.)
>>>>> On to your last point: I'm gonna check for that tomorrow.
>>>>> On 27.8.2014 20:45, Stephan Ewen wrote:
>>>>>> Hi Chesnay!
>>>>>> That is an interesting problem, though hard to judge with the
>>>>>> information we have.
>>>>>> Can you elaborate a bit on the following points:
>>>>>>   - When putting the objects from the Java Flink side into the shared
>>>>>>     memory, you need to serialize them. How do you do that? Into a
>>>>>>     buffer, then copy that into the shared memory ByteBuffer? Directly?
>>>>>>   - Shared memory access has to be controlled somehow. The pipes give
>>>>>>     you flow control for free (write calls block when the stream
>>>>>>     consumer is busy). What do you do for the shared memory? Usually,
>>>>>>     one uses semaphores or, in Java, File(Range)Locks to coordinate
>>>>>>     access and block until memory regions are made available. Can you
>>>>>>     check if there are some busy waiting parts in your code?
>>>>>>   - More generally: the code is slower, but does it burn CPU cycles in
>>>>>>     its slowness, or is it waiting for locks / monitors / conditions?
>>>>>> Stephan
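On the question of a locking primitive both languages can use: POSIX `fcntl` record locks on byte ranges of the shared file are one candidate. The sketch below is POSIX-only and shows with Python's `fcntl.lockf` that a range held by one process is refused to another, while a disjoint range is still granted. As far as I know, Java's `FileChannel.lock(position, size, shared)` is implemented with the same `fcntl` record locks on Linux, so a JVM and a Python process should see each other's locks; treat that as an assumption to verify on the target platform. These locks are advisory and per-process (not per-thread), and turning them into flow control would still need a protocol, e.g. each side holding the lock on the region it currently owns and blocking with `LOCK_EX` (without `LOCK_NB`) to wait. The helper names are hypothetical.

```python
import fcntl
import os
import tempfile


def child_can_lock(path, start, length):
    """Fork a child that tries a non-blocking exclusive lock on bytes
    [start, start + length) of `path`; return True if it got the lock."""
    pid = os.fork()
    if pid == 0:                      # child process
        code = 1
        try:
            fd = os.open(path, os.O_RDWR)
            fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB, length, start)
            code = 0                  # lock granted
        except OSError:
            pass                      # region is held by another process
        finally:
            os._exit(code)            # exit without running parent cleanup
    _, status = os.waitpid(pid, 0)
    return os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0


def demo_range_locks():
    with tempfile.NamedTemporaryFile() as f:
        f.write(b"\0" * 8192)
        f.flush()
        fcntl.lockf(f.fileno(), fcntl.LOCK_EX, 4096, 0)  # parent: bytes 0..4095
        overlapping = child_can_lock(f.name, 0, 4096)
        disjoint = child_can_lock(f.name, 4096, 4096)
        fcntl.lockf(f.fileno(), fcntl.LOCK_UN, 4096, 0)
        after_release = child_can_lock(f.name, 0, 4096)
    return overlapping, disjoint, after_release
```

The child is refused the overlapping range while the parent holds it, gets the disjoint range, and gets the first range once the parent has released it.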
>>>>>> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler <
>>>>>> chesnay.schepler@fu-berlin.de> wrote:
>>>>>>> Hello everyone,
>>>>>>> This will be some kind of brainstorming question.
>>>>>>> As some of you may know, I am currently working on the Python API.
>>>>>>> The most crucial part here is how the data is exchanged between Java
>>>>>>> and Python. Up to this point we used pipes for this, but recently
>>>>>>> switched to memory-mapped files in hopes of increasing the (lacking)
>>>>>>> performance.
>>>>>>> Early (simplified) prototypes (outside of Flink) showed that this
>>>>>>> would yield a significant increase. Yet when I added the code to Flink
>>>>>>> and ran a job, there was no effect. Like, at all. Two radically
>>>>>>> different schemes ran in /exactly/ the same time.
>>>>>>> My conclusion was that code already in place (and not part of the
>>>>>>> prototypes) is responsible for this.
>>>>>>> So I went ahead and modified the prototypes to use all relevant code
>>>>>>> from the Python API in order to narrow down the culprit. But this time
>>>>>>> the performance increase was there.
>>>>>>> Now here's the question: How can the /very same code/ perform so much
>>>>>>> worse when integrated into Flink? If the code is not the problem, what
>>>>>>> could it be?
>>>>>>> I spent a lot of time looking for that one line of code that kills
>>>>>>> the performance, but I'm pretty much out of places to look.
