hama-dev mailing list archives

From "Edward J. Yoon (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (HAMA-799) Add a new BSP API that uses multiple threads
Date Thu, 25 Aug 2016 23:48:20 GMT

    [ https://issues.apache.org/jira/browse/HAMA-799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15438151#comment-15438151 ]

Edward J. Yoon commented on HAMA-799:
-------------------------------------

Hi,

I originally thought that we could add something like https://hadoop.apache.org/docs/r2.4.1/api/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.html,
and the goal was to support an easy-to-use multithreading API within BSP. But the two models differ
slightly.

In the MapReduce case, the map(K, V) function processes the K, V pair of each line of a chunk of data
sequentially (as you might already know). MultithreadedMapper processes those lines concurrently with a pool
of threads and generates the intermediate files.

The BSP model is more flexible. We can implement a MapReduce framework on top of the BSP model like this:

{code}
bsp(BSPPeer peer) {
    while (peer.readNext(key, value)) {
        map(key, value); // calls user-defined map function.
    }
    ...
}
{code}
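To make the mapping concrete, here is a minimal, self-contained simulation in plain Java; it is not Hama code. Each of a few simulated peers runs on its own thread, maps its own input split, and sends every (word, 1) pair to the peer that owns that word. A CyclicBarrier stands in for peer.sync(), and after the barrier each peer reduces the messages it received. The class name, the per-peer inbox queues, and the hash-based routing are all illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BspWordCount {
  // Each element of splits is the input of one simulated peer.
  static Map<String, Integer> run(List<List<String>> splits) throws Exception {
    int numPeers = splits.size();
    // One incoming queue per peer, playing the role of the BSP message queues.
    List<ConcurrentLinkedQueue<Map.Entry<String, Integer>>> inboxes = new ArrayList<>();
    for (int i = 0; i < numPeers; i++) {
      inboxes.add(new ConcurrentLinkedQueue<>());
    }
    CyclicBarrier sync = new CyclicBarrier(numPeers); // stands in for peer.sync()
    Map<String, Integer> result = new ConcurrentHashMap<>();

    ExecutorService pool = Executors.newFixedThreadPool(numPeers); // one thread per peer
    List<Future<?>> peers = new ArrayList<>();
    for (int p = 0; p < numPeers; p++) {
      final int peerId = p;
      peers.add(pool.submit(() -> {
        // Superstep 1: "map" every line and send (word, 1) to the word's owner peer.
        for (String line : splits.get(peerId)) {
          for (String word : line.split("\\s+")) {
            if (word.isEmpty()) {
              continue;
            }
            int owner = Math.floorMod(word.hashCode(), numPeers);
            inboxes.get(owner).add(Map.entry(word, 1));
          }
        }
        sync.await(); // barrier: all messages are delivered before anyone reduces
        // Superstep 2: "reduce" the messages this peer received.
        Map<String, Integer> local = new HashMap<>();
        for (Map.Entry<String, Integer> msg : inboxes.get(peerId)) {
          local.merge(msg.getKey(), msg.getValue(), Integer::sum);
        }
        result.putAll(local); // each word has exactly one owner, so no key conflicts
        return null;
      }));
    }
    for (Future<?> f : peers) {
      f.get(); // propagate any peer failure
    }
    pool.shutdown();
    return result;
  }

  public static void main(String[] args) throws Exception {
    List<List<String>> splits = List.of(List.of("a b a", "c"), List.of("b c c"));
    System.out.println(new TreeMap<>(run(splits))); // {a=2, b=2, c=3}
  }
}
```

The barrier is what makes this BSP rather than a free-running pipeline: no peer starts reducing until every peer has finished sending.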

Then, the MultithreadedMapper is just like below:

{code}
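bsp(BSPPeer peer) {
    while (peer.readNext(key, value)) {
        // readNext() refills the same key/value objects, so each task gets its own copy.
        executor.execute(new MultithreadedMapper(WritableUtils.clone(key, conf),
            WritableUtils.clone(value, conf))); // executes map function concurrently.
    }
    executor.shutdown();
    executor.awaitTermination(timeout, TimeUnit.SECONDS); // wait for all map tasks to finish.
    ...
}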
{code}

Once every submitted task has completed, the two approaches above produce the same result, but their performance differs.
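The "same result" claim assumes that each task gets its own copy of the record and that the pool has drained before the result is used. Since readNext(key, value) fills the objects passed to it, a loop that hands those same objects to a thread pool can let a later read overwrite a record that a queued task has not processed yet. The sketch below simulates this in plain Java with a hypothetical Reader that refills one mutable Record; the names and the toy map() function are assumptions, not Hama or Hadoop API. It compares the sequential loop against a multithreaded loop that copies each record before submitting it.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ReuseDemo {
  // Mutable record, analogous to the objects that readNext(key, value) refills in place.
  static final class Record {
    long key;
    String value;

    Record copy() {
      Record r = new Record();
      r.key = key;
      r.value = value;
      return r;
    }
  }

  // Hypothetical reader: refills the caller's Record, like peer.readNext(key, value).
  static final class Reader {
    private final List<String> lines;
    private int pos = 0;

    Reader(List<String> lines) {
      this.lines = lines;
    }

    boolean readNext(Record r) {
      if (pos >= lines.size()) {
        return false;
      }
      r.key = pos;
      r.value = lines.get(pos++);
      return true;
    }
  }

  // Stand-in for the user-defined map(): weights each line's length by (key + 1).
  static long map(Record r) {
    return (r.key + 1) * r.value.length();
  }

  static long sequential(List<String> lines) {
    Reader reader = new Reader(lines);
    Record rec = new Record();
    long sum = 0;
    while (reader.readNext(rec)) {
      sum += map(rec);
    }
    return sum;
  }

  static long multithreaded(List<String> lines, int threads) throws InterruptedException {
    Reader reader = new Reader(lines);
    Record rec = new Record();
    AtomicLong sum = new AtomicLong();
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    while (reader.readNext(rec)) {
      Record snapshot = rec.copy(); // without this copy, a task could see a later record
      executor.execute(() -> sum.addAndGet(map(snapshot)));
    }
    executor.shutdown(); // accept no new tasks
    if (!executor.awaitTermination(1, TimeUnit.MINUTES)) { // result is complete only after this
      throw new IllegalStateException("map tasks did not finish in time");
    }
    return sum.get();
  }

  public static void main(String[] args) throws InterruptedException {
    List<String> lines = List.of("alpha", "be", "gamma", "d");
    System.out.println(sequential(lines) + " " + multithreaded(lines, 4)); // 28 28
  }
}
```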

The BSP model is slightly different. All threads need to share the incoming and outgoing
message queues; otherwise, it is no different from increasing the number of BSP tasks (which is pointless).
So multithreading should be used only to parallelize some sequential computation
inside bsp(), not the whole bsp() function. For example,

{code}
bsp() {
   ...
   for(int i = 0; i < 1000; i++) {
      ... // this part can be multi-threaded.
   }
   ...
}
{code}
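A minimal sketch of this idea in plain Java, assuming the 1000 iterations are independent of each other: only the loop body runs on a thread pool, every thread appends to one shared, thread-safe outbox (standing in for the peer's outgoing message queue), and the surrounding bsp() code continues sequentially once every iteration has finished, which is where sending and peer.sync() would happen. The names computeStep, runLoop, and outbox are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelLoop {
  // Hypothetical CPU-heavy body of one loop iteration: sum of squares 0..i.
  static long computeStep(int i) {
    long acc = 0;
    for (int k = 0; k <= i; k++) {
      acc += (long) k * k;
    }
    return acc;
  }

  // Runs iterations [0, n) on a pool; all results go to one shared outbox.
  static Queue<Long> runLoop(int n, int threads)
      throws InterruptedException, ExecutionException {
    Queue<Long> outbox = new ConcurrentLinkedQueue<>(); // shared by all threads
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Callable<Void>> tasks = new ArrayList<>();
      for (int i = 0; i < n; i++) {
        final int iter = i;
        tasks.add(() -> {
          outbox.add(computeStep(iter));
          return null;
        });
      }
      // invokeAll blocks until every iteration is done, acting as a local barrier.
      for (Future<Void> f : pool.invokeAll(tasks)) {
        f.get(); // rethrows any failure from an iteration
      }
    } finally {
      pool.shutdown();
    }
    return outbox; // the sequential rest of bsp() (sending, then sync) resumes here
  }

  public static void main(String[] args) throws Exception {
    Queue<Long> outbox = runLoop(1000, 8);
    System.out.println(outbox.size()); // 1000
  }
}
```

The order of entries in the outbox is nondeterministic, so this only fits computations whose outgoing messages do not depend on iteration order.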

In GraphJobRunner, I used multithreading as follows:

{code}
  private void doSuperstep(GraphJobMessage currentMessage,
      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
      throws IOException {
    this.errorCount.set(0);
    long startTime = System.currentTimeMillis();

    this.changedVertexCnt = 0;
    vertices.startSuperstep();

    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors
        .newCachedThreadPool();
    executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 64));
    executor.setRejectedExecutionHandler(retryHandler);

    long loopStartTime = System.currentTimeMillis();
    while (currentMessage != null) {
      executor.execute(new ComputeRunnable(currentMessage));

      currentMessage = peer.getCurrentMessage();
    }
    LOG.info("Total time spent for superstep-" + peer.getSuperstepCount()
        + " looping: " + (System.currentTimeMillis() - loopStartTime) + " ms");

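    executor.shutdown();
    try {
      // Do not continue with a partially computed superstep if tasks are still running.
      if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
        throw new IOException("vertex computations did not finish within 60 seconds.");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException(e);
    }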

    if (errorCount.get() > 0) {
      throw new IOException("there were " + errorCount
          + " exceptions during compute vertices.");
    }

    Iterator it = vertices.iterator();
    while (it.hasNext()) {
      Vertex<V, E, M> vertex = (Vertex<V, E, M>) it.next();
      if (!vertex.isHalted() && !vertex.isComputed()) {
        vertex.compute(Collections.<M> emptyList());
        vertices.finishVertexComputation(vertex);
      }
    }

    getAggregationRunner().sendAggregatorValues(peer,
        vertices.getActiveVerticesNum(), this.changedVertexCnt);
    this.iteration++;

    LOG.info("Total time spent for superstep-" + peer.getSuperstepCount()
        + " computing vertices: " + (System.currentTimeMillis() - startTime)
        + " ms");

    startTime = System.currentTimeMillis();
    finishSuperstep();
    LOG.info("Total time spent for superstep-" + peer.getSuperstepCount()
        + " synchronizing: " + (System.currentTimeMillis() - startTime) + " ms");
  }
{code}
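The same pattern, stripped of the Hama types, can be sketched as a self-contained Java program. It is an approximation, not the GraphJobRunner code: the message source is a plain Iterator standing in for peer.getCurrentMessage(), each "message" is just a hypothetical target vertex id that the task counts, and the JDK's ThreadPoolExecutor.CallerRunsPolicy is swapped in for retryHandler (whose implementation isn't shown here), so a submission that finds all threads busy runs in the submitting thread instead of being dropped. The pool is built directly with the same shape as newCachedThreadPool() (no queue, idle threads time out) but capped at maxThreads.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SuperstepSketch {
  static final AtomicInteger errorCount = new AtomicInteger();

  // Processes all messages of one superstep on a bounded pool; returns per-vertex message counts.
  static Map<Integer, Integer> doSuperstep(Iterator<Integer> messages, int maxThreads)
      throws InterruptedException {
    errorCount.set(0);
    Map<Integer, Integer> received = new ConcurrentHashMap<>();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0, maxThreads, 60L, TimeUnit.SECONDS,
        new SynchronousQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());

    while (messages.hasNext()) {
      int vertexId = messages.next();
      executor.execute(() -> {
        try {
          received.merge(vertexId, 1, Integer::sum); // stand-in for vertex.compute(...)
        } catch (RuntimeException e) {
          errorCount.incrementAndGet(); // mirrors the errorCount check in GraphJobRunner
        }
      });
    }

    executor.shutdown();
    if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
      throw new IllegalStateException("superstep did not finish within 60 seconds");
    }
    if (errorCount.get() > 0) {
      throw new IllegalStateException(errorCount.get() + " exceptions during compute");
    }
    return received;
  }

  public static void main(String[] args) throws InterruptedException {
    List<Integer> msgs = List.of(1, 2, 1, 3, 1, 2);
    System.out.println(new TreeMap<>(doSuperstep(msgs.iterator(), 4))); // {1=3, 2=2, 3=1}
  }
}
```

With CallerRunsPolicy, the submitting loop slows down naturally when every worker is busy, which bounds the number of in-flight tasks without an unbounded queue.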

If there's a more elegant way to use multithreading within the bsp() function, we can do it. Otherwise,
we should close this issue.

> Add a new BSP API that uses multiple threads
> --------------------------------------------
>
>                 Key: HAMA-799
>                 URL: https://issues.apache.org/jira/browse/HAMA-799
>             Project: Hama
>          Issue Type: New Feature
>          Components: bsp core
>            Reporter: Edward J. Yoon
>            Assignee: Edward J. Yoon
>
> Add a new (additional) BSP API that uses multiple threads, called MultithreadedBSP. This could help speed up highly CPU-intensive tasks.
> I would also personally like to redesign GraphJobRunner on top of this MultithreadedBSP, because computing one vertex at a time is one reason for its slow performance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
