giraph-dev mailing list archives

From Jan van der Lugt <janl...@gmail.com>
Subject Multithreaded mapper in Giraph
Date Tue, 26 Jun 2012 17:26:05 GMT
Hi all,

While I was browsing through the Giraph code, I wondered why the Giraph
GraphMapper is not implemented in a multithreaded fashion. The whole model
of Pregel ensures the execution is embarrassingly parallel, so the
implementation should be easy. If the marked piece of code below were
extracted into a FutureTask and executed by a ThreadPoolExecutor (with a
user-specifiable number of threads, or a number chosen automatically based
on the machine), the code would use multicore machines much more
efficiently. If I wanted to fully utilize a dual-octocore machine with
hyperthreading in the current model, I would need 20-30 mappers, which
implies running 20-30 JVMs, 20-30 Netty clients, etc. I propose changing
the code below to make it multithreaded; I believe this can make Giraph
much faster. Before I start working on it, I would like to hear your
opinions on this and any reasons why the mapper is currently not
multithreaded. I'll have some performance figures later on so we can
decide whether it is worthwhile to merge the code into the mainline branch.

for (Partition<I, V, E, M> partition :
    serviceWorker.getPartitionMap().values()) {
  PartitionStats partitionStats =
      new PartitionStats(partition.getPartitionId(), 0, 0, 0);
  for (BasicVertex<I, V, E, M> basicVertex : partition.getVertices()) {
    /* FROM HERE */
    basicVertex.setGraphState(graphState);
    if (basicVertex.isHalted()
        & !Iterables.isEmpty(basicVertex.getMessages())) {
      basicVertex.halt = false;
    }
    if (!basicVertex.isHalted()) {
      Iterator<M> vertexMsgIt = basicVertex.getMessages().iterator();
      context.progress();
      basicVertex.compute(vertexMsgIt);
      basicVertex.releaseResources();
    }
    if (basicVertex.isHalted()) {
      partitionStats.incrFinishedVertexCount();
    }
    partitionStats.incrVertexCount();
    partitionStats.addEdgeCount(basicVertex.getNumOutEdges());
    /* TO HERE */
  }
  partitionStatsList.add(partitionStats);
}
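
A minimal, self-contained sketch of the idea follows. It is not Giraph
code: SketchVertex, SketchStats, computePartition, runSuperstep and
demoPartitions are hypothetical stand-ins invented for illustration, and
the toy compute() simply consumes its messages and votes to halt. One
deliberate difference from the marked region above: the sketch submits one
task per partition rather than one per vertex, so that each stats object is
only ever touched by a single thread. The context.progress() call is left
out, since whether it can safely be called from worker threads is something
I have not checked.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelSuperstepSketch {
  /** Hypothetical stand-in for a vertex; not the real BasicVertex API. */
  static class SketchVertex {
    boolean halted;
    int pendingMessages;
    final int numOutEdges;

    SketchVertex(boolean halted, int pendingMessages, int numOutEdges) {
      this.halted = halted;
      this.pendingMessages = pendingMessages;
      this.numOutEdges = numOutEdges;
    }

    /** Toy compute step: consume all messages, then vote to halt. */
    void compute() {
      pendingMessages = 0;
      halted = true;
    }
  }

  /** Hypothetical stand-in for PartitionStats; written by one thread only. */
  static class SketchStats {
    final int partitionId;
    long vertexCount;
    long finishedVertexCount;
    long edgeCount;

    SketchStats(int partitionId) {
      this.partitionId = partitionId;
    }
  }

  /** Same control flow as the marked region, applied to one partition. */
  static SketchStats computePartition(int partitionId, List<SketchVertex> vertices) {
    SketchStats stats = new SketchStats(partitionId);
    for (SketchVertex vertex : vertices) {
      if (vertex.halted && vertex.pendingMessages > 0) {
        vertex.halted = false; // incoming messages wake a halted vertex
      }
      if (!vertex.halted) {
        vertex.compute();
      }
      if (vertex.halted) {
        stats.finishedVertexCount++;
      }
      stats.vertexCount++;
      stats.edgeCount += vertex.numOutEdges;
    }
    return stats;
  }

  /** Runs every partition on a fixed-size pool and collects stats in partition order. */
  static List<SketchStats> runSuperstep(List<List<SketchVertex>> partitions, int numThreads) {
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    try {
      List<Future<SketchStats>> futures = new ArrayList<>();
      for (int i = 0; i < partitions.size(); i++) {
        final int partitionId = i;
        final List<SketchVertex> vertices = partitions.get(i);
        Callable<SketchStats> task = () -> computePartition(partitionId, vertices);
        futures.add(pool.submit(task));
      }
      List<SketchStats> statsList = new ArrayList<>();
      for (Future<SketchStats> future : futures) {
        statsList.add(future.get()); // blocks until that partition is done
      }
      return statsList;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for partitions", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Partition computation failed", e.getCause());
    } finally {
      pool.shutdown();
    }
  }

  /** Two tiny made-up partitions, used only to exercise the sketch. */
  static List<List<SketchVertex>> demoPartitions() {
    List<List<SketchVertex>> partitions = new ArrayList<>();
    partitions.add(Arrays.asList(
        new SketchVertex(false, 0, 2), new SketchVertex(true, 1, 3)));
    partitions.add(Arrays.asList(new SketchVertex(true, 0, 1)));
    return partitions;
  }

  public static void main(String[] args) {
    // "Automatic based on the machine": one thread per available processor.
    int numThreads = Runtime.getRuntime().availableProcessors();
    for (SketchStats stats : runSuperstep(demoPartitions(), numThreads)) {
      System.out.println("partition " + stats.partitionId
          + ": vertices=" + stats.vertexCount
          + " finished=" + stats.finishedVertexCount
          + " edges=" + stats.edgeCount);
    }
  }
}
```

Collecting the futures in submission order keeps the stats list in
partition order regardless of which thread finishes first. Messages sent
from compute() would presumably still need a thread-safe path to the
communication layer, which the sketch does not model.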

- Jan
