flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Robert Metzger <rmetz...@apache.org>
Subject Re: Flink streaming throughput
Date Fri, 11 Mar 2016 14:25:30 GMT
Hi Hironori,

can you try with the kafka-console-consumer how many messages you can read
in one minute?
Maybe the broker's disk I/O is limited because everything is running in
virtual machines (potentially sharing one hard disk?)
I'm also not sure if running a Kafka 0.8 consumer against a 0.9 broker is
working as expected.

Our Kafka 0.8 consumer has been tested in environments where its reading
with more than 100 MB/s per from a broker.


On Fri, Mar 11, 2016 at 9:33 AM, おぎばやしひろのり <ogibayashi@gmail.com>
wrote:

> Aljoscha,
>
> Thank you for your response.
>
> I tried no JSON parsing and no sink (DiscardingSink) case. The
> throughput was 8228msg/sec.
> Slightly better than JSON + Elasticsearch case.
> I also tried using socketTextStream instead of FlinkKafkaConsumer, in
> that case, the result was
> 60,000 msg/sec with just 40% flink TaskManager CPU Usage. (socket
> server was the bottleneck)
> That was amazing, although Flink's fault tolerance feature is not
> available with socketTextStream.
>
> Regards,
> Hironori
>
> 2016-03-08 21:36 GMT+09:00 Aljoscha Krettek <aljoscha@apache.org>:
> > Hi,
> > Another interesting test would be a combination of 3) and 2). I.e. no
> JSON parsing and no sink. This would show what the raw throughput can be
> before being slowed down by writing to Elasticsearch.
> >
> > Also .print() is also not feasible for production since it just prints
> every element to the stdout log on the TaskManagers, which itself can cause
> quite a slowdown. You could try:
> >
> > datastream.addSink(new DiscardingSink())
> >
> > which is a dummy sink that does nothing.
> >
> > Cheers,
> > Aljoscha
> >> On 08 Mar 2016, at 13:31, おぎばやしひろのり <ogibayashi@gmail.com>
wrote:
> >>
> >> Stephan,
> >>
> >> Sorry for the delay in my response.
> >> I tried 3 cases you suggested.
> >>
> >> This time, I set parallelism to 1 for simpicity.
> >>
> >> 0) base performance (same as the first e-mail): 1,480msg/sec
> >> 1) Disable checkpointing : almost same as 0)
> >> 2) No ES sink. just print() : 1,510msg/sec
> >> 3) JSON to TSV : 8,000msg/sec
> >>
> >> So, as you can see, the bottleneck was JSON parsing. I also want to
> >> try eliminating Kafka to see
> >> if there is a room to improve performance.(Currently, I am using
> >> FlinkKafkaConsumer082 with Kafka 0.9
> >> I think I should try Flink 1.0 and FlinkKafkaConsumer09).
> >> Anyway, I think 8,000msg/sec with 1 CPU is not so bad thinking of
> >> Flink's scalability and fault tolerance.
> >> Thank you for your advice.
> >>
> >> Regards,
> >> Hironori Ogibayashi
> >>
> >> 2016-02-26 21:46 GMT+09:00 おぎばやしひろのり <ogibayashi@gmail.com>:
> >>> Stephan,
> >>>
> >>> Thank you for your quick response.
> >>> I will try and post the result later.
> >>>
> >>> Regards,
> >>> Hironori
> >>>
> >>> 2016-02-26 19:45 GMT+09:00 Stephan Ewen <sewen@apache.org>:
> >>>> Hi!
> >>>>
> >>>> I would try and dig bit by bit into what the bottleneck is:
> >>>>
> >>>> 1) Disable the checkpointing, see what difference that makes
> >>>> 2) Use a dummy sink (discarding) rather than elastic search, to see
> if that
> >>>> is limiting
> >>>> 3) Check the JSON parsing. Many JSON libraries are very CPU intensive
> and
> >>>> easily dominate the entire pipeline.
> >>>>
> >>>> Greetings,
> >>>> Stephan
> >>>>
> >>>>
> >>>> On Fri, Feb 26, 2016 at 11:23 AM, おぎばやしひろのり <ogibayashi@gmail.com>
> wrote:
> >>>>>
> >>>>> Hello,
> >>>>>
> >>>>> I started evaluating Flink and tried simple performance test.
> >>>>> The result was just about 4000 messages/sec with 300% CPU usage.
I
> >>>>> think this is quite low and wondering if it is a reasonable result.
> >>>>> If someone could check it, it would be great.
> >>>>>
> >>>>> Here is the detail:
> >>>>>
> >>>>> [servers]
> >>>>> - 3 Kafka broker with 3 partitions
> >>>>> - 3 Flink TaskManager + 1 JobManager
> >>>>> - 1 Elasticsearch
> >>>>> All of them are separate VM with 8vCPU, 8GB memory
> >>>>>
> >>>>> [test case]
> >>>>> The application counts access log by URI with in 1 minute window
and
> >>>>> send the result to Elasticsearch. The actual code is below.
> >>>>> I used '-p 3' option to flink run command, so the task was
> distributed
> >>>>> to 3 TaskManagers.
> >>>>> In the test, I sent about 5000 logs/sec to Kafka.
> >>>>>
> >>>>> [result]
> >>>>> - From Elasticsearch records, the total access count for all URI
was
> >>>>> about 260,000/min = 4300/sec. This is the entire throughput.
> >>>>> - Kafka consumer lag was keep growing.
> >>>>> - The CPU usage of each TaskManager machine was about 13-14%. From
> top
> >>>>> command output, Flink java process was using 100%(1 CPU full)
> >>>>>
> >>>>> So I thought the bottleneck here was CPU used by Flink Tasks.
> >>>>>
> >>>>> Here is the application code.
> >>>>> ---
> >>>>>    val env = StreamExecutionEnvironment.getExecutionEnvironment
> >>>>>    env.enableCheckpointing(1000)
> >>>>> ...
> >>>>>    val stream = env
> >>>>>      .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy",
new
> >>>>> SimpleStringSchema(), properties))
> >>>>>      .map{ json => JSON.parseFull(json).get.asInstanceOf[Map[String,
> >>>>> AnyRef]] }
> >>>>>      .map{ x => x.get("uri") match {
> >>>>>        case Some(y) => (y.asInstanceOf[String],1)
> >>>>>        case None => ("", 1)
> >>>>>      }}
> >>>>>      .keyBy(0)
> >>>>>      .timeWindow(Time.of(1, TimeUnit.MINUTES))
> >>>>>      .sum(1)
> >>>>>      .map{ x => (System.currentTimeMillis(), x)}
> >>>>>      .addSink(new ElasticsearchSink(config, transports, new
> >>>>> IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]]  {
> >>>>>        override def createIndexRequest(element: Tuple2[Long,
> >>>>> Tuple2[String, Int]], ctx: RuntimeContext): IndexRequest = {
> >>>>>          val json = new HashMap[String, AnyRef]
> >>>>>          json.put("@timestamp", new Timestamp(element._1))
> >>>>>          json.put("uri", element._2._1)
> >>>>>          json.put("count", element._2._2: java.lang.Integer)
> >>>>>          println("SENDING: " + element)
> >>>>>
> >>>>> Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
> >>>>>        }
> >>>>>      }))
> >>>>> ---
> >>>>>
> >>>>> Regards,
> >>>>> Hironori Ogibayashi
> >>>>
> >>>>
> >
>

Mime
View raw message