cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Piotr Kołaczkowski (JIRA) <j...@apache.org>
Subject [jira] [Commented] (CASSANDRA-4718) More-efficient ExecutorService for improved throughput
Date Thu, 18 Apr 2013 12:27:19 GMT

    [ https://issues.apache.org/jira/browse/CASSANDRA-4718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13635120#comment-13635120
] 

Piotr Kołaczkowski commented on CASSANDRA-4718:
-----------------------------------------------

Another thing to consider might be using a high-performance Actor library e.g. Akka.

I did a quick microbenchmark to see what is the latency of just passing a single message through
several stages, in 3 variants:

1. Sync: one threadpool per stage, where some coordinator thread just moves message from one
ExecutorService to another, after the stage finished processing
2. Async: one threadpool per stage, where every stage directly asynchronously pushes its result
into the next stage
3. Akka: one Akka actor per stage, where every stage directly asynchronously pushes its result
into the next stage

The clear winner is Akka:
{noformat}
2 stages: 
Sync:     38717 ns
Async:    36159 ns
Akka:     12969 ns

4 stages: 
Sync:     65793 ns
Async:    49964 ns
Akka:     18516 ns

8 stages: 
Sync:    162256 ns
Async:   100009 ns
Akka:      9237 ns

16 stages: 
Sync:    296951 ns
Async:   183588 ns
Akka:     13574 ns

32 stages: 
Sync:    572605 ns
Async:   361959 ns
Akka:     23344 ns
{noformat}

Code of the benchmark:
{noformat}
package pl.pk.messaging

import java.util.concurrent.{CountDownLatch, Executors}
import akka.actor.{Props, ActorSystem, Actor, ActorRef}


class Message {
  var counter = 0
  val latch = new CountDownLatch(1)
}

abstract class MultistageThreadPoolProcessor(stageCount: Int) {

  val stages =
    for (i <- 1 to stageCount) yield Executors.newCachedThreadPool()

  def shutdown() {
    stages.foreach(_.shutdown())
  }

}

/** Synchronously processes a message through the stages.
  * The message is passed stage-to-stage by the coordinator thread. */
class SyncThreadPoolProcessor(stageCount: Int) extends MultistageThreadPoolProcessor(stageCount)
{

  def process() {

    val message = new Message

    val task = new Runnable() {
      def run() { message.counter += 1 }
    }

    for (executor <- stages)
      executor.submit(task).get()
  }
}

/** Asynchronously processes a message through the stages.
  * Every stage after finishing its processing of the message
  * passes the message directly to the next stage, without bothering the coordinator thread.
*/
class AsyncThreadPoolProcessor(stageCount: Int) extends MultistageThreadPoolProcessor(stageCount)
{

  def process() {

    val message = new Message

    val task = new Runnable() {
      def run() {
        message.counter += 1
        if (message.counter >= stages.size)
          message.latch.countDown()
        else
          stages(message.counter).submit(this)
      }
    }

    stages(0).submit(task)
    message.latch.await()
  }
}

/** Similar to AsyncThreadPoolProcessor but it uses Akka actor system instead of thread pools
and queues.
  * Every stage after finishing its processing of the message
  * passes the message directly to the next stage, without bothering the coordinator thread.
*/
class AkkaProcessor(stageCount: Int) {

  val system = ActorSystem()

  val stages: IndexedSeq[ActorRef] = {
    for (i <- 1 to stageCount) yield system.actorOf(Props(new Actor {
      def receive = {
        case m: Message =>
          m.counter += 1
          if (m.counter >= stages.size)
            m.latch.countDown()
          else
            stages(m.counter) ! m
      }
    }))
  }

  def process() {
    val message = new Message
    stages(0) ! message
    message.latch.await()
  }

  def shutdown() {
    system.shutdown()
  }

}



object MessagingBenchmark extends App {

  def measureLatency(count: Int, f: () => Any): Double = {
    val start = System.nanoTime()
    for (i <- 1 to count)
      f()
    val end = System.nanoTime()
    (end - start).toDouble / count
  }

  val messageCount = 200000
  for (stageCount <- List(2,4,8,16,32))
  {
    printf("\n%d stages: \n", stageCount)
    val syncProcessor = new SyncThreadPoolProcessor(stageCount)
    val asyncProcessor = new AsyncThreadPoolProcessor(stageCount)
    val akkaProcessor = new AkkaProcessor(stageCount)

    printf("Sync:  %8.0f ns\n", measureLatency(messageCount, syncProcessor.process))
    printf("Async: %8.0f ns\n", measureLatency(messageCount, asyncProcessor.process))
    printf("Akka:  %8.0f ns\n", measureLatency(messageCount, akkaProcessor.process))

    syncProcessor.shutdown()
    asyncProcessor.shutdown()
    akkaProcessor.shutdown()
  }
}
{noformat}
                
> More-efficient ExecutorService for improved throughput
> ------------------------------------------------------
>
>                 Key: CASSANDRA-4718
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-4718
>             Project: Cassandra
>          Issue Type: Improvement
>            Reporter: Jonathan Ellis
>            Priority: Minor
>         Attachments: baq vs trunk.png, PerThreadQueue.java
>
>
> Currently all our execution stages dequeue tasks one at a time.  This can result in contention
between producers and consumers (although we do our best to minimize this by using LinkedBlockingQueue).
> One approach to mitigating this would be to make consumer threads do more work in "bulk"
instead of just one task per dequeue.  (Producer threads tend to be single-task oriented by
nature, so I don't see an equivalent opportunity there.)
> BlockingQueue has a drainTo(collection, int) method that would be perfect for this. 
However, no ExecutorService in the jdk supports using drainTo, nor could I google one.
> What I would like to do here is create just such a beast and wire it into (at least)
the write and read stages.  (Other possible candidates for such an optimization, such as the
CommitLog and OutboundTCPConnection, are not ExecutorService-based and will need to be one-offs.)
> AbstractExecutorService may be useful.  The implementations of ICommitLogExecutorService
may also be useful. (Despite the name these are not actual ExecutorServices, although they
share the most important properties of one.)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Mime
View raw message