flink-issues mailing list archives

From "Seth Wiesman (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7999) Variable Join Window Boundaries
Date Fri, 10 Nov 2017 15:46:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16247674#comment-16247674 ]

Seth Wiesman commented on FLINK-7999:
-------------------------------------

Sure, I'll use my own use case as an example.

I am using Flink to aggregate spend for the campaigns I am running. Each record in the
main stream contains both a campaign id and information about a single financial
transaction; this stream is analogous to the fact table in a traditional data warehouse. However,
each campaign runs in a different currency, so I need to join with metadata containing the
currency code for that campaign. Campaigns have both start and end dates, and a campaign can
last anywhere from one day to several months.

Events:
timestamp        | id | spend
-----------------+----+------
2017-11-11 03:00 | 1  | 0.25
2017-11-11 03:02 | 2  | 0.03
2017-11-11 03:05 | 1  | 0.11

Metadata:
id | start_date | end_date   | currency_code
---+------------+------------+--------------
1  | 2017-11-10 | 2017-11-12 | "USD"
2  | 2017-04-02 | 2019-12-31 | "EUR"
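To make the intended semantics concrete, here is a minimal batch sketch of the same join over the two example tables. The case classes `SpendEvent` and `Campaign` and their field names are hypothetical, and it assumes a campaign is live from the start of `start_date` through the end of `end_date` (inclusive); this is an illustration, not the streaming implementation.

```scala
import java.time.{LocalDate, LocalDateTime}

// Hypothetical models of the two example tables; field names are illustrative only.
final case class SpendEvent(timestamp: LocalDateTime, id: Int, spend: BigDecimal)
final case class Campaign(id: Int, startDate: LocalDate, endDate: LocalDate, currencyCode: String)

object CampaignExample {
  val events: Seq[SpendEvent] = Seq(
    SpendEvent(LocalDateTime.parse("2017-11-11T03:00"), 1, BigDecimal("0.25")),
    SpendEvent(LocalDateTime.parse("2017-11-11T03:02"), 2, BigDecimal("0.03")),
    SpendEvent(LocalDateTime.parse("2017-11-11T03:05"), 1, BigDecimal("0.11"))
  )

  val campaigns: Seq[Campaign] = Seq(
    Campaign(1, LocalDate.parse("2017-11-10"), LocalDate.parse("2017-11-12"), "USD"),
    Campaign(2, LocalDate.parse("2017-04-02"), LocalDate.parse("2019-12-31"), "EUR")
  )

  // Assumption: a campaign is live on every day from startDate through endDate, inclusive.
  def isLive(e: SpendEvent, c: Campaign): Boolean = {
    val day = e.timestamp.toLocalDate
    !day.isBefore(c.startDate) && !day.isAfter(c.endDate)
  }

  // Variable-window join: match on id, then keep the pair only if the event
  // falls inside that particular campaign's window.
  def join(es: Seq[SpendEvent], cs: Seq[Campaign]): Seq[(SpendEvent, String)] = {
    val byId = cs.map(c => c.id -> c).toMap
    es.flatMap(e => byId.get(e.id).filter(isLive(e, _)).map(c => e -> c.currencyCode))
  }
}
```

The window is a property of each metadata row rather than a constant shared by the whole join, which is what distinguishes this from a fixed-bound time-windowed join.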

Each event can only be joined within a valid window, but that window varies by id. Today I
implement this join with the following `CoProcessFunction`:

{code:java}
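import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._ // provides createTypeInformation
import org.apache.flink.util.Collector

// Joins each Event with the CampaignMetadata sharing its key. Both inputs must be
// keyed by campaign id before being connected, since the function uses keyed state.
class CampaignJoin(allowedLateness: Long) extends CoProcessFunction[Event, CampaignMetadata, Event] {

  // Latest metadata seen for the current campaign id.
  @transient private lazy val descriptor =
    new ValueStateDescriptor[CampaignMetadata]("campaign", createTypeInformation[CampaignMetadata])

  override def processElement1(
      value: Event,
      ctx: CoProcessFunction[Event, CampaignMetadata, Event]#Context,
      out: Collector[Event]): Unit = {
    val campaign = getRuntimeContext.getState(descriptor).value()
    val ts: Long = ctx.timestamp()

    // Emit only if metadata has arrived and the event lies inside the campaign's window.
    if (campaign != null && campaign.start.getTime <= ts && ts <= campaign.end.getTime) {
      out.collect(value.copy(meta = campaign))
    }
  }

  override def processElement2(
      value: CampaignMetadata,
      ctx: CoProcessFunction[Event, CampaignMetadata, Event]#Context,
      out: Collector[Event]): Unit = {
    // Retain the metadata until the watermark passes end + allowedLateness.
    val end = value.end.getTime + allowedLateness
    if (end < ctx.timerService().currentWatermark()) {
      return // already expired; no event can join with it any more
    }

    ctx.timerService().registerEventTimeTimer(end)
    getRuntimeContext.getState(descriptor).update(value)
  }

  override def onTimer(
      timestamp: Long,
      ctx: CoProcessFunction[Event, CampaignMetadata, Event]#OnTimerContext,
      out: Collector[Event]): Unit = {
    val state    = getRuntimeContext.getState(descriptor)
    val campaign = state.value()

    // Clear only if this timer belongs to the metadata currently stored; an update
    // with a different end date registers its own timer.
    if (campaign != null && campaign.end.getTime + allowedLateness == timestamp) {
      state.clear()
    }
  }
}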

{code}
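The state and timer handling of a function like this can be hard to follow, so here is a Flink-free sketch of the per-key lifecycle. `CampaignJoinSim` and `Meta` are hypothetical names, a `var` stands in for the `ValueState`, a list stands in for registered event-time timers, and an event is accepted only when it lies inside `[start, end]`, following the join condition in the issue description. Timers fire once the watermark reaches their timestamp, as event-time timers do in Flink.

```scala
// Hypothetical, Flink-free model of one key's state in a CampaignJoin-style function.
final case class Meta(startMs: Long, endMs: Long, currency: String)

final class CampaignJoinSim(allowedLateness: Long) {
  private var state: Option[Meta] = None   // stands in for the ValueState
  private var timers = List.empty[Long]    // stands in for registered event-time timers
  private var watermark = Long.MinValue

  // Like processElement2: ignore already-expired metadata, otherwise store it
  // and register a cleanup timer at end + allowedLateness.
  def onMetadata(m: Meta): Unit = {
    val cleanup = m.endMs + allowedLateness
    if (cleanup >= watermark) {
      timers = cleanup :: timers
      state = Some(m)
    }
  }

  // Like processElement1: join only if metadata is present and start <= ts <= end.
  def onEvent(ts: Long): Option[String] =
    state.filter(m => m.startMs <= ts && ts <= m.endMs).map(_.currency)

  // Advancing the watermark fires every due timer; a timer clears the state
  // only if it belongs to the metadata currently stored.
  def advanceWatermark(wm: Long): Unit = {
    watermark = wm
    val (due, pending) = timers.partition(_ <= wm)
    timers = pending
    due.foreach { t =>
      if (state.exists(m => m.endMs + allowedLateness == t)) state = None
    }
  }

  def hasState: Boolean = state.isDefined
}
```

Between `end` and `end + allowedLateness` the metadata is still held, so late events whose timestamps fall inside the window can still be joined. Like the function above, this model drops events that arrive before their campaign's metadata; buffering them would need additional state.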

> Variable Join Window Boundaries
> -------------------------------
>
>                 Key: FLINK-7999
>                 URL: https://issues.apache.org/jira/browse/FLINK-7999
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Seth Wiesman
>
> Allow window joins with variable length based on row attributes. 
> Consider two streams joined on an id, where one has start and end dates. It would be
> useful to be able to join each row during its live duration. Today this can be expressed in
> the DataStream API using a CoProcessFunction.
>  left.id = right.id AND (left.time > right.start AND left.time < right.end)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
