flink-dev mailing list archives

From Chesnay Schepler <chesnay.schep...@fu-berlin.de>
Subject Re: Python API - Weird Performance Issue
Date Wed, 10 Sep 2014 13:49:44 GMT
haven't tried tcp.

all i do is create a socket and use send/receive operations as some kind
of semaphore. i don't even access the contents of the datagram.
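
A minimal sketch of that datagram-as-semaphore idea on the Java side (the class name, ports, and one-byte payload are made up for illustration, not the actual Python API code):

    import java.net.DatagramPacket;
    import java.net.DatagramSocket;
    import java.net.InetAddress;

    // Datagram-based "semaphore": the single byte sent is never inspected;
    // the send/receive pair only signals "a record/buffer is ready" to the peer.
    public class UdpSignal {
        private final DatagramSocket socket;
        private final DatagramPacket outPacket;
        private final DatagramPacket inPacket = new DatagramPacket(new byte[1], 1);

        public UdpSignal(int localPort, int peerPort) throws Exception {
            this.socket = new DatagramSocket(localPort);
            this.outPacket = new DatagramPacket(
                    new byte[]{1}, 1, InetAddress.getLoopbackAddress(), peerPort);
        }

        public void signal() throws Exception {      // notify the other process
            socket.send(outPacket);
        }

        public void awaitSignal() throws Exception { // block until notified
            socket.receive(inPacket);
        }
    }

the python counterpart would presumably be a plain socket.socket(AF_INET, SOCK_DGRAM) doing the mirrored sendto/recvfrom calls.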

On 10.9.2014 15:17, Stephan Ewen wrote:
> Maybe there is some quirk in the way you use the datagrams. Have you tried
> it through TCP sockets?
>
> On Wed, Sep 10, 2014 at 2:30 PM, Chesnay Schepler <
> chesnay.schepler@fu-berlin.de> wrote:
>
>> only the coordination is done via UDP.
>>
>> i agree with what you say about the loops; currently looking into using
>> FileLocks.
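
For reference, a rough sketch of what FileLock-based coordination over the memory-mapped file might look like on the Java side (file path, region size, and flag layout are invented for illustration; whether Python's fcntl locks interoperate with this on a given OS is an open assumption):

    import java.io.RandomAccessFile;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.channels.FileLock;

    public class FileLockSketch {
        public static void main(String[] args) throws Exception {
            try (RandomAccessFile file = new RandomAccessFile("/tmp/flink_mmap", "rw");
                 FileChannel channel = file.getChannel()) {

                MappedByteBuffer buffer =
                        channel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);

                // Lock the mapped region, write into it, then release the lock
                // so the other process can acquire a lock on the same range.
                FileLock lock = channel.lock(0, 4096, false);
                try {
                    buffer.put(0, (byte) 1);  // e.g. the "isWritten" flag
                } finally {
                    lock.release();
                }
            }
        }
    }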
>>
>>
>> On 9.9.2014 11:33, Stephan Ewen wrote:
>>
>>> Hey!
>>>
>>> The UDP version is 25x slower? That's massive. Are you sending the records
>>> through that as well, or just the coordination?
>>>
>>> Regarding busy waiting loops: There has to be a better way to do that. It
>>> will behave utterly unpredictably. Once the python side does I/O, has a
>>> separate process or thread, or goes asynchronously into a library
>>> (scikit-learn, numpy), the loop cannot be expected to stay at 5%.
>>>
>>> You have tested that with a job where both the java and python sides have
>>> some work to do. In case of a job where one side waits for the other, the
>>> waiting side will burn cycles like crazy. Then run it in parallel (#cores)
>>> and you may get executions where little more happens than the busy waiting
>>> loop burning cycles.
>>>
>>> Stephan
>>>
>>>
>>> On Mon, Sep 8, 2014 at 4:15 PM, Chesnay Schepler <
>>> chesnay.schepler@fu-berlin.de> wrote:
>>>
>>>> sorry for the late answer.
>>>> today i did a quick hack to replace the synchronization completely with
>>>> udp. it's still synchronous and record based, but 25x slower.
>>>> regarding busy-loops i would propose the following:
>>>>
>>>> 1. leave the python side as it is. it's doing most of the heavy lifting
>>>>      anyway and will run at 100% regardless of the loops. (the loops only
>>>>      take up 5% of the total runtime)
>>>> 2. once we exchange buffers instead of single records the IO operations
>>>>      and synchronization will take a fairly constant time. we could then
>>>>      put the java process to sleep manually for that time instead of
>>>>      busy-waiting (see the sketch below). it may not be as good as a
>>>>      blocking operation, but it should keep the cpu consumption down to
>>>>      some extent.
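
A rough sketch of that sleep-based pacing on the Java side (the flag position, its meaning, and the sleep interval are assumptions for illustration only):

    import java.nio.MappedByteBuffer;

    public final class PacedWait {
        // Instead of spinning on the flag at full speed, re-check it only every
        // few milliseconds - roughly the time one buffer exchange is expected
        // to take - so the CPU is yielded in between.
        static void awaitConsumed(MappedByteBuffer buffer, long expectedExchangeMillis)
                throws InterruptedException {
            while (buffer.get(0) != 0) {           // assumed: 0 = buffer consumed
                Thread.sleep(expectedExchangeMillis);
            }
        }
    }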
>>>>
>>>>
>>>> On 1.9.2014 22:50, Ufuk Celebi wrote:
>>>>
>>>>> Hey Chesnay,
>>>>> any progress on this today? Are you going for the UDP buffer
>>>>> availability
>>>>> notifications Stephan proposed instead of the busy loop?
>>>>>
>>>>> Ufuk
>>>>>
>>>>>
>>>>> On Thu, Aug 28, 2014 at 1:06 AM, Chesnay Schepler <
>>>>> chesnay.schepler@fu-berlin.de> wrote:
>>>>>
>>>>>> the performance differences occur on the same system (16GB, 4 cores +
>>>>>> HyperThreading) with a DOP of 1 for a plan consisting of a single
>>>>>> operator.
>>>>>> plenty of resources :/
>>>>>>
>>>>>>
>>>>>> On 28.8.2014 0:50, Stephan Ewen wrote:
>>>>>>
>>>>>>> Hey Chesnay!
>>>>>>>
>>>>>>> Here are some thoughts:
>>>>>>>
>>>>>>>    - The repeated checking for 1 or 0 is indeed a busy loop. These may
>>>>>>> behave very differently in different settings. If you run the code
>>>>>>> isolated, you have a spare core for the thread and it barely hurts. Run
>>>>>>> multiple parallel instances in a larger framework, and it eats away CPU
>>>>>>> cycles from the threads that do the work - it starts hurting badly.
>>>>>>>
>>>>>>>    - You may get around a copy into the shared memory (ByteBuffer into
>>>>>>> MemoryMappedFile) by creating an according DataOutputView - saving one
>>>>>>> more data copy. That's the next step, though; first solve the other
>>>>>>> issue.
>>>>>>>
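
As an illustration of that second point, a sketch of a writer that serializes straight into the mapped buffer instead of going through an intermediate byte[] first (plain JDK types only; this is not Flink's actual DataOutputView interface, just the same principle):

    import java.nio.MappedByteBuffer;

    // Writes record fields directly into the shared MappedByteBuffer, skipping
    // the intermediate byte[]/ByteBuffer that would otherwise be copied afterwards.
    public class DirectMappedWriter {
        private final MappedByteBuffer target;

        public DirectMappedWriter(MappedByteBuffer target) {
            this.target = target;
        }

        public void writeInt(int value)       { target.putInt(value); }
        public void writeDouble(double value) { target.putDouble(value); }
        public void writeBytes(byte[] data)   { target.put(data); }
    }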
>>>>>>> The last time I implemented such an inter-process data pipe between
>>>>>>> languages, I had a similar issue: no support for system-wide semaphores
>>>>>>> (or something similar) on both sides.
>>>>>>>
>>>>>>> I used shared memory for the buffers, and a local network socket (UDP,
>>>>>>> but I guess TCP would be fine as well) for notifications when buffers
>>>>>>> are available. That worked pretty well and yielded high throughput,
>>>>>>> because the big buffers were not copied (unlike in streams), and the UDP
>>>>>>> notifications were very fast (fire-and-forget datagrams).
>>>>>>>
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler <
>>>>>>> chesnay.schepler@fu-berlin.de> wrote:
>>>>>>>
>>>>>>>> Hey Stephan,
>>>>>>>>
>>>>>>>> I'd like to point out right away that the code related to your
>>>>>>>> questions is shared by both programs.
>>>>>>>>
>>>>>>>> regarding your first point: i have a byte[] into which i serialize the
>>>>>>>> data first using a ByteBuffer, and then write that data to a
>>>>>>>> MappedByteBuffer.
>>>>>>>>
>>>>>>>> regarding synchronization: i couldn't find a way to use elaborate
>>>>>>>> things like semaphores or similar that work between python and java
>>>>>>>> alike.
>>>>>>>>
>>>>>>>> the data exchange is currently completely synchronous. java writes a
>>>>>>>> record, sets an "isWritten" bit, and then repeatedly checks this bit
>>>>>>>> until it is 0. python repeatedly checks this bit until it is 1. once
>>>>>>>> that happens, it reads the record and sets the bit back to 0, which
>>>>>>>> tells java that it has read the record and the next one can be written.
>>>>>>>> this scheme works the same way in the other direction.
>>>>>>>>
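
A condensed sketch of that scheme as described above (buffer layout, sizes, and the record type are guesses for illustration, not the actual Python API code):

    import java.nio.ByteBuffer;
    import java.nio.MappedByteBuffer;

    public class SpinFlagWriter {
        private static final int FLAG_OFFSET = 0;  // assumed layout: one flag byte,
        private static final int DATA_OFFSET = 1;  // then the serialized record

        private final MappedByteBuffer shared;
        private final byte[] scratch = new byte[4096];        // reused byte[]
        private final ByteBuffer scratchBuffer = ByteBuffer.wrap(scratch);

        public SpinFlagWriter(MappedByteBuffer shared) {
            this.shared = shared;
        }

        public void writeRecord(int value) {
            // 1) serialize into the byte[] via a ByteBuffer ...
            scratchBuffer.clear();
            scratchBuffer.putInt(value);

            // 2) ... copy it into the memory-mapped file ...
            shared.position(DATA_OFFSET);
            shared.put(scratch, 0, scratchBuffer.position());

            // 3) ... set the "isWritten" flag and busy-wait until the Python
            //    side resets it to 0 after reading the record.
            shared.put(FLAG_OFFSET, (byte) 1);
            while (shared.get(FLAG_OFFSET) != 0) {
                // busy loop - this is the part that burns CPU cycles
            }
        }
    }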
>>>>>>>> *NOW,* this may seem ... inefficient, to put it lightly. it is (or
>>>>>>>> rather should be...) way faster (5x) than what we had so far, though
>>>>>>>> (asynchronous pipes).
>>>>>>>> (i also tried different schemes that all had no effect, so i decided to
>>>>>>>> stick with the easiest one)
>>>>>>>>
>>>>>>>> on to your last point: I'm gonna check for that tomorrow.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 27.8.2014 20:45, Stephan Ewen wrote:
>>>>>>>>
>>>>>>>>> Hi Chesnay!
>>>>>>>>>
>>>>>>>>> That is an interesting problem, though hard to judge with the
>>>>>>>>> information we have.
>>>>>>>>>
>>>>>>>>> Can you elaborate a bit on the following points:
>>>>>>>>>
>>>>>>>>>    - When putting the objects from the Java Flink side into the shared
>>>>>>>>> memory, you need to serialize them. How do you do that? Into a buffer,
>>>>>>>>> then copy that into the shared memory ByteBuffer? Directly?
>>>>>>>>>
>>>>>>>>>    - Shared memory access has to be somehow controlled. The pipes give
>>>>>>>>> you flow control for free (blocking write calls when the stream
>>>>>>>>> consumer is busy). What do you do for the shared memory? Usually, one
>>>>>>>>> uses semaphores, or, in java, File(Range)Locks to coordinate access
>>>>>>>>> and block until memory regions are made available. Can you check if
>>>>>>>>> there are some busy waiting parts in your code?
>>>>>>>>>
>>>>>>>>>    - More generally: The code is slower, but does it burn CPU cycles
>>>>>>>>> in its slowness, or is it waiting for locks / monitors / conditions?
>>>>>>>>>
>>>>>>>>> Stephan
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler <
>>>>>>>>> chesnay.schepler@fu-berlin.de> wrote:
>>>>>>>>>
>>>>>>>>>> Hello everyone,
>>>>>>>>>>
>>>>>>>>>> This will be some kind of brainstorming question.
>>>>>>>>>>
>>>>>>>>>> As some of you may know I am currently working on the Python API.
>>>>>>>>>> The most crucial part here is how the data is exchanged between Java
>>>>>>>>>> and Python. Up to this point we used pipes for this, but switched
>>>>>>>>>> recently to memory-mapped files in hopes of increasing the (lacking)
>>>>>>>>>> performance.
>>>>>>>>>>
>>>>>>>>>> Early (simplified) prototypes (outside of Flink) showed that this
>>>>>>>>>> would yield a significant increase. yet when i added the code to
>>>>>>>>>> flink and ran a job, there was no effect. like at all. two radically
>>>>>>>>>> different schemes ran in /exactly/ the same time.
>>>>>>>>>>
>>>>>>>>>> my conclusion was that code already in place (and not part of the
>>>>>>>>>> prototypes) is responsible for this. so i went ahead and modified the
>>>>>>>>>> prototypes to use all relevant code from the Python API in order to
>>>>>>>>>> narrow down the culprit. but this time, the performance increase was
>>>>>>>>>> there.
>>>>>>>>>>
>>>>>>>>>> Now here's the question: How can the /very same code/ perform so much
>>>>>>>>>> worse when integrated into flink? if the code is not the problem,
>>>>>>>>>> what could it be?
>>>>>>>>>>
>>>>>>>>>> i spent a lot of time looking for that one line of code that cripples
>>>>>>>>>> the performance, but I'm pretty much out of places to look.

