flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Chesnay Schepler <ches...@apache.org>
Subject Re: Latency Measurement
Date Wed, 19 Jul 2017 10:00:13 GMT
I originally meant startNewChain(), but disableChaining() should work too.

Can you rerun the job with the logging level set to DEBUG, and check for 
any message from org.apache.flink.runtime.metrics?

Also looping in Robert, maybe he has an idea.

On 17.07.2017 14:23, Paolo Cristofanelli wrote:
> Hi Chesnay,
>
> thanks for your answer. I have not found the method createNewChain(), 
> I used instead disableChaining(), but with no effect:
>
>          DataStream<String> stream = env.addSource(
>
>         new FlinkKafkaConsumer08<>(
>
>         "MyTopic", new SimpleStringSchema(), properties) );
>
>
>            stream.map( new ConsumerMap()).disableChaining();
>
>
>            env.execute();
>
>
>
> Best Regards,
> Paolo
>
> On 17 July 2017 at 13:10, Chesnay Schepler <chesnay@apache.org 
> <mailto:chesnay@apache.org>> wrote:
>
>     Hello,
>
>     As for 1), my suspicion is that this is caused by chaining. If the
>     map function is chained to the kafka source then the latency
>     markers are always immediately forwarded, regardless of what your
>     map function is doing.
>     If the map function is indeed chained to the source, could you try
>     again after disabling the chain by calling
>     `X.map(...).createNewChain()` and report back?
>
>     As for 2), I don't think this is possible right now.
>
>     Regards,
>     Chesnay
>
>
>     On 17.07.2017 12:42, Paolo Cristofanelli wrote:
>
>         Hi,
>
>         I would like to understand how to measure the latency of a record.
>         I have set up a simple project with a Kafka consumer that
>         reads from a topic and performs a simple map (with a thread
>         sleep inside).
>
>         In order to measure the latency of this mapper I have added
>         env.getConfig().setLatencyTrackingInterval(10);
>
>         After that, I was planning to access the latency through the
>         webUI interface but the related graph does not show any values.
>         I do not understand why. I was thinking that I in the graph I
>         should observe at least the sleep duration.
>
>         I also have another question:
>
>         I am using a count window, aggregating every 100 input records
>         and then I perform a map. I want to see the latency as the
>         difference between the time at which the output record is
>         emitted and the arrival time of the earliest input record.
>
>         For example, the first value arrives at x. After x +5 I all
>         the 100 values arrived and the system can aggregate them. Now
>         I perform the map operation and we emit the output record at
>         time x+15.
>         I would like to obtain 15 as latency.
>         Do you have any suggestion on how to proceed?
>
>         Thanks for your time,
>         Paolo Cristofanelli
>
>
>
>


Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message