cassandra-commits mailing list archives

From Piotr Kołaczkowski (JIRA) <j...@apache.org>
Subject [jira] [Commented] (CASSANDRA-4718) More-efficient ExecutorService for improved throughput
Date Thu, 18 Apr 2013 18:58:15 GMT

    [ https://issues.apache.org/jira/browse/CASSANDRA-4718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13635532#comment-13635532 ]

Piotr Kołaczkowski commented on CASSANDRA-4718:
-----------------------------------------------

I made another version of the benchmark, following Sergio's suggestions. It now uses the following
message-processing graph:


{noformat}
   /------ stage 0 processor 0  ----\         /----           ----\                  /---           ---\
   +------ stage 0 processor 1  ----+         +----           ----+                  +---           ---+
>--+------ stage 0 processor 2  ----+---->----+----  STAGE 1  ----+------>- ... --->-+---  STAGE m  ---+----->
   +------ ...                  ----+         +----           ----+                  +---           ---+
   \------ stage 0 processor n  ----/         \----           ----/                  \---           ---/
{noformat}

128 threads concurrently push messages through all the stages and measure the average latency,
including the time it takes a message to enter stage 0.
Thread-pool stages are built from fixed-size thread pools with n=8, because the machine has 8 cores.
Actor-based stages are built from 128 actors each, with a RoundRobinRouter in front of every stage.

Average latencies:
{noformat}
3 stages: 
Sync:    364687 ns
Async:   210766 ns
Akka:    201842 ns

4 stages: 
Sync:    492581 ns
Async:   221118 ns
Akka:    239407 ns

5 stages: 
Sync:    671733 ns
Async:   245370 ns
Akka:    283798 ns

6 stages: 
Sync:    781759 ns
Async:   262742 ns
Akka:    309384 ns
{noformat}

So Akka comes out slightly slower than the async thread pools.

If someone wants to play with my code, here is the up-to-date version:
{noformat}
import java.util.concurrent.{CountDownLatch, Executors}
import akka.actor.{Props, ActorSystem, Actor, ActorRef}
import akka.routing.{SmallestMailboxRouter, RoundRobinRouter}


/** The unit of work passed through the pipeline: counter records how many stages have
  * processed it; latch lets the asynchronous variants signal that the last stage is done. */
class Message {
  var counter = 0
  val latch = new CountDownLatch(1)
}

/** Common base: one fixed-size pool of 8 worker threads per stage (one per core). */
abstract class MultistageThreadPoolProcessor(stageCount: Int) {

  val stages =
    for (i <- 1 to stageCount) yield Executors.newFixedThreadPool(8)

  def shutdown() {
    stages.foreach(_.shutdown())
  }

}

/** Synchronously processes a message through the stages.
  * The message is passed stage-to-stage by the coordinator thread. */
class SyncThreadPoolProcessor(stageCount: Int) extends MultistageThreadPoolProcessor(stageCount) {

  def process() {

    val message = new Message

    val task = new Runnable() {
      def run() { message.counter += 1 }
    }

    for (executor <- stages)
      executor.submit(task).get()
  }
}

/** Asynchronously processes a message through the stages.
  * Every stage after finishing its processing of the message
  * passes the message directly to the next stage, without bothering the coordinator thread. */
class AsyncThreadPoolProcessor(stageCount: Int) extends MultistageThreadPoolProcessor(stageCount) {

  def process() {

    val message = new Message

    val task = new Runnable() {
      def run() {
        message.counter += 1
        if (message.counter >= stages.size)
          message.latch.countDown()
        else
          stages(message.counter).submit(this)
      }
    }

    stages(0).submit(task)
    message.latch.await()
  }
}

/** Similar to AsyncThreadPoolProcessor, but uses an Akka actor system instead of thread pools and queues.
  * Every stage after finishing its processing of the message
  * passes the message directly to the next stage, without bothering the coordinator thread. */
class AkkaProcessor(stageCount: Int) {

  val system = ActorSystem()

  val stages: IndexedSeq[ActorRef] = {
    for (i <- 1 to stageCount) yield
      system.actorOf(Props(createActor()).withRouter(RoundRobinRouter(nrOfInstances = 128)))
  }

  def createActor(): Actor = {
    new Actor {

      def receive = {
        case m: Message =>
          m.counter += 1
          if (m.counter >= stages.size)
            m.latch.countDown()
          else
            stages(m.counter) ! m
      }
    }
  }

  def process() {
    val message = new Message
    stages(0) ! message
    message.latch.await()
  }

  def shutdown() {
    system.shutdown()
  }

}



object MessagingBenchmark extends App {

  /** Average latency of a single call to f, in nanoseconds, over count invocations. */
  def measureLatency(count: Int, f: () => Any): Double = {
    val start = System.nanoTime()
    for (i <- 1 to count)
      f()
    val end = System.nanoTime()
    (end - start).toDouble / count
  }

  /** Starts threadCount request threads, each measuring messageCount calls, and averages their results. */
  def measureLatency(threadCount: Int, messageCount: Int, f: () => Any): Double = {

    class RequestThread extends Thread {
      var latency: Double = 0.0
      override def run() { latency = measureLatency(messageCount, f) }
    }

    val threads =
      for (i <- 1 to threadCount) yield new RequestThread()

    threads.foreach(_.start())
    threads.foreach(_.join())

    threads.map(_.latency).sum / threads.size
  }


  val messageCount = 50000
  for (stageCount <- List(3,3,4,5,6,7,8,16,32))
  {
    printf("\n%d stages: \n", stageCount)
    val syncProcessor = new SyncThreadPoolProcessor(stageCount)
    val asyncProcessor = new AsyncThreadPoolProcessor(stageCount)
    val akkaProcessor = new AkkaProcessor(stageCount)

    printf("Sync:  %8.0f ns\n", measureLatency(128, messageCount, syncProcessor.process))
    printf("Async: %8.0f ns\n", measureLatency(128, messageCount, asyncProcessor.process))
    printf("Akka:  %8.0f ns\n", measureLatency(128, messageCount, akkaProcessor.process))

    syncProcessor.shutdown()
    asyncProcessor.shutdown()
    akkaProcessor.shutdown()
  }
}

{noformat}
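To build and run it standalone, an sbt setup along these lines should work (a sketch only; the Scala and Akka versions are not specified above, so Scala 2.10 with Akka 2.1.x is an assumption):
{noformat}
// build.sbt (versions are assumptions, not taken from the benchmark code)
scalaVersion := "2.10.1"

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.1.2"

// then run the benchmark with: sbt run
{noformat}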
                
> More-efficient ExecutorService for improved throughput
> ------------------------------------------------------
>
>                 Key: CASSANDRA-4718
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-4718
>             Project: Cassandra
>          Issue Type: Improvement
>            Reporter: Jonathan Ellis
>            Priority: Minor
>         Attachments: baq vs trunk.png, PerThreadQueue.java
>
>
> Currently all our execution stages dequeue tasks one at a time.  This can result in contention
> between producers and consumers (although we do our best to minimize this by using LinkedBlockingQueue).
> One approach to mitigating this would be to make consumer threads do more work in "bulk"
> instead of just one task per dequeue.  (Producer threads tend to be single-task oriented by
> nature, so I don't see an equivalent opportunity there.)
> BlockingQueue has a drainTo(collection, int) method that would be perfect for this.
> However, no ExecutorService in the jdk supports using drainTo, nor could I google one.
> What I would like to do here is create just such a beast and wire it into (at least)
> the write and read stages.  (Other possible candidates for such an optimization, such as the
> CommitLog and OutboundTCPConnection, are not ExecutorService-based and will need to be one-offs.)
> AbstractExecutorService may be useful.  The implementations of ICommitLogExecutorService
> may also be useful. (Despite the name these are not actual ExecutorServices, although they
> share the most important properties of one.)
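
A minimal, illustrative sketch of the "bulk dequeue" idea quoted above, in the same Scala style as the benchmark (this is not the attached PerThreadQueue.java; the DrainingConsumer name and structure are placeholders for illustration): a consumer blocks for one task, then drains whatever backlog is queued with a single drainTo call, so the queue is contended once per batch instead of once per task.
{noformat}
import java.util
import java.util.concurrent.LinkedBlockingQueue

/** Illustrative only: a consumer thread that dequeues tasks in batches via drainTo
  * instead of one take() per task. */
class DrainingConsumer(queue: LinkedBlockingQueue[Runnable], maxBatch: Int) extends Thread {
  override def run() {
    val batch = new util.ArrayList[Runnable](maxBatch)
    try {
      while (true) {
        batch.clear()
        batch.add(queue.take())             // block until at least one task is available
        queue.drainTo(batch, maxBatch - 1)  // then pull any queued backlog in one call
        for (i <- 0 until batch.size()) batch.get(i).run()
      }
    } catch {
      case _: InterruptedException => // interruption is treated as a shutdown request
    }
  }
}
{noformat}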

