flink-user mailing list archives

From Robert Schmidtke <ro.schmid...@gmail.com>
Subject Re: Measuring latency in a DataStream
Date Tue, 03 May 2016 07:54:53 GMT
Hi Igor, thanks for your reply.

As for your first point, I'm not sure I understand correctly. I'm ingesting
records at a rate of about 50k per second, and those records are fairly
small. If I added a timestamp to each of them, I would have a lot more data,
which is not what I want. Instead I wanted to add something like a watermark
once every second, put a timestamp only on that one element, and calculate
the latency from it.
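
The once-a-second marker idea can be sketched independently of Flink (in a real job the marker would come from a custom source function and the data from the Kafka consumer; all class and method names below are hypothetical). A minimal sketch, assuming producer and consumer clocks are in sync:

```java
// Hypothetical sketch of the periodic-marker approach: interleave rare
// Marker elements (carrying a producer-side timestamp) with the regular
// data records, and measure latency only when a Marker is seen on the
// consuming side. Data records stay small because they carry no timestamp.
import java.util.List;

public class MarkerLatency {

    // A stream element is either a data record or a timing marker.
    sealed interface Element permits Data, Marker {}
    record Data(long payload) implements Element {}
    record Marker(long producerTimeMillis) implements Element {}

    // Consumer side: latency = consumer clock minus the producer timestamp
    // carried by the marker. Only valid if the two clocks are in sync.
    static long latencyAt(Marker m, long consumerTimeMillis) {
        return consumerTimeMillis - m.producerTimeMillis();
    }

    public static void main(String[] args) {
        // Simulated unioned stream: many small records, one marker per second.
        List<Element> stream = List.of(
            new Data(1), new Data(2),
            new Marker(1_000L),      // emitted at producer time 1000 ms
            new Data(3),
            new Marker(2_000L));     // emitted at producer time 2000 ms

        long consumerClock = 2_150L; // pretend "now" on the consumer
        for (Element e : stream) {
            if (e instanceof Marker m) {
                System.out.println("latency = "
                    + latencyAt(m, consumerClock) + " ms");
            }
        }
    }
}
```

The trade-off versus Igor's per-record Tuple2 suggestion is sampling: one marker per second gives one latency sample per second instead of 50k, at essentially no bandwidth cost.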

For your second point: the clocks are in fact up to 8s apart -.-" not sure
how I missed this yesterday. As I'm not an admin of the machine, I will
request that NTP be set up.
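
An 8s offset is more than enough to explain the negative differences I was seeing: if the consumer's clock runs behind the producer's, the measured value is the true latency minus the skew. A quick illustration (all numbers hypothetical):

```java
public class ClockSkew {
    // measured = (consumer wall clock at arrival) - (producer timestamp)
    //          = trueLatency - skew,
    // when the consumer clock is `skewMillis` behind the producer clock.
    static long measuredLatency(long trueLatencyMillis, long skewMillis) {
        return trueLatencyMillis - skewMillis;
    }

    public static void main(String[] args) {
        long trueLatency = 50;   // actual transport latency, ms
        long skew = 8_000;       // consumer clock 8 s behind producer
        // Prints a negative "latency", as observed in the benchmark.
        System.out.println(measuredLatency(trueLatency, skew)); // -7950
    }
}
```

So until NTP keeps the skew well below the latencies being measured, any one-way timestamp comparison across the two machines is dominated by the clock offset rather than the actual latency.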


On Mon, May 2, 2016 at 10:19 PM, Igor Berman <igor.berman@gmail.com> wrote:

> 1. why are you doing a join instead of something like
> System.currentTimeMillis()? at the end you have a tuple of your data with
> a timestamp anyway... so why not just wrap your data in a Tuple2 with the
> additional info of the creation timestamp?
> 2. are you sure the consumer/producer machines' clocks are in sync?
> you can use NTP for this.
> On 2 May 2016 at 20:02, Robert Schmidtke <ro.schmidtke@gmail.com> wrote:
>> Hi everyone,
>> I have implemented a way to measure latency in a DataStream (I hope): I'm
>> consuming a Kafka topic and I'm union'ing the resulting stream with a
>> custom source that emits a (machine-local) timestamp every 1000ms (using
>> currentTimeMillis). On the consuming end I'm distinguishing between the
>> Kafka events and the timestamps. When encountering a timestamp, I take the
>> difference of the processing machine's local time and the timestamp found
>> in the stream, expecting a positive difference (with the processing
>> machine's timestamp being larger than the timestamp found in the stream).
>> However, the opposite is the case. Now I am wondering about when events are
>> actually processed.
>> Unioning the stream from Kafka with my custom source and batching them
>> into 10s windows (which is what I do), I expect 10 timestamps with
>> ascending values and a rough gap of 1000ms between them in the stream:
>> https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/RunBenchWithInit.scala#L68
>> On the receiving end I again take the currentTimeMillis in my fold
>> function, expecting the resulting value to be larger (most of the time)
>> than the timestamps encountered in the stream:
>> https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/NumericCalcJob.scala#L53
>> The system clocks are in sync up to 1ms.
>> Maybe I am not clear about when certain timestamps are created (i.e. when
>> the UDFs are invoked) or how windows are processed. Any advice is greatly
>> appreciated, also alternative approaches to calculating latency.
>> I'm on Flink 0.10.2 by the way.
>> Thanks in advance for the help!
>> Robert
>> --
>> My GPG Key ID: 336E2680

My GPG Key ID: 336E2680
