flink-issues mailing list archives

From "Renkai Ge (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-4428) Method map/flatMapWithState may need a eviction policy
Date Fri, 19 Aug 2016 06:06:23 GMT
Renkai Ge created FLINK-4428:
--------------------------------

             Summary: Method map/flatMapWithState may need a eviction policy
                 Key: FLINK-4428
                 URL: https://issues.apache.org/jira/browse/FLINK-4428
             Project: Flink
          Issue Type: New Feature
          Components: DataStream API
    Affects Versions: 1.1.2
            Reporter: Renkai Ge


I want to count the number of unique visitors to a website each day.
If the count changes, I want to get the new value within 1 second, and
the job should stay silent if the count does not change. I implemented this
with a time window of 1 day, a trigger that fires every 1 second, and
flatMapWithState to filter out duplicate results.
{code}
     //    case class Visit(uuid: String, time: Long, platform: Int)
 
     //    case class WindowUv(platform: Int, uv: Long, windowStart: Long, windowEnd: Long)
 
     //  val consumer: FlinkKafkaConsumer08[Visit]
     val stream =
     env.addSource(consumer)
       .keyBy(_.platform)
       .timeWindow(Time.days(1))
       .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
       .applyWith((0, Set.empty[Int], 0L, 0L))(
         foldFunction = {
           // Accumulate the hash codes of visitor ids; window bounds are filled in later.
           case ((_, set, _, _), visit) =>
             (visit.platform, set + visit.uuid.hashCode, 0L, 0L)
         },
         windowFunction = {
           case (key, window, results) =>
             results.map {
               case (platform, set, _, _) =>
                 (platform, set, window.getStart, window.getEnd)
             }
         }
       )
       .mapWith {
         case (key, set, windowStart, windowEnd) =>
           WindowUv(key, set.size, windowStart, windowEnd)
       }
       .keyBy(uv => (uv.platform, uv.windowStart))
       .flatMapWithState[WindowUv, Long] {
         // Emit only when the count differs from the last one stored for this key.
         case (WindowUv(key, num, begin, end), curr) =>
           curr match {
             case Some(numCurr) if numCurr == num =>
               (Seq.empty, Some(num))
             case _ =>
               (Seq(WindowUv(key, num, begin, end)), Some(num))
           }
       }
     stream.print()
     env.execute("Boom")
{code}

The problem with using flatMapWithState here is that the state for a given
day is never updated or read again once that day has passed, yet it stays
in memory forever, and there is no way to evict it. So I think the state
in map/flatMapWithState may need an eviction policy based on time or on
global conditions, rather than only on the last message for a key (it is
hard to tell whether a message is the last one at the moment it arrives).
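
To make the request concrete, here is a minimal sketch in plain Scala of what a
time-based eviction policy could look like. It is not Flink API: ExpiringState,
evict and dedup are hypothetical names used only for illustration. The idea is
that each key's state carries an expiry timestamp (here, the window end), so it
can be dropped by a clock instead of by a message for that key.
{code}
import scala.collection.mutable

object EvictionSketch {
  // Hypothetical model (not Flink API): per-key state with an expiry timestamp.
  final class ExpiringState[K, S] {
    // key -> (state, expiry timestamp in milliseconds)
    private val entries = mutable.Map.empty[K, (S, Long)]

    def get(key: K): Option[S] = entries.get(key).map(_._1)

    def put(key: K, state: S, expiresAt: Long): Unit =
      entries(key) = (state, expiresAt)

    // Drop every entry whose expiry is at or before `now`; returns the number removed.
    def evict(now: Long): Int = {
      val expired = entries.collect { case (k, (_, exp)) if exp <= now => k }.toList
      expired.foreach(k => entries.remove(k))
      expired.size
    }

    def size: Int = entries.size
  }

  final case class WindowUv(platform: Int, uv: Long, windowStart: Long, windowEnd: Long)

  // Same de-duplication as the flatMapWithState call above, but the stored
  // count is tied to the window end so that it can be evicted afterwards.
  def dedup(state: ExpiringState[(Int, Long), Long], in: WindowUv): Seq[WindowUv] = {
    val key = (in.platform, in.windowStart)
    state.get(key) match {
      case Some(prev) if prev == in.uv => Seq.empty
      case _ =>
        state.put(key, in.uv, in.windowEnd)
        Seq(in)
    }
  }
}
{code}
In this sketch something outside the per-message path (for example a timer)
would have to call evict(now) periodically; that caller is the part that
map/flatMapWithState does not offer today.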



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
