apex-dev mailing list archives

From "bright chen (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXCORE-635) Proposal: Manage memory to avoid memory copy and garbage collection
Date Fri, 10 Feb 2017 23:00:43 GMT

    [ https://issues.apache.org/jira/browse/APEXCORE-635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15861966#comment-15861966 ]

bright chen commented on APEXCORE-635:

- If we use a direct buffer, it seems there is no way to avoid copying into the ByteBuffer. What I mean by avoiding memory copy and garbage collection is explained below by comparing the current and the new procedure in detail (assume each tuple serializes to N bytes and M tuples are written at a time):

current procedure (using the Kryo serde as an example):
1) Kryo serde: write the data to the buffer of Kryo's Output; assume the serialized length is N bytes
2) call Output.toBytes() to get the bytes from Kryo: needs a memory copy of N bytes
3) prepend the message type and partition: needs a memory copy of N bytes, and the total length becomes N+5; the N-byte array from step 2 becomes garbage
4) repeat steps 1 to 3 M times, producing M slices
5) copy the M slices into a ByteBuffer: the M*(N+5) bytes of slices become garbage
6) write the ByteBuffer to the socket

new procedure (using the Kryo serde as an example):
1) write the message type and partition to the SerializationBuffer
2) Kryo serde: serialize into the SerializationBuffer; assume the serialized length is N bytes
3) call SerializationBuffer.toSlice() to get the slice: this returns a wrapper instead of copying the data
4) repeat steps 1 to 3 M times, producing M slices
5) in most cases, merge the M slices into one (if they are in the same block). If they are in different blocks, probably handle one block at a time.
6) copy or wrap the merged slice into a ByteBuffer. A copy is probably needed if we want to use direct mode.
7) write the ByteBuffer to the socket, then reset the SerializationBuffer

So, comparing the two: the current procedure copies around 3*N*M bytes and leaves around 2*N*M bytes for garbage collection, while the new procedure copies only N*M bytes and leaves nothing for garbage collection. The new approach therefore saves 2*N*M bytes of memory copy and 2*N*M bytes of garbage collection.
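
The totals above can be checked with a small back-of-the-envelope model. This is only a sketch, not Apex code: the class and method names are made up, and the 5-byte header is counted explicitly, so the current path comes out at 3*N*M + 5*M copied bytes rather than exactly 3*N*M.

```java
// Back-of-the-envelope byte counts for the two write paths compared above.
// Assumptions (from the comment): each tuple serializes to n bytes, m tuples
// are written per socket write, and message type + partition take 5 bytes.
// Hypothetical helper, not part of Apex.
final class WritePathCost {
  static final long HEADER = 5;

  // Current path: toBytes() copy (n) + header-prefix copy (n) + merge copy (n + 5).
  static long currentCopied(long n, long m) {
    return m * n + m * n + m * (n + HEADER);
  }

  // Current path: the toBytes() array (n) and the prefixed slice (n + 5)
  // both become garbage once everything is merged into the ByteBuffer.
  static long currentGarbage(long n, long m) {
    return m * n + m * (n + HEADER);
  }

  // New path: at most one copy of the serialized data into a direct
  // ByteBuffer, or none if the block is simply wrapped.
  static long newCopied(long n, long m, boolean directBuffer) {
    return directBuffer ? m * (n + HEADER) : 0;
  }

  // New path: blocks are reset and reused, so nothing is left for the GC.
  static long newGarbage(long n, long m) {
    return 0;
  }
}
```

For N = 100 and M = 1000 the model gives 305,000 copied and 205,000 garbage bytes for the current path, versus 105,000 copied (direct buffer) and none for the new path; the difference in copies is exactly 2*N*M = 200,000.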

- The memory of different blocks is not contiguous, because we want to allocate blocks dynamically, but slices within the same block are contiguous. Since a block is much larger than a slice, in most cases the slices of one write fall into the same block.
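
Because slices written one after another into the same block are adjacent, merging them needs no copy: it is enough to extend the offset/length of a view. A minimal sketch, assuming a hypothetical `BlockSlice` view type (buffer, offset, length; loosely modeled on the fields of netlet's `Slice`, but not that class):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical view into a shared byte[] block (not netlet's Slice).
final class BlockSlice {
  final byte[] buffer;
  final int offset;
  final int length;

  BlockSlice(byte[] buffer, int offset, int length) {
    this.buffer = buffer;
    this.offset = offset;
    this.length = length;
  }
}

final class SliceMerger {
  // Coalesces runs of slices that share a block and are back to back,
  // so M small slices become one slice per block without copying bytes.
  static List<BlockSlice> merge(List<BlockSlice> slices) {
    List<BlockSlice> merged = new ArrayList<>();
    byte[] block = null;
    int start = 0;
    int length = 0;
    for (BlockSlice s : slices) {
      if (block == s.buffer && start + length == s.offset) {
        length += s.length;                 // same block and contiguous: extend the run
      } else {
        if (block != null) {
          merged.add(new BlockSlice(block, start, length));
        }
        block = s.buffer;                   // start a new run
        start = s.offset;
        length = s.length;
      }
    }
    if (block != null) {
      merged.add(new BlockSlice(block, start, length));
    }
    return merged;
  }
}
```

Slices that cross into a new block simply start a new run, which matches the "handle one block at a time" fallback in step 5 of the new procedure.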

- The current fire-and-forget approach means there is no way to reuse the memory. That's why write() needs to be overridden via a workaround. The new implementation of write() will reset the SerializationBuffer after the data has been written to the socket, so the new approach doesn't need an acknowledgement.

- Yes, with a variable-length encoding it is not possible to reserve the space for the length. But I doubt that we need variable length at all. It saves at most 3 bytes per message but makes the parser harder. It's not common for protocols to use variable-length fields; even IP uses a fixed-size length field. In the worst case, even if we don't use the reserve feature of SerializationBuffer, we can still save 2*N*M bytes of memory copy and garbage collection.
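
To illustrate the trade-off: a varint length takes 1 byte for values under 128, so it saves at most 3 bytes relative to a fixed 4-byte length, but its own size depends on the value, so the slot cannot be reserved before the payload is written. A fixed-width slot can be reserved and backfilled. This sketch uses plain `java.nio.ByteBuffer`; the LEB128-style varint is assumed for illustration only (the comment does not say which varint Apex uses).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class LengthPrefix {
  // Size of an unsigned LEB128-style varint: 7 payload bits per byte.
  static int varintSize(int value) {
    int size = 1;
    while ((value & ~0x7F) != 0) {   // more than 7 significant bits remain
      value >>>= 7;
      size++;
    }
    return size;
  }

  // Fixed 4-byte length: reserve the slot, write the payload, then backfill.
  // The payload could come from any nested serializer; a String is used
  // here only to keep the sketch short.
  static void writeWithFixedLength(ByteBuffer out, String s) {
    int lengthPos = out.position();
    out.position(lengthPos + 4);                          // reserve 4 bytes
    out.put(s.getBytes(StandardCharsets.UTF_8));          // payload
    int payloadLength = out.position() - lengthPos - 4;
    out.putInt(lengthPos, payloadLength);                 // fill the reserved slot
  }
}
```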

> Proposal: Manage memory to avoid memory copy and garbage collection
> -------------------------------------------------------------------
>                 Key: APEXCORE-635
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-635
>             Project: Apache Apex Core
>          Issue Type: Wish
>            Reporter: bright chen
>            Assignee: bright chen
> Manage memory to avoid memory copy and garbage collection
> The aim of this proposal is to reuse memory to avoid garbage collection and unnecessary memory copies, and thereby increase performance. In this proposal the term serde means serialization and deserialization; it is the same as codec.
> Currently, Apex uses DefaultStatefulStreamCodec for serde by default; it extends Kryo and optimizes it by replacing the class with a class id. Application developers can also optimize serialization by implementing the StreamCodec interface.
> First, let's look at the default codec, DefaultStatefulStreamCodec. As I understand it, it basically optimizes serde by replacing the class name with a class id. The state information is only sent before the first tuple, so it is a kind of configuration for the serde. I therefore suggest separating this feature from serde, so that customized serdes can also use it. Kryo also has some limitations, which I'll describe later.
> Second, let's look at customized serde from the application developer's point of view, i.e. how to implement StreamCodec. I take a simple tuple, List<String>, as an example.
> The first solution is to use Kryo. This is basically the same as the Apex default codec.
> The second solution is to implement a StreamCodec for String and one for List, where ListSerde delegates the String items to StringSerde. The benefit of this solution is that StringSerde and ListSerde can be reused. The problem is that it needs a lot of temporary memory and memory copies. Here is a sample:
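> import java.nio.ByteBuffer;
> import java.nio.charset.StandardCharsets;
> import java.util.List;
> import com.datatorrent.api.StreamCodec;
> import com.datatorrent.netlet.util.Slice;
>
> class StringSerde {
>   Slice toByteArray(String o) {
>     byte[] b = o.getBytes(StandardCharsets.UTF_8);   // new bytes
>     byte[] b1 = new byte[b.length + 4];              // new bytes
>     ByteBuffer.wrap(b1).putInt(b.length);            // length of the string in the first 4 bytes
>     System.arraycopy(b, 0, b1, 4, b.length);         // copy bytes
>     return new Slice(b1);
>   }
> }
> class ListSerde<T> {
>   StreamCodec<T> itemSerde;  // the serde to serialize/deserialize each item
>   Slice toByteArray(List<T> list) {
>     Slice[] itemSlices = new Slice[list.size()];
>     int size = 0;
>     int index = 0;
>     for (T item : list) {
>       Slice slice = itemSerde.toByteArray(item);   // one allocation per item
>       size += slice.length;
>       itemSlices[index++] = slice;
>     }
>     byte[] b = new byte[size + 4];                 // allocate the memory again
>     ByteBuffer.wrap(b).putInt(size);               // length of the list data in the first 4 bytes
>     int pos = 4;
>     for (Slice s : itemSlices) {                   // copy the data from itemSlices
>       System.arraycopy(s.buffer, s.offset, b, pos, s.length);
>       pos += s.length;
>     }
>     return new Slice(b);
>   }
> }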
> From the above code we can see that around twice the required memory is allocated and the data is copied twice (one copy may be specific to String, but the other is mandatory). And after the bytes are written to the socket, none of the allocated memory can be reused; it all has to be garbage collected.
> The above tuple has only two levels. If the tuple has n levels, n times the required memory would be allocated and n-1 data copies are required.
> The third solution could be to allocate the memory up front and pass the memory and offset to the item serde. This solution has some problems:
> How do we pass the memory from the caller? The existing interface only passes the object, with no way to pass memory, so how the memory is passed would depend on the implementation.
> Another big problem with this solution is that it is hard to allocate the proper amount of memory (for this special case, it could probably allocate twice the total string length). Memory allocated beyond what is required is wasted until the data is sent to the socket (or we allocate exact memory and copy the data to avoid the waste). The code also needs to handle the case where the memory is not enough.
> The fourth solution could be to treat the whole object as flat, allocate memory, and handle it directly, for example as follows. This solves the problem of passing memory, but it still has the other problems of the third solution and introduces some new ones:
> Code can't be reused: we already have StringSerde, but ListSerde<String> has to implement almost the same logic again.
> The serializeItemToMemory() method has to be implemented differently for each item type.
> class ListSerde<T> {
>   Slice toByteArray(List<T> list) {
>     byte[] b = new byte[…];      // hard to estimate the proper size
>     int size = 0;
>     for (T item : list) {
>       int length = serializeItemToMemory(item, b, size);  // implementation depends on the item type
>       size += length;
>     }
>     // allocate new memory and copy the data if we don't want to waste memory
>     return new Slice(b, 0, size);
>   }
> }
> So, from the analysis of these solutions, it's not easy to implement a good, reusable customized serde.
> Third, let's look at the Kryo serde. Kryo provides Output, so each field's serde writes to the same Output. This approach solves the memory problem, but Output has some problems:
> As a stream, Output can only write sequentially. This is a problem, for example, when serializing a String in LV (length-value) format: we don't know the length before serialization.

> Output doesn't have a cache, which means the serialized data must be copied out and managed elsewhere.
> The allocated memory can’t be reused without extra management.
> Another copy is required when adding the partition information.
> The memory allocated for different objects is not contiguous, which means another copy is needed to merge multiple serialized tuples into one block to send to the socket.
> My suggested solution is:
> Add SerializationBuffer, which extends Kryo Output and writes data to a BlockStream.

> BlockStream manages a list of blocks; it can reserve space and later fill a value into the reserved space; it can reset the memory when the data is no longer used. We could probably use unsafe mode to improve the performance of this part in the future.
> Add a MemReuseCodec interface which extends StreamCodec, deprecate Slice toByteArray(T o), and add the method void toByteArray(T o, SerializationBuffer output). Here, toByteArray does not return a slice, because the codec could be the top-level codec or the codec of a field. Call SerializationBuffer.toSlice() to get the slice of the serialized data.
> In Publisher, keep two lists/arrays of slices: one for serializing tuples and another for sending to the socket. When woken up for writing, switch the lists/arrays, merge the slices into a large slice, and call socket write. Reset the stream after the data is written.
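
The BlockStream idea (a list of blocks, reserve-then-fill, reset for reuse) can be sketched with plain byte[] blocks. Everything here is hypothetical: the class name matches the proposal, but `write`, `reserveInt`, `fillInt` and `reset` are illustrative signatures, no unsafe access is used, and this simplified version moves to the next block when a write does not fit and rejects writes larger than a block.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical BlockStream sketch: sequential writes into fixed-size blocks,
// a reserved 4-byte slot that is filled later, and a reset that keeps the
// blocks so their memory is reused instead of garbage collected.
final class BlockStream {
  private final int blockSize;
  private final List<byte[]> blocks = new ArrayList<>();
  private int blockIndex = 0;   // block currently being written
  private int position = 0;     // write position inside that block

  BlockStream(int blockSize) {
    this.blockSize = blockSize;
    blocks.add(new byte[blockSize]);
  }

  // Makes sure `size` bytes fit in the current block; otherwise moves to the
  // next block, allocating one only if no previously used block is available.
  private void ensure(int size) {
    if (size > blockSize) {
      throw new IllegalArgumentException("write larger than block size");
    }
    if (position + size > blockSize) {
      blockIndex++;
      if (blockIndex == blocks.size()) {
        blocks.add(new byte[blockSize]);
      }
      position = 0;
    }
  }

  void write(byte[] src, int off, int len) {
    ensure(len);
    System.arraycopy(src, off, blocks.get(blockIndex), position, len);
    position += len;
  }

  // Reserves 4 bytes; the returned handle encodes (block index, offset).
  long reserveInt() {
    ensure(4);
    long handle = ((long) blockIndex << 32) | position;
    position += 4;
    return handle;
  }

  // Fills a reserved slot with a big-endian int.
  void fillInt(long handle, int value) {
    byte[] block = blocks.get((int) (handle >>> 32));
    int off = (int) handle;
    block[off] = (byte) (value >>> 24);
    block[off + 1] = (byte) (value >>> 16);
    block[off + 2] = (byte) (value >>> 8);
    block[off + 3] = (byte) value;
  }

  // Called once the data is no longer needed: rewind but keep the blocks.
  void reset() {
    blockIndex = 0;
    position = 0;
  }

  int allocatedBlocks() {
    return blocks.size();
  }

  byte[] block(int index) {
    return blocks.get(index);
  }
}
```

After `reset()`, writing the same amount of data again reuses the existing blocks, so the number of allocated blocks stays constant in steady state.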
> So the previous ListSerde can be implemented as follows:
> class ListSerde<T> {
>   MemReuseCodec itemSerde;  //the serde for serialize/deserialize item
>   void toByteArray(List<T> list, SerializationBuffer buffer) {
>     buffer.reserveForLength();
>     for(T item : list) {
>       itemSerde.toByteArray(item, buffer);
>     }
>     buffer.fillLength();
>   }
> }
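
A self-contained, runnable version of the ListSerde above, under stated assumptions: `SimpleSerializationBuffer` and `MemReuseSerde` are hypothetical stand-ins for the proposed SerializationBuffer and MemReuseCodec (one growable array instead of blocks, not derived from Kryo's Output), and nested `reserveForLength()`/`fillLength()` calls are matched with a stack so that each level fills its own slot with the byte length written since its reservation.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for SerializationBuffer: a growable array plus a stack
// of reserved 4-byte length slots, so nested serdes can reserve/fill their own.
final class SimpleSerializationBuffer {
  private byte[] data = new byte[64];
  private int size = 0;
  private final Deque<Integer> reserved = new ArrayDeque<>();

  void writeBytes(byte[] b) {
    ensure(b.length);
    System.arraycopy(b, 0, data, size, b.length);
    size += b.length;
  }

  void reserveForLength() {
    ensure(4);
    reserved.push(size);
    size += 4;
  }

  // Fills the most recent reserved slot with the bytes written since (big-endian).
  void fillLength() {
    int slot = reserved.pop();
    int length = size - slot - 4;
    data[slot] = (byte) (length >>> 24);
    data[slot + 1] = (byte) (length >>> 16);
    data[slot + 2] = (byte) (length >>> 8);
    data[slot + 3] = (byte) length;
  }

  byte[] toByteArray() {      // copies, for inspection in this sketch only
    return Arrays.copyOf(data, size);
  }

  void reset() {              // reuse the same array for the next batch
    size = 0;
    reserved.clear();
  }

  private void ensure(int extra) {
    if (size + extra > data.length) {
      data = Arrays.copyOf(data, Math.max(data.length * 2, size + extra));
    }
  }
}

// Hypothetical codec interface with the proposed toByteArray(T, buffer) shape.
interface MemReuseSerde<T> {
  void toByteArray(T o, SimpleSerializationBuffer buffer);
}

final class StringSerde implements MemReuseSerde<String> {
  public void toByteArray(String s, SimpleSerializationBuffer buffer) {
    buffer.reserveForLength();
    buffer.writeBytes(s.getBytes(StandardCharsets.UTF_8));
    buffer.fillLength();
  }
}

final class ListSerde<T> implements MemReuseSerde<List<T>> {
  private final MemReuseSerde<T> itemSerde;

  ListSerde(MemReuseSerde<T> itemSerde) {
    this.itemSerde = itemSerde;
  }

  public void toByteArray(List<T> list, SimpleSerializationBuffer buffer) {
    buffer.reserveForLength();                 // slot for the whole list's byte length
    for (T item : list) {
      itemSerde.toByteArray(item, buffer);     // items write directly into the same buffer
    }
    buffer.fillLength();
  }
}
```

Serializing `["ab", "c"]` yields 15 bytes: a list length slot of 11, then `2,"ab"` and `1,"c"`, with no intermediate per-item arrays.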
> The benefits of this mechanism:
> the memory can be reused instead of being garbage collected after the data is sent to the socket
> unnecessary memory copies are avoided; basically all the extra copies required by Kryo can be avoided
> the data sent to the socket can easily be merged into one block without extra memory copy
> it integrates easily with Kryo serde, because SerializationBuffer extends Output

> The work needed to integrate this mechanism into Apex without modifying netlet:
> Add a MemReuseCodec field in BufferServerPublisher, initialized in setup() if the codec implements MemReuseCodec.
> Change DefaultStatefulStreamCodec to be implemented using SerializationBuffer.
> For socket integration, basically we only need to override write(byte[] message, int offset, int size) and write(). Unfortunately, write() is final, so the following workaround is needed: add an interface ListenerExt with a single method writeExt(); make BufferServerPublisher implement ListenerExt; add DefaultEventLoopExt, which extends DefaultEventLoop and overrides handleSelectedKey: for a selection key with OP_WRITE, if its attachment implements ListenerExt, call ListenerExt.writeExt(); otherwise call write().
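
The dispatch workaround and the two-list publisher can be sketched together. `Listener` and `ListenerExt` below are simplified stand-ins, not netlet's real types; in the proposal the `instanceof` check would sit in DefaultEventLoopExt.handleSelectedKey() for OP_WRITE keys, applied to `SelectionKey.attachment()`. `PublisherSketch` is hypothetical and collects batches in a list instead of writing to a channel.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins (netlet's real Listener has more methods).
interface Listener {
  void write();
}

interface ListenerExt {
  void writeExt();
}

final class WriteDispatch {
  // Prefer writeExt() when the attachment opts in; otherwise fall back to write().
  static void onWritable(Object attachment) {
    if (attachment instanceof ListenerExt) {
      ((ListenerExt) attachment).writeExt();
    } else if (attachment instanceof Listener) {
      ((Listener) attachment).write();
    }
  }
}

// Hypothetical publisher with two slice lists: tuples are serialized into
// `pending`; on a write wake-up the lists are swapped, the batch is written,
// and the drained list is cleared so it can be reused.
final class PublisherSketch implements Listener, ListenerExt {
  private List<byte[]> pending = new ArrayList<>();
  private List<byte[]> sending = new ArrayList<>();
  final List<byte[]> socket = new ArrayList<>();   // stands in for the channel

  synchronized void publish(byte[] slice) {
    pending.add(slice);
  }

  @Override
  public void write() {
    throw new UnsupportedOperationException("not used when writeExt() is available");
  }

  @Override
  public void writeExt() {
    synchronized (this) {       // swap under the lock; serialization continues into the other list
      List<byte[]> tmp = sending;
      sending = pending;
      pending = tmp;
    }
    socket.addAll(sending);     // a real version would merge the slices and write to the channel
    sending.clear();            // keep the list for the next swap
  }
}
```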

This message was sent by Atlassian JIRA
