gearpump-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (GEARPUMP-316) Don't enforce groupBy after window
Date Mon, 12 Jun 2017 15:37:00 GMT

    [ https://issues.apache.org/jira/browse/GEARPUMP-316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16046692#comment-16046692
] 

ASF GitHub Bot commented on GEARPUMP-316:
-----------------------------------------

Github user huafengw commented on a diff in the pull request:

    https://github.com/apache/incubator-gearpump/pull/186#discussion_r121446246
  
    --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
---
    @@ -19,133 +19,121 @@ package org.apache.gearpump.streaming.dsl.window.impl
     
     import java.time.Instant
     
    -import akka.actor.ActorSystem
     import com.gs.collections.api.block.predicate.Predicate
    -import org.apache.gearpump.Message
    -import org.apache.gearpump.cluster.UserConfig
    -import com.gs.collections.api.block.procedure.{Procedure, Procedure2}
    +import com.gs.collections.api.block.procedure.Procedure
     import com.gs.collections.impl.list.mutable.FastList
    -import com.gs.collections.impl.map.mutable.UnifiedMap
     import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
    -import org.apache.gearpump.streaming.Constants._
     import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
    -import org.apache.gearpump.streaming.dsl.window.api.Discarding
    -import org.apache.gearpump.streaming.task.TaskContext
    -import org.apache.gearpump.util.LogUtil
    -import org.slf4j.Logger
    +import org.apache.gearpump.streaming.dsl.window.api.WindowFunction.Context
    +import org.apache.gearpump.streaming.dsl.window.api.{Discarding, Windows}
     
    +import scala.collection.mutable.ArrayBuffer
     
    -trait WindowRunner {
    +trait WindowRunner[IN, OUT] extends java.io.Serializable {
     
    -  def process(message: Message): Unit
    +  def process(in: IN, time: Instant): Unit
     
    -  def trigger(time: Instant): Unit
    +  def trigger(time: Instant): TraversableOnce[(OUT, Instant)]
     }
     
    -object DefaultWindowRunner {
    +case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE],
    +    right: WindowRunner[MIDDLE, OUT]) extends WindowRunner[IN, OUT] {
     
    -  private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]])
    +  def process(in: IN, time: Instant): Unit = {
    +    left.process(in, time)
    +  }
    +
    +  def trigger(time: Instant): TraversableOnce[(OUT, Instant)] = {
    +    left.trigger(time).foreach(result => right.process(result._1, result._2))
    +    right.trigger(time)
    +  }
     }
     
    -class DefaultWindowRunner[IN, GROUP, OUT](
    -    taskContext: TaskContext, userConfig: UserConfig,
    -    groupBy: GroupAlsoByWindow[IN, GROUP])(implicit system: ActorSystem)
    -  extends WindowRunner {
    -
    -  private val windowFn = groupBy.window.windowFn
    -  private val groupedWindowInputs = new UnifiedMap[GROUP, TreeSortedMap[Window, FastList[IN]]]
    -  private val groupedFnRunners = new UnifiedMap[GROUP, FunctionRunner[IN, OUT]]
    -  private val groupedRunnerSetups = new UnifiedMap[GROUP, Boolean]
    -
    -  override def process(message: Message): Unit = {
    -    val input = message.value.asInstanceOf[IN]
    -    val (group, windows) = groupBy.groupBy(message)
    -    if (!groupedWindowInputs.containsKey(group)) {
    -      groupedWindowInputs.put(group, new TreeSortedMap[Window, FastList[IN]]())
    -    }
    -    val windowInputs = groupedWindowInputs.get(group)
    -    windows.foreach { win =>
    +class DefaultWindowRunner[IN, OUT](
    +    windows: Windows,
    +    fnRunner: FunctionRunner[IN, OUT])
    +  extends WindowRunner[IN, OUT] {
    +
    +  private val windowFn = windows.windowFn
    +  private val windowInputs = new TreeSortedMap[Window, FastList[(IN, Instant)]]
    +  private var setup = false
    +
    +  override def process(in: IN, time: Instant): Unit = {
    +    val wins = windowFn(new Context[IN] {
    +      override def element: IN = in
    +
    +      override def timestamp: Instant = time
    +    })
    +    wins.foreach { win =>
           if (windowFn.isNonMerging) {
             if (!windowInputs.containsKey(win)) {
    -          val inputs = new FastList[IN](1)
    +          val inputs = new FastList[(IN, Instant)]
               windowInputs.put(win, inputs)
             }
    -        windowInputs.get(win).add(input)
    +        windowInputs.get(win).add(in -> time)
           } else {
    -        merge(windowInputs, win, input)
    +        merge(windowInputs, win, in, time)
           }
         }
     
    -    if (!groupedFnRunners.containsKey(group)) {
    -      val runner = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
    -      groupedFnRunners.put(group, runner)
    -      groupedRunnerSetups.put(group, false)
    -    }
    -
    -    def merge(windowInputs: TreeSortedMap[Window, FastList[IN]], win: Window, input:
IN): Unit = {
    -      val intersected = windowInputs.keySet.select(new Predicate[Window] {
    +    def merge(
    +        winIns: TreeSortedMap[Window, FastList[(IN, Instant)]],
    +        win: Window, in: IN, time: Instant): Unit = {
    +      val intersected = winIns.keySet.select(new Predicate[Window] {
             override def accept(each: Window): Boolean = {
               win.intersects(each)
             }
           })
           var mergedWin = win
    -      val mergedInputs = FastList.newListWith(input)
    +      val mergedInputs = FastList.newListWith(in -> time)
           intersected.forEach(new Procedure[Window] {
             override def value(each: Window): Unit = {
               mergedWin = mergedWin.span(each)
    -          mergedInputs.addAll(windowInputs.remove(each))
    +          mergedInputs.addAll(winIns.remove(each))
             }
           })
    -      windowInputs.put(mergedWin, mergedInputs)
    +      winIns.put(mergedWin, mergedInputs)
         }
    -
       }
     
    -  override def trigger(time: Instant): Unit = {
    -    groupedWindowInputs.forEachKeyValue(new Procedure2[GROUP, TreeSortedMap[Window, FastList[IN]]]
{
    -      override def value(group: GROUP, windowInputs: TreeSortedMap[Window, FastList[IN]]):
Unit = {
    -        onTrigger(group, windowInputs)
    -      }
    -    })
    -
    +  override def trigger(time: Instant): TraversableOnce[(OUT, Instant)] = {
         @annotation.tailrec
    -    def onTrigger(group: GROUP, windowInputs: TreeSortedMap[Window, FastList[IN]]): Unit
= {
    +    def onTrigger(outputs: ArrayBuffer[(OUT, Instant)]): TraversableOnce[(OUT, Instant)]
= {
           if (windowInputs.notEmpty()) {
             val firstWin = windowInputs.firstKey
             if (!time.isBefore(firstWin.endTime)) {
               val inputs = windowInputs.remove(firstWin)
    -          if (groupedFnRunners.containsKey(group)) {
    -            val runner = FunctionRunner.withEmitFn(groupedFnRunners.get(group),
    -              (output: OUT) => {
    -                taskContext.output(Message(output, time))
    -              })
    -            val setup = groupedRunnerSetups.get(group)
    -            if (!setup) {
    -              runner.setup()
    -              groupedRunnerSetups.put(group, true)
    -            }
    -            inputs.forEach(new Procedure[IN] {
    -              override def value(t: IN): Unit = {
    -                // .toList forces eager evaluation
    -                runner.process(t).toList
    +          if (!setup) {
    +            fnRunner.setup()
    +            setup = true
    +          }
    +          inputs.forEach(new Procedure[(IN, Instant)] {
    +            override def value(v: (IN, Instant)): Unit = {
    +              fnRunner.process(v._1).foreach {
    +                out: OUT => outputs += (out -> v._2)
                   }
    -            })
    -            // .toList forces eager evaluation
    -            runner.finish().toList
    -            if (groupBy.window.accumulationMode == Discarding) {
    -              runner.teardown()
    -              groupedRunnerSetups.put(group, false)
    -              // dicarding, setup need to be called for each window
    -              onTrigger(group, windowInputs)
    -            } else {
    -              // accumulating, setup is only called for the first window
    -              onTrigger(group, windowInputs)
                 }
    +          })
    +          fnRunner.finish().foreach {
    +            out: OUT => outputs += (out -> firstWin.endTime.minusMillis(1))
    --- End diff --
    
    I'm wondering, does the message time order in `outputs` matter? If so, is it guaranteed?



> Don't enforce groupBy after window
> ----------------------------------
>
>                 Key: GEARPUMP-316
>                 URL: https://issues.apache.org/jira/browse/GEARPUMP-316
>             Project: Apache Gearpump
>          Issue Type: Sub-task
>          Components: streaming
>            Reporter: Manu Zhang
>            Assignee: Manu Zhang
>
> Return a normal Stream instead of a WindowStream from the window function. A window function defines
a boundary (window) for elements, and the operations that follow should fall within the corresponding
boundaries. The boundary should not change until a new window function is defined. The default
boundary is {{GlobalWindows}} if none is defined.
> This means there will be a window context for each underlying task. Elements are emitted
according to the trigger semantics.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message