incubator-giraph-dev mailing list archives

From "Avery Ching (Issue Comment Edited) (JIRA)" <j...@apache.org>
Subject [jira] [Issue Comment Edited] (GIRAPH-45) Improve the way to keep outgoing messages
Date Wed, 14 Dec 2011 22:27:30 GMT

    [ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13169756#comment-13169756 ]

Avery Ching edited comment on GIRAPH-45 at 12/14/11 10:26 PM:
--------------------------------------------------------------

I've been thinking about this a bit more.  I don't think we actually need a database if we
use a disk-friendly approach and take advantage of our knowledge of the system.  Here is a rough
proposal:

There are two ways we can save memory here: an out-of-core graph and out-of-core messages.
This lets us use memory as a cache rather than as a fully in-memory database and
messaging system.

Here's how we can do the out-of-core graph:

Workers already do the computation by partition.  All partitions that are owned by the worker
need to be processed and we want to minimize the amount of data loaded/stored to local disk
(i.e. <superstep>.<worker id>.<partition #>.vertices).  Local disk should
be used here because it will be faster and no remote worker needs to directly access this
data.

Therefore, the general algorithm would be:

{noformat}
// Pass 1: compute on partitions already resident in memory,
// spilling to local disk only under memory pressure.
for (partition : all in-memory partitions)
  partition.computeAndGenerateOutgoingMessages()
  if (memoryPressure)
     partition.storeToFileSystem()

// Pass 2: load the partitions left on the local file system,
// compute on them, and spill again if necessary.
for (partition : remaining file-system partitions)
  partition.loadFromFileSystem()
  partition.computeAndGenerateOutgoingMessages()
  if (memoryPressure)
     partition.storeToFileSystem()
{noformat}

This should keep our partition cache as full as possible while minimizing the loading/storing
of partitions that can't fit in memory.
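
To make the memory-pressure check concrete, here is a minimal Java sketch of the two-pass loop above.  The Partition interface, its method names, and the 90% heap threshold are placeholders for illustration, not actual Giraph classes.

{noformat}
import java.util.List;

// Placeholder for a worker-owned partition; not the real Giraph Partition class.
interface Partition {
  void computeAndGenerateOutgoingMessages();
  void storeToFileSystem();   // spill to local <superstep>.<worker id>.<partition #>.vertices
  void loadFromFileSystem();  // read the spilled vertices back before computing
}

class OutOfCoreSuperstep {
  /** Treat memory as a cache: spill only when the heap is nearly full. */
  static boolean memoryPressure() {
    Runtime rt = Runtime.getRuntime();
    long used = rt.totalMemory() - rt.freeMemory();
    return used > 0.9 * rt.maxMemory();
  }

  static void runSuperstep(List<Partition> inMemory, List<Partition> onDisk) {
    // Pass 1: partitions already in memory.
    for (Partition p : inMemory) {
      p.computeAndGenerateOutgoingMessages();
      if (memoryPressure()) {
        p.storeToFileSystem();
      }
    }
    // Pass 2: partitions that were left on the local file system.
    for (Partition p : onDisk) {
      p.loadFromFileSystem();
      p.computeAndGenerateOutgoingMessages();
      if (memoryPressure()) {
        p.storeToFileSystem();
      }
    }
  }
}
{noformat}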

Here's how we can do the out-of-core messaging:

As the partitions are being processed by the workers, outgoing messages are currently kept
in memory.  They are flushed if a message list grows to a certain size.  Otherwise, the messages
are bulk sent at the end of the computation.

What we can do is wait for a sendMessageReq and check for memory pressure.  If memory pressure
is an issue, then dump all the outgoing messages to HDFS files (i.e. <superstep>.<worker
id>.<partition #>.outgoingMessages).  Future sendMessageReq calls may be kept in memory
or dumped to the same HDFS files if memory pressure is still an issue.  These HDFS files are closed
prior to the flush.  During the flush, the worker sends the in-memory messages to their
destinations as normal, as well as the filenames of the out-of-core messages to their respective
owners.  Note that the files are stored in HDFS so that a remote worker can load
the messages as it sees fit.  Maybe reduce the replication factor to 2 by default for these
files?
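
A rough sketch of the spill path, assuming the Hadoop FileSystem API and the file-naming convention above; the class and method names are made up for illustration.

{noformat}
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;

// Hypothetical helper that dumps the queued messages for one destination
// partition to an HDFS file; the owner is later sent just the file name.
class MessageSpiller {
  private final FileSystem fs;
  private final Path spillDir;

  MessageSpiller(Configuration conf, Path spillDir) throws IOException {
    this.fs = FileSystem.get(conf);
    this.spillDir = spillDir;
  }

  /** Spill the in-memory messages for one partition and return the file path
      to hand to the partition owner during the flush. */
  Path spill(long superstep, int workerId, int partitionId,
             List<? extends Writable> messages) throws IOException {
    Path file = new Path(spillDir,
        superstep + "." + workerId + "." + partitionId + ".outgoingMessages");
    try (FSDataOutputStream out = fs.create(file, true)) {
      for (Writable msg : messages) {
        msg.write(out);
      }
    }
    // Transient data that is read once: a lower replication factor (e.g. 2)
    // should be enough.
    fs.setReplication(file, (short) 2);
    return file;
  }
}
{noformat}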

This tactic should reduce memory usage on the destination worker as well, since the destination
workers don't need to load the HDFS files until they are actually doing the computation for
that partition.
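
On the destination side, the lazy load might look like the following sketch; the MessageFactory and the delete-after-read step are assumptions for illustration, not part of the proposal.

{noformat}
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;

// Hypothetical reader: the owning worker opens a spill file it was handed at
// flush time only when it actually computes the corresponding partition.
class SpilledMessageReader {
  /** Knows how to instantiate an empty message to deserialize into. */
  interface MessageFactory<M extends Writable> {
    M newMessage();
  }

  private final FileSystem fs;

  SpilledMessageReader(Configuration conf) throws IOException {
    this.fs = FileSystem.get(conf);
  }

  <M extends Writable> List<M> read(Path file, MessageFactory<M> factory)
      throws IOException {
    long length = fs.getFileStatus(file).getLen();
    List<M> messages = new ArrayList<M>();
    try (FSDataInputStream in = fs.open(file)) {
      while (in.getPos() < length) {
        M msg = factory.newMessage();
        msg.readFields(in);
        messages.add(msg);
      }
    }
    // Assumption: the file is transient, so remove it once consumed.
    fs.delete(file, false);
    return messages;
  }
}
{noformat}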

Checkpoints should be able to point to the out-of-core data as well to reduce the amount of
data to store.

Still, there is one remaining piece: loading the graph.  This can also run out of memory.
 Currently, vertex lists are batched and sent to destination workers by partition.  Partitions
should have the ability to be incrementally dumped to local files on the destination if there
is memory pressure.  Then, prior to the 1st superstep, each partition can be assembled (local
files + any vertices still in memory) and can use the out-of-core graph algorithm indicated
above.
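
As a sketch of the loading-phase spill (all names here are hypothetical), incoming vertex batches for a partition could simply be appended to a local file and merged with the in-memory remainder before the first superstep:

{noformat}
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.io.Writable;

// Illustrative sketch (not Giraph code) of incrementally dumping received
// vertex batches for one partition to a local file during input loading.
class IncrementalPartitionWriter {
  private final File file;

  IncrementalPartitionWriter(File localDir, int partitionId) {
    this.file = new File(localDir, "input." + partitionId + ".vertices");
  }

  /** Append one received batch of vertices; called only under memory pressure. */
  void appendBatch(List<? extends Writable> vertexBatch) throws IOException {
    try (DataOutputStream out = new DataOutputStream(
        new BufferedOutputStream(new FileOutputStream(file, /* append */ true)))) {
      for (Writable vertex : vertexBatch) {
        vertex.write(out);
      }
    }
  }
}
{noformat}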

This proposal should take advantage of large reads/writes so that we don't need a database.
 I will require out-of-core storage in the very near future, as the graph I need to load will
have billions of edges and I probably won't have enough nodes and memory to keep it all in
core.  Please let me know your thoughts on this approach.

                
> Improve the way to keep outgoing messages
> -----------------------------------------
>
>                 Key: GIRAPH-45
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-45
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp
>            Reporter: Hyunsik Choi
>            Assignee: Hyunsik Choi
>
> As discussed in GIRAPH-12 (http://goo.gl/CE32U), I think there is a potential out-of-memory
problem when the rate of message generation is higher than the rate of message flush (or the
network bandwidth).
> To overcome this problem, we need a more eager strategy for message flushing or some approach
to spill messages to disk.
> The link below is Dmitriy's suggestion:
> https://issues.apache.org/jira/browse/GIRAPH-12?focusedCommentId=13116253&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13116253

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
