gearpump-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From huafengw <...@git.apache.org>
Subject [GitHub] incubator-gearpump pull request #186: [GEARPUMP-316] Decouple groupBy from w...
Date Mon, 12 Jun 2017 15:36:55 GMT
Github user huafengw commented on a diff in the pull request:

    https://github.com/apache/incubator-gearpump/pull/186#discussion_r121446246
  
    --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---
    @@ -19,133 +19,121 @@ package org.apache.gearpump.streaming.dsl.window.impl
     
     import java.time.Instant
     
    -import akka.actor.ActorSystem
     import com.gs.collections.api.block.predicate.Predicate
    -import org.apache.gearpump.Message
    -import org.apache.gearpump.cluster.UserConfig
    -import com.gs.collections.api.block.procedure.{Procedure, Procedure2}
    +import com.gs.collections.api.block.procedure.Procedure
     import com.gs.collections.impl.list.mutable.FastList
    -import com.gs.collections.impl.map.mutable.UnifiedMap
     import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
    -import org.apache.gearpump.streaming.Constants._
     import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
    -import org.apache.gearpump.streaming.dsl.window.api.Discarding
    -import org.apache.gearpump.streaming.task.TaskContext
    -import org.apache.gearpump.util.LogUtil
    -import org.slf4j.Logger
    +import org.apache.gearpump.streaming.dsl.window.api.WindowFunction.Context
    +import org.apache.gearpump.streaming.dsl.window.api.{Discarding, Windows}
     
    +import scala.collection.mutable.ArrayBuffer
     
    -trait WindowRunner {
    +trait WindowRunner[IN, OUT] extends java.io.Serializable {
     
    -  def process(message: Message): Unit
    +  def process(in: IN, time: Instant): Unit
     
    -  def trigger(time: Instant): Unit
    +  def trigger(time: Instant): TraversableOnce[(OUT, Instant)]
     }
     
    -object DefaultWindowRunner {
    +case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE],
    +    right: WindowRunner[MIDDLE, OUT]) extends WindowRunner[IN, OUT] {
     
    -  private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]])
    +  def process(in: IN, time: Instant): Unit = {
    +    left.process(in, time)
    +  }
    +
    +  def trigger(time: Instant): TraversableOnce[(OUT, Instant)] = {
    +    left.trigger(time).foreach(result => right.process(result._1, result._2))
    +    right.trigger(time)
    +  }
     }
     
    -class DefaultWindowRunner[IN, GROUP, OUT](
    -    taskContext: TaskContext, userConfig: UserConfig,
    -    groupBy: GroupAlsoByWindow[IN, GROUP])(implicit system: ActorSystem)
    -  extends WindowRunner {
    -
    -  private val windowFn = groupBy.window.windowFn
    -  private val groupedWindowInputs = new UnifiedMap[GROUP, TreeSortedMap[Window, FastList[IN]]]
    -  private val groupedFnRunners = new UnifiedMap[GROUP, FunctionRunner[IN, OUT]]
    -  private val groupedRunnerSetups = new UnifiedMap[GROUP, Boolean]
    -
    -  override def process(message: Message): Unit = {
    -    val input = message.value.asInstanceOf[IN]
    -    val (group, windows) = groupBy.groupBy(message)
    -    if (!groupedWindowInputs.containsKey(group)) {
    -      groupedWindowInputs.put(group, new TreeSortedMap[Window, FastList[IN]]())
    -    }
    -    val windowInputs = groupedWindowInputs.get(group)
    -    windows.foreach { win =>
    +class DefaultWindowRunner[IN, OUT](
    +    windows: Windows,
    +    fnRunner: FunctionRunner[IN, OUT])
    +  extends WindowRunner[IN, OUT] {
    +
    +  private val windowFn = windows.windowFn
    +  private val windowInputs = new TreeSortedMap[Window, FastList[(IN, Instant)]]
    +  private var setup = false
    +
    +  override def process(in: IN, time: Instant): Unit = {
    +    val wins = windowFn(new Context[IN] {
    +      override def element: IN = in
    +
    +      override def timestamp: Instant = time
    +    })
    +    wins.foreach { win =>
           if (windowFn.isNonMerging) {
             if (!windowInputs.containsKey(win)) {
    -          val inputs = new FastList[IN](1)
    +          val inputs = new FastList[(IN, Instant)]
               windowInputs.put(win, inputs)
             }
    -        windowInputs.get(win).add(input)
    +        windowInputs.get(win).add(in -> time)
           } else {
    -        merge(windowInputs, win, input)
    +        merge(windowInputs, win, in, time)
           }
         }
     
    -    if (!groupedFnRunners.containsKey(group)) {
    -      val runner = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
    -      groupedFnRunners.put(group, runner)
    -      groupedRunnerSetups.put(group, false)
    -    }
    -
    -    def merge(windowInputs: TreeSortedMap[Window, FastList[IN]], win: Window, input: IN): Unit = {
    -      val intersected = windowInputs.keySet.select(new Predicate[Window] {
    +    def merge(
    +        winIns: TreeSortedMap[Window, FastList[(IN, Instant)]],
    +        win: Window, in: IN, time: Instant): Unit = {
    +      val intersected = winIns.keySet.select(new Predicate[Window] {
             override def accept(each: Window): Boolean = {
               win.intersects(each)
             }
           })
           var mergedWin = win
    -      val mergedInputs = FastList.newListWith(input)
    +      val mergedInputs = FastList.newListWith(in -> time)
           intersected.forEach(new Procedure[Window] {
             override def value(each: Window): Unit = {
               mergedWin = mergedWin.span(each)
    -          mergedInputs.addAll(windowInputs.remove(each))
    +          mergedInputs.addAll(winIns.remove(each))
             }
           })
    -      windowInputs.put(mergedWin, mergedInputs)
    +      winIns.put(mergedWin, mergedInputs)
         }
    -
       }
     
    -  override def trigger(time: Instant): Unit = {
    -    groupedWindowInputs.forEachKeyValue(new Procedure2[GROUP, TreeSortedMap[Window, FastList[IN]]] {
    -      override def value(group: GROUP, windowInputs: TreeSortedMap[Window, FastList[IN]]): Unit = {
    -        onTrigger(group, windowInputs)
    -      }
    -    })
    -
    +  override def trigger(time: Instant): TraversableOnce[(OUT, Instant)] = {
         @annotation.tailrec
    -    def onTrigger(group: GROUP, windowInputs: TreeSortedMap[Window, FastList[IN]]): Unit = {
    +    def onTrigger(outputs: ArrayBuffer[(OUT, Instant)]): TraversableOnce[(OUT, Instant)] = {
           if (windowInputs.notEmpty()) {
             val firstWin = windowInputs.firstKey
             if (!time.isBefore(firstWin.endTime)) {
               val inputs = windowInputs.remove(firstWin)
    -          if (groupedFnRunners.containsKey(group)) {
    -            val runner = FunctionRunner.withEmitFn(groupedFnRunners.get(group),
    -              (output: OUT) => {
    -                taskContext.output(Message(output, time))
    -              })
    -            val setup = groupedRunnerSetups.get(group)
    -            if (!setup) {
    -              runner.setup()
    -              groupedRunnerSetups.put(group, true)
    -            }
    -            inputs.forEach(new Procedure[IN] {
    -              override def value(t: IN): Unit = {
    -                // .toList forces eager evaluation
    -                runner.process(t).toList
    +          if (!setup) {
    +            fnRunner.setup()
    +            setup = true
    +          }
    +          inputs.forEach(new Procedure[(IN, Instant)] {
    +            override def value(v: (IN, Instant)): Unit = {
    +              fnRunner.process(v._1).foreach {
    +                out: OUT => outputs += (out -> v._2)
                   }
    -            })
    -            // .toList forces eager evaluation
    -            runner.finish().toList
    -            if (groupBy.window.accumulationMode == Discarding) {
    -              runner.teardown()
    -              groupedRunnerSetups.put(group, false)
    -              // dicarding, setup need to be called for each window
    -              onTrigger(group, windowInputs)
    -            } else {
    -              // accumulating, setup is only called for the first window
    -              onTrigger(group, windowInputs)
                 }
    +          })
    +          fnRunner.finish().foreach {
    +            out: OUT => outputs += (out -> firstWin.endTime.minusMillis(1))
    --- End diff --
    
    I'm wondering, does the message time order in `outputs` matter? If so, is it guaranteed?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message