flink-user mailing list archives

From おぎばやしひろのり <ogibaya...@gmail.com>
Subject Flink streaming throughput
Date Fri, 26 Feb 2016 10:23:10 GMT

I have started evaluating Flink and ran a simple performance test.
The result was only about 4000 messages/sec with 300% CPU usage in
total (about 100% on each of the 3 TaskManagers). I think this is quite
low, and I am wondering whether it is a reasonable result.
It would be great if someone could check it.

Here are the details:

- 3 Kafka brokers with 3 partitions
- 3 Flink TaskManagers + 1 JobManager
- 1 Elasticsearch node
All of them are separate VMs, each with 8 vCPUs and 8 GB of memory.

[test case]
The application counts access logs by URI within a 1-minute window and
sends the result to Elasticsearch. The actual code is below.
I passed the '-p 3' option to the flink run command, so the job was
distributed across the 3 TaskManagers.
In the test, I sent about 5000 logs/sec to Kafka.

- According to the Elasticsearch records, the total access count over
all URIs was about 260,000/min = 4300/sec. This is the overall throughput.
- The Kafka consumer lag kept growing.
- The CPU usage of each TaskManager machine was about 13-14%. According
to the top command output, the Flink java process was using 100% (one
CPU fully used).

So I think the bottleneck here is the CPU used by the Flink tasks.
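
The figures above can be cross-checked with a little arithmetic. The
sketch below is only an illustration: the object and value names are
made up for this example, and the per-subtask figure assumes that the 3
partitions are evenly loaded, which the message does not state. It
gives roughly 4333 messages/sec consumed, a backlog growing by roughly
667 messages/sec, and 12.5% machine-level CPU for one saturated core
out of 8, which is close to the observed 13-14%.

```scala
object ThroughputCheck {
  // Figures quoted in the message.
  val countedPerMinute = 260000.0 // total access count per minute seen in Elasticsearch
  val producedPerSec   = 5000.0   // approximate rate sent to Kafka
  val vcpusPerVm       = 8        // vCPUs per VM
  val busyCores        = 1.0      // Flink java process at about 100% in top
  val parallelism      = 3        // '-p 3', one subtask per TaskManager

  // Messages per second that actually reached Elasticsearch.
  def consumedPerSec: Double = countedPerMinute / 60.0

  // Rate at which the Kafka consumer lag grows.
  def lagGrowthPerSec: Double = producedPerSec - consumedPerSec

  // Machine-level CPU usage if exactly one core is saturated.
  def machineCpuPercent: Double = busyCores / vcpusPerVm * 100.0

  // Assumption: load is spread evenly over the parallel subtasks.
  def perSubtaskPerSec: Double = consumedPerSec / parallelism

  def main(args: Array[String]): Unit = {
    println(f"consumed:    $consumedPerSec%.0f msgs/sec")
    println(f"lag growth:  $lagGrowthPerSec%.0f msgs/sec")
    println(f"machine CPU: $machineCpuPercent%.1f%%")
    println(f"per subtask: $perSubtaskPerSec%.0f msgs/sec")
  }
}
```

One fully used core per TaskManager is consistent with a single busy
thread per machine, but it does not by itself show which operator in
the pipeline is consuming that core.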

Here is the application code.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
      .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy", new SimpleStringSchema(), properties))
      .map{ json => JSON.parseFull(json).get.asInstanceOf[Map[String, AnyRef]] }
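      .map{ x => x.get("uri") match {
        case Some(y) => (y.asInstanceOf[String],1)
        case None => ("", 1)
      }}
      // keyBy/sum are missing from the archived text; assumed from the per-URI count described above
      .keyBy(0)
      .timeWindow(Time.of(1, TimeUnit.MINUTES))
      .sum(1)
      .map{ x => (System.currentTimeMillis(), x)}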
      .addSink(new ElasticsearchSink(config, transports, new IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]]  {
        override def createIndexRequest(element: Tuple2[Long, Tuple2[String, Int]], ctx: RuntimeContext): IndexRequest = {
          val json = new HashMap[String, AnyRef]
          json.put("@timestamp", new Timestamp(element._1))
          json.put("uri", element._2._1)
          json.put("count", element._2._2: java.lang.Integer)
          println("SENDING: " + element)

Hironori Ogibayashi
