flink-user mailing list archives

From Nirmalya Sengupta <sengupta.nirma...@gmail.com>
Subject Continuing from the stackoverflow post
Date Fri, 27 Nov 2015 13:49:34 GMT
Hello Fabian/Matthius,

Many thanks for taking an interest in my question on Stack Overflow. That
helps me sustain my enthusiasm. :-)

After setting the parallelism of the environment to 1 and replacing _max()_
with _maxBy()_, I get a list of maximum temperatures, but I cannot explain
to myself how Flink arrives at those figures (attached below). I understand
that different runs may produce different results because I am using the
**ProcessingTime** characteristic. Still, I expected the output to be
deterministic in some way, and I don't see that.

Please point me in the right direction.

Here's the code I have been referring to:

-------------------------------------------------

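// Imports this snippet appears to need (package paths are assumed from the
// Flink Scala streaming API; they were not part of the original post).
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

case class IncomingDataUnit(
    sensorUUID: String,
    radiationLevel: Int,
    photoSensor: Float,
    humidity: Float,
    timeStamp: Long,
    ambientTemperature: Float)
  extends Serializable { }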


object SocketTextStreamWordCount {

  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.setParallelism(1)

    val readings =
      readIncomingReadings(env,"./sampleIOTTiny.csv")
      .map(e => (e.sensorUUID,e.ambientTemperature))
      .timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS))
      .trigger(CountTrigger.of(5))
      .evictor(CountEvictor.of(4))
      .maxBy(1)

    readings.print

    env.execute("Scala IOT Stream  experiment Example")

  }

  private def readIncomingReadings(
      env: StreamExecutionEnvironment,
      inputPath: String): DataStream[IncomingDataUnit] = {
    env.readTextFile(inputPath).map(datum => {
      val fields = datum.split(",")
      IncomingDataUnit(
        fields(0),              // sensorUUID
        fields(1).toInt,        // radiationLevel
        fields(2).toFloat,      // photoSensor
        fields(3).toFloat,      // humidity
        fields(4).toLong,       // timeStamp
        fields(5).toFloat       // ambientTemperature
      )
    })
  }

}

-------------------------------------------------

Here's the dataset:
------------------------------------------------

probe-f076c2b0,201,842.53,75.5372,1448028160,29.37
probe-dccefede,199,749.25,78.6057,1448028160,27.46
probe-f29f9662,199,821.81,81.7831,1448028160,22.35
probe-5dac1d9f,195,870.71,83.1028,1448028160,15.98
probe-6c75cfbe,198,830.06,82.5607,1448028160,30.02
probe-4d78b545,204,778.42,78.412,1448028160,25.92
probe-400c5cdf,204,711.65,73.585,1448028160,22.18
probe-df2d4cad,199,820.8,72.936,1448028161,16.18
probe-f4ef109e,199,785.68,77.5647,1448028161,16.36
probe-3fac3350,200,720.12,78.2073,1448028161,19.19
probe-42a9ddca,193,819.12,74.3712,1448028161,22.07
probe-252a5bbd,197,710.32,80.6072,1448028161,14.64
probe-987f2cb6,200,750.4,76.0533,1448028161,14.72
probe-24444323,197,816.06,84.0816,1448028161,4.405
probe-6dd6fdc4,201,717.64,78.4031,1448028161,29.43
probe-20c609fb,204,804.37,84.5243,1448028161,22.87
probe-c027fdc9,195,858.61,81.7682,1448028161,24.47
probe-2c6cd3de,198,826.96,85.26,1448028162,18.99
probe-960906ca,197,797.63,77.4359,1448028162,27.62
-------------------------------------------------

And here's the output:

---------------------

(probe-6c75cfbe,30.02)
(probe-42a9ddca,22.07)
(probe-960906ca,27.62)
(probe-400c5cdf,22.18)
(probe-f076c2b0,29.37)
(probe-6c75cfbe,30.02)
(probe-960906ca,27.62)

---------------------
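
To make the expectation of a deterministic result concrete, here is a minimal plain-Scala sketch (no Flink involved) of what the pipeline might produce if all 19 records landed in one single window. That single-window assumption is a simplification and need not hold for 5 ms processing-time windows. The object name `SingleWindowSketch` and the helper `simulate` are hypothetical and only illustrate the idea "after every 5th element, keep the newest 4 and take the maximum by temperature"; they are not Flink's implementation.

```scala
object SingleWindowSketch {

  // (sensorUUID, ambientTemperature) pairs copied from the dataset above, in file order.
  val readings: List[(String, Float)] = List(
    ("probe-f076c2b0", 29.37f), ("probe-dccefede", 27.46f),
    ("probe-f29f9662", 22.35f), ("probe-5dac1d9f", 15.98f),
    ("probe-6c75cfbe", 30.02f), ("probe-4d78b545", 25.92f),
    ("probe-400c5cdf", 22.18f), ("probe-df2d4cad", 16.18f),
    ("probe-f4ef109e", 16.36f), ("probe-3fac3350", 19.19f),
    ("probe-42a9ddca", 22.07f), ("probe-252a5bbd", 14.64f),
    ("probe-987f2cb6", 14.72f), ("probe-24444323", 4.405f),
    ("probe-6dd6fdc4", 29.43f), ("probe-20c609fb", 22.87f),
    ("probe-c027fdc9", 24.47f), ("probe-2c6cd3de", 18.99f),
    ("probe-960906ca", 27.62f)
  )

  // Hypothetical helper, assuming every record falls into the same window:
  // each complete group of `triggerCount` elements causes one firing, only the
  // newest `keep` elements of that group are evaluated, and the element with
  // the highest temperature is emitted (the first one wins on ties).
  def simulate(in: List[(String, Float)],
               triggerCount: Int,
               keep: Int): List[(String, Float)] =
    in.grouped(triggerCount)
      .filter(_.size == triggerCount) // a trailing partial group never reaches the count
      .map(_.takeRight(keep).reduceLeft((a, b) => if (b._2 > a._2) b else a))
      .toList

  def main(args: Array[String]): Unit =
    simulate(readings, triggerCount = 5, keep = 4).foreach(println)
}
```

Under that assumption the sketch emits three maxima, for probe-6c75cfbe, probe-400c5cdf and probe-6dd6fdc4, whereas the actual output above has seven lines. So the single-window model does not describe what the job actually did.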


-- Nirmalya

-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."
