flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stephan Ewen <se...@apache.org>
Subject Re: Python API - Weird Performance Issue
Date Wed, 10 Sep 2014 13:17:35 GMT
Maybe there is some quirk in the way you use the datagrams. Have you tried
it through TCP sockets?

On Wed, Sep 10, 2014 at 2:30 PM, Chesnay Schepler <
chesnay.schepler@fu-berlin.de> wrote:

> only the coordination is done via UDP.
>
> i agree with what you say about the loops; currently looking into using
> FileLocks.
>
>
> On 9.9.2014 11:33, Stephan Ewen wrote:
>
>> Hey!
>>
>> The UDP version is 25x slower? That's massive. Are you sending the records
>> through that as well, or just the coordination?
>>
>> Regarding busy waiting loops: There has to be a better way to do that. It
>> will behave utterly unpredictable. Once the python side does I/O, has a
>> separate process or thread or goes asynchronously into a library
>> (scikitlearn, numpy), the loop cannot be expected to stay at 5%.
>>
>> You have tested that with a job where both java and python side have some
>> work to do. In case of a job where one side waits for the other, the
>> waiting side will burn cycles like crazy. Then run it in parallel (#cores)
>> and you may get executions where little more happens then the busy waiting
>> loop burning cycles.
>>
>> Stephan
>>
>>
>> On Mon, Sep 8, 2014 at 4:15 PM, Chesnay Schepler <
>> chesnay.schepler@fu-berlin.de> wrote:
>>
>>  sorry for the late answer.
>>>
>>> today i did a quick hack to replace the synchronization completely with
>>> udp. its still synchronous and record based, but 25x slower.
>>> regarding busy-loops i would propose the following:
>>>
>>> 1. leave the python side as it is. its doing most of the heavy lifting
>>>     anyway and will run at 100% regardless of the loops. (the loops only
>>>     take up 5% of the total runtime)
>>> 2. once we exchange buffers instead of single records the IO operations
>>>     and synchronization will take a fairly constant time. we could then
>>>     put the java process to sleep manually for that time instead of
>>>     waiting. it may not be as good as a blocking operation, but it
>>>     should keep the cpu consumption down to some extent.
>>>
>>>
>>> On 1.9.2014 22:50, Ufuk Celebi wrote:
>>>
>>>  Hey Chesnay,
>>>>
>>>> any progress on this today? Are you going for the UDP buffer
>>>> availability
>>>> notifications Stephan proposed instead of the busy loop?
>>>>
>>>> Ufuk
>>>>
>>>>
>>>> On Thu, Aug 28, 2014 at 1:06 AM, Chesnay Schepler <
>>>> chesnay.schepler@fu-berlin.de> wrote:
>>>>
>>>>   the performance differences occur on the same system (16GB, 4 cores +
>>>>
>>>>> HyperThreading) with a DOP of 1 for a plan consisting of a single
>>>>> operator.
>>>>> plenty of resources :/
>>>>>
>>>>>
>>>>> On 28.8.2014 0:50, Stephan Ewen wrote:
>>>>>
>>>>>   Hey Chesnay!
>>>>>
>>>>>> Here are some thoughts:
>>>>>>
>>>>>>     - The repeated checking for 1 or 0 is indeed a busy loop. These
>>>>>> may
>>>>>> behave
>>>>>> very different in different settings. If you run the code isolated,
>>>>>> you
>>>>>> have a spare core for the thread and it barely hurts. Run multiple
>>>>>> parallel
>>>>>> instances in a larger framework, and it eats away CPU cycles from
the
>>>>>> threads that do the work - it starts hurting badly.
>>>>>>
>>>>>>     - You may get around a copy into the shared memory (ByteBuffer
>>>>>> into
>>>>>> MemoryMappedFile) by creating an according DataOutputView - save
one
>>>>>> more
>>>>>> data copy. That's the next step, though, first solve the other issue.
>>>>>>
>>>>>> The last time I implemented such an inter-process data pipe between
>>>>>> languages, I had a similar issue: No support for system wide
>>>>>> semaphores
>>>>>> (or
>>>>>> something similar) on both sides.
>>>>>>
>>>>>> I used Shared memory for the buffers, and a local network socket
(UDP,
>>>>>> but
>>>>>> I guess TCP would be fine as well) for notifications when buffers
are
>>>>>> available. That worked pretty well, yielded high throughput, because
>>>>>> the
>>>>>> big buffers were not copied (unlike in streams), and the UDP
>>>>>> notifications
>>>>>> were very fast (fire and forget datagrams).
>>>>>>
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler <
>>>>>> chesnay.schepler@fu-berlin.de> wrote:
>>>>>>
>>>>>>    Hey Stephan,
>>>>>>
>>>>>>  I'd like to point out right away that the code related to your
>>>>>>> questions
>>>>>>> is shared by both programs.
>>>>>>>
>>>>>>> regarding your first point: i have a byte[] into which i serialize
>>>>>>> the
>>>>>>> data first using a ByteBuffer, and then write that data to a
>>>>>>> MappedByteBuffer.
>>>>>>>
>>>>>>> regarding synchronization: i couldn't find a way to use elaborate
>>>>>>> things
>>>>>>> like semaphores or similar that work between python and java
alike.
>>>>>>>
>>>>>>> the data exchange is currently completely synchronous. java writes
a
>>>>>>> record, sets an "isWritten" bit and then repeatedly checks this
bit
>>>>>>> whether
>>>>>>> it is 0. python repeatedly checks this bit whether it is 1. once
that
>>>>>>> happens, it reads the record, sets the bit to 0 which tells java
that
>>>>>>> it
>>>>>>> has read the record and can write the next one. this scheme works
the
>>>>>>> same
>>>>>>> way the other way around.
>>>>>>>
>>>>>>> *NOW,* this may seem ... inefficient, to put it slightly. it
is (or
>>>>>>> rather
>>>>>>> should be...) way faster (5x) that what we had so far though
>>>>>>> (asynchronous
>>>>>>> pipes).
>>>>>>> (i also tried different schemes that all had no effect, so i
decided
>>>>>>> to
>>>>>>> stick with the easiest one)
>>>>>>>
>>>>>>> on to your last point: I'm gonna check for that tomorrow.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 27.8.2014 20:45, Stephan Ewen wrote:
>>>>>>>
>>>>>>>    Hi Chesnay!
>>>>>>>
>>>>>>>  That is an interesting problem, though hard to judge with the
>>>>>>>> information
>>>>>>>> we have.
>>>>>>>>
>>>>>>>> Can you elaborate a bit on the following points:
>>>>>>>>
>>>>>>>>      - When putting the objects from the Java Flink side
into the
>>>>>>>> shared
>>>>>>>> memory, you need to serialize them. How do you do that? Into
a
>>>>>>>> buffer,
>>>>>>>> then
>>>>>>>> copy that into the shared memory ByteBuffer? Directly?
>>>>>>>>
>>>>>>>>      - Shared memory access has to be somehow controlled.
The pipes
>>>>>>>> give
>>>>>>>> you
>>>>>>>> flow control for free (blocking write calls when the stream
consumer
>>>>>>>> is
>>>>>>>> busy). What do you do for the shared memory? Usually, one
uses
>>>>>>>> semaphores,
>>>>>>>> or, in java File(Range)Locks to coordinate access and block
until
>>>>>>>> memory
>>>>>>>> regions are made available. Can you check if there are some
busy
>>>>>>>> waiting
>>>>>>>> parts in you code?
>>>>>>>>
>>>>>>>>      - More general: The code is slower, but does it burn
CPU
>>>>>>>> cycles in
>>>>>>>> its
>>>>>>>> slowness or is it waiting for locks / monitors / conditions
?
>>>>>>>>
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler <
>>>>>>>> chesnay.schepler@fu-berlin.de> wrote:
>>>>>>>>
>>>>>>>>     Hello everyone,
>>>>>>>>
>>>>>>>>   This will be some kind of brainstorming question.
>>>>>>>>
>>>>>>>>> As some of you may know I am currently working on the
Python API.
>>>>>>>>> The
>>>>>>>>> most
>>>>>>>>> crucial part here is how the data is exchanged between
Java and
>>>>>>>>> Python.
>>>>>>>>> Up to this point we used pipes for this, but switched
recently to
>>>>>>>>> memory
>>>>>>>>> mapped files in hopes of increasing the (lacking) performance.
>>>>>>>>>
>>>>>>>>> Early (simplified) prototypes (outside of Flink) showed
that this
>>>>>>>>> would
>>>>>>>>> yield a significant increase. yet when i added the code
to flink
>>>>>>>>> and
>>>>>>>>> ran
>>>>>>>>> a
>>>>>>>>> job, there was
>>>>>>>>> no effect. like at all. two radically different schemes
ran in
>>>>>>>>> /exactly/
>>>>>>>>> the same time.
>>>>>>>>>
>>>>>>>>> my conclusion was that code already in place (and not
part of the
>>>>>>>>> prototypes) is responsible for this.
>>>>>>>>> so i went ahead and modified the prototypes to use all
relevant
>>>>>>>>> code
>>>>>>>>> from
>>>>>>>>> the Python API in order to narrow down the culprit. but
this time,
>>>>>>>>> the
>>>>>>>>> performance increase was there.
>>>>>>>>>
>>>>>>>>> Now here's the question: How can the /very same code/
perform so
>>>>>>>>> much
>>>>>>>>> worse when integrated into flink? if the code is not
the problem,
>>>>>>>>> what
>>>>>>>>> could be it?
>>>>>>>>>
>>>>>>>>> i spent a lot of time looking for that one line of code
that
>>>>>>>>> cripples
>>>>>>>>> the
>>>>>>>>> performance, but I'm pretty much out of places to look.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message