flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Saiph Kappa <saiph.ka...@gmail.com>
Subject How to count number of records received per second in processing time while using event time characteristic
Date Tue, 28 Jun 2016 15:33:21 GMT
Hi,

I have a flink streaming application and I want to count records received
per second (as a way of measuring the throughput of my application).
However, I am using the EventTime time characteristic, as follows:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val s = env.socketTextStream("localhost", 1234)

s.map(line => Tuple1(1)).keyBy(0).timeWindow(Time.seconds(1)).sum(0).writeAsCsv("records-per-second-"
+
  System.currentTimeMillis())

val mainStrean = s.map(line => {
  val Array(p1, p2) = line.split(" ")
  (p1, p2.toInt)
})
  .assignAscendingTimestamps(p => System.currentTimeMillis())

which naturally gives me this error:

[error] Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE
timestamp (= no timestamp marker). Is the time characteristic set to
'ProcessingTime', or did you forget to call
'DataStream.assignTimestampsAndWatermarks(...)'?

How can I do this?


Thanks.

Mime
View raw message