incubator-hama-dev mailing list archives

From "Thomas Jungblut (Commented) (JIRA)" <>
Subject [jira] [Commented] (HAMA-521) Improve message buffering to save memory
Date Tue, 27 Mar 2012 06:21:39 GMT


Thomas Jungblut commented on HAMA-521:

bq.Regarding to memory footprint, how if storing messages remotely e.g. hdfs or spilling messages
to the target server? Or storing in e.g. memcache may be an option.

Yes, you are correct. However, I think writing to HDFS will have too much overhead. We can add
some memcache-like behaviour later; it is quite easy to implement ourselves.

Okay Suraj, those are really deep design thoughts. I don't really know if they belong
here, but let's talk about them.

bq.1. Should MessageManager hold socket address information? On failure, socket address of
few peers would change as they would get scheduled on different machine. If MessageManager
holds the socket address, then it has to be updated on failure of peers.

Yes totally. Each triggered send will check if the peer already exists. We can check within
the barrier sync if we need to evict our cache or not since the info is stored in ZK.
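A minimal sketch of that idea, under stated assumptions: PeerAddressCache is a hypothetical class
(not part of Hama), and the registry map stands in for the peer information stored in ZK. Each
send resolves the peer through the cache, and the barrier sync evicts entries whose registered
address has changed.

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

// Hypothetical class for illustration only; not part of the Hama code base.
final class PeerAddressCache {
  private final Map<String, InetSocketAddress> cache = new HashMap<>();

  // Called on each send: reuse the cached address, otherwise take it from the
  // registry (a plain map here, standing in for the data kept in ZK).
  InetSocketAddress resolve(String peerName, Map<String, InetSocketAddress> registry) {
    InetSocketAddress address = cache.get(peerName);
    if (address == null) {
      address = registry.get(peerName);
      if (address != null) {
        cache.put(peerName, address);
      }
    }
    return address;
  }

  // Called during the barrier sync: drop every cached entry whose registered
  // address changed or disappeared. Returns the number of evicted entries.
  int evictStale(Map<String, InetSocketAddress> registry) {
    int before = cache.size();
    cache.entrySet().removeIf(e -> !e.getValue().equals(registry.get(e.getKey())));
    return before - cache.size();
  }
}
```

With this shape, a peer that was rescheduled onto another machine keeps its stale address only
until the next sync, at which point the following send picks up the new one.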

bq.2. Should we have identifier for each message? In my opinion we should. This would help
to remove duplicates in messages while cleanup on recovery. If that is the case, we need to
implement queue as Set (LinkedHashSet?). This would also help us implement sorting in the
message buffer. We can have TreeSet implementation underneath.

Currently I think this is a huge overhead in network communication. You only get duplicate messages
when you have speculative task execution, which we don't have yet, so let's discuss this separately.

I'm totally +1 for the sorting. I personally thought this could be done by just replacing
the MemoryQueue with a Comparator-backed version, like an insertion-sorted list. This adds
no overhead at all and it is still a queue. No need for a tree here. However, this is purely
memory-based, so it may not scale well.
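A rough sketch of such a Comparator-backed queue, assuming a hypothetical SortedMessageQueue
class (this is not the actual MemoryQueue API). It keeps the messages in a list and inserts
each one at its sorted position.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical class for illustration only; not the Hama MemoryQueue.
final class SortedMessageQueue<M> {
  private final List<M> messages = new ArrayList<>();
  private final Comparator<? super M> comparator;

  SortedMessageQueue(Comparator<? super M> comparator) {
    this.comparator = comparator;
  }

  // Binary search finds the slot; the list insert then shifts the tail,
  // so a single add costs O(n) element moves in the worst case.
  void add(M message) {
    int index = Collections.binarySearch(messages, message, comparator);
    if (index < 0) {
      index = -(index + 1); // not found: convert to the insertion point
    }
    messages.add(index, message);
  }

  // Removes and returns the smallest message, or null if the queue is empty.
  M poll() {
    return messages.isEmpty() ? null : messages.remove(0);
  }

  int size() {
    return messages.size();
  }
}
```

Adding 3, 1, 2 and then polling returns the messages in comparator order. Everything stays on
the heap, which is the scaling limit mentioned above.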

bq.3. For that matter should we have header <id, source peer , destination peer> ?
This totally reminds me of TCP. But especially when we have speculative execution, this is
a must-have.

bq.4. There should be a simple reliable transactional protocol between two peers. When the
transaction is completed, the sender is acknowledged that the receiver has completely received
all the messages.

Transactions are fine, a very simple thing could be that we make a SHA-1 hash of the messagebundle
and check it on the other side. We are just batching transfers as a huge one rather than having
many small transfers that need to be transacted.
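The hash check could look roughly like this. BundleChecksum is a hypothetical helper, and the
sketch assumes the message bundle is already serialized to a byte array; how the digest travels
with the bundle over RPC is left open.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper for illustration only; not part of the Hama code base.
final class BundleChecksum {
  private BundleChecksum() {
  }

  // Sender side: compute the 20-byte SHA-1 digest of the serialized bundle.
  static byte[] sha1(byte[] serializedBundle) {
    try {
      return MessageDigest.getInstance("SHA-1").digest(serializedBundle);
    } catch (NoSuchAlgorithmException e) {
      // Every Java platform implementation is required to provide SHA-1.
      throw new IllegalStateException(e);
    }
  }

  // Receiver side: recompute the digest and compare it with the one sent along.
  static boolean matches(byte[] receivedBundle, byte[] expectedDigest) {
    return MessageDigest.isEqual(sha1(receivedBundle), expectedDigest);
  }
}
```

The receiver would acknowledge the transfer only if matches(...) returns true, so one check
covers the whole batched bundle.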

bq.We are sure of reading all the messages from the DiskQueue. Can we have an Iterator that
would close the file once the last record is read?

Well, it is not guaranteed that the user consumes all the messages, leaving the file open would
be a no-op. So let's just add close functionality in a finally block. It doesn't really hurt anyone.
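Both ideas can be combined, as in the sketch below. ClosingRecordIterator is a hypothetical
class, and a line-based Reader stands in for the DiskQueue's sequence file: the iterator closes
its source when the last record has been read, and close() can still be called from a finally
block if the user stops early.

```java
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical class for illustration only; one text line stands in for one record.
final class ClosingRecordIterator implements Iterator<String>, Closeable {
  private final BufferedReader reader;
  private String next;
  private boolean closed;

  ClosingRecordIterator(Reader source) {
    this.reader = new BufferedReader(source);
    advance();
  }

  // Reads one record ahead and closes the source once it is exhausted.
  private void advance() {
    try {
      next = reader.readLine();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    if (next == null) {
      close();
    }
  }

  @Override
  public boolean hasNext() {
    return next != null;
  }

  @Override
  public String next() {
    if (next == null) {
      throw new NoSuchElementException();
    }
    String current = next;
    advance();
    return current;
  }

  // Idempotent, so calling it from a finally block after exhaustion is harmless.
  @Override
  public void close() {
    if (closed) {
      return;
    }
    closed = true;
    next = null;
    try {
      reader.close();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  boolean isClosed() {
    return closed;
  }
}
```

A caller that reads only part of the queue would wrap the loop in try/finally and call close()
in the finally clause; a caller that reads everything gets the close for free.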

I think you should open a "Speculative task execution" issue and put your thoughts into it
;) I think this transactional behaviour can be improved quite well, so it has negligible overhead.
Let's discuss it in another context.

Thanks, you two! I have a bit of time tomorrow and I'll update the patch accordingly.

> Improve message buffering to save memory
> ----------------------------------------
>                 Key: HAMA-521
>                 URL:
>             Project: Hama
>          Issue Type: Sub-task
>            Reporter: Thomas Jungblut
>            Assignee: Thomas Jungblut
>         Attachments: HAMA-521.patch, HAMA-521_1.patch
> Suraj and I had a bit of discussion about incoming and outgoing message buffering and
> Currently everything lies on the heap, causing huge amounts of GC and waste of memory.
> We can do better.
> Therefore we need to extract an abstract Messenger class which is directly under the
> interface but over the compressor class.
> It should abstract the use of the queues in the back (currently a lot of duplicated code)
> and it should be backed by a sequencefile on local disk.
> Once sync() starts it should return a message iterator for combining; the result then gets put
> into a message bundle which is sent over RPC.
> On the other side we get a bundle and loop over it, putting everything into the heap and
> making it much larger than it needs to be. Here we can also flush to disk because we are just
> exposing a queue-like method to the user side.
> Plus points:
> In case we have enough heap (see our new metric system), we can also implement a buffering
> technology that is not flushing everything to disk.
> Open questions:
> I don't know how much slower the whole system gets, but it would save a lot of memory.
> Maybe we should first evaluate if it is really needed.
> In any case, the refactoring of the duplicate code in the messengers is needed.
