flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aljoscha Krettek <aljos...@apache.org>
Subject Re: How to count number of records received per second in processing time while using event time characteristic
Date Wed, 29 Jun 2016 15:23:41 GMT
Hi,
you can explicitly specify that you want processing-time windows like this:

stream.keyBy(...).window(TumblingProcessingTimeWindows.of(Time.seconds(1))).sum(...)

Also note that the timestamp you append in
"writeAsCsv("records-per-second-" + System.currentTimeMillis())" will only
take the timestamp at the time when this function is called, this will only
happen once when your program is started.

Best,
Aljoscha

On Tue, 28 Jun 2016 at 17:33 Saiph Kappa <saiph.kappa@gmail.com> wrote:

> Hi,
>
> I have a flink streaming application and I want to count records received
> per second (as a way of measuring the throughput of my application).
> However, I am using the EventTime time characteristic, as follows:
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> val s = env.socketTextStream("localhost", 1234)
>
> s.map(line => Tuple1(1)).keyBy(0).timeWindow(Time.seconds(1)).sum(0).writeAsCsv("records-per-second-"
+
>   System.currentTimeMillis())
>
> val mainStrean = s.map(line => {
>   val Array(p1, p2) = line.split(" ")
>   (p1, p2.toInt)
> })
>   .assignAscendingTimestamps(p => System.currentTimeMillis())
>
> which naturally gives me this error:
>
> [error] Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE
> timestamp (= no timestamp marker). Is the time characteristic set to
> 'ProcessingTime', or did you forget to call
> 'DataStream.assignTimestampsAndWatermarks(...)'?
>
> How can I do this?
>
>
> Thanks.
>

Mime
View raw message