gearpump-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (GEARPUMP-316) Don't enforce groupBy after window
Date Wed, 07 Jun 2017 03:17:18 GMT

    [ https://issues.apache.org/jira/browse/GEARPUMP-316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040082#comment-16040082 ]

ASF GitHub Bot commented on GEARPUMP-316:
-----------------------------------------

Github user huafengw commented on a diff in the pull request:

    https://github.com/apache/incubator-gearpump/pull/186#discussion_r120526614
  
    --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.gearpump.streaming.dsl.task
    +
    +import java.time.Instant
    +import java.util.function.Consumer
    +
    +import com.gs.collections.impl.map.mutable.UnifiedMap
    +import org.apache.gearpump.Message
    +import org.apache.gearpump.cluster.UserConfig
    +import org.apache.gearpump.streaming.Constants.{GEARPUMP_STREAMING_GROUPBY_FUNCTION, GEARPUMP_STREAMING_OPERATOR}
    +import org.apache.gearpump.streaming.dsl.window.impl.WindowRunner
    +import org.apache.gearpump.streaming.task.{Task, TaskContext}
    +
    +/**
    + * Processes messages in groups as defined by groupBy function.
    + */
    +class GroupByTask[IN, GROUP, OUT](
    +    groupBy: IN => GROUP,
    +    taskContext: TaskContext,
    +    userConfig: UserConfig) extends Task(taskContext, userConfig) {
    +
    +  def this(context: TaskContext, conf: UserConfig) = {
    +    this(
    +      conf.getValue[IN => GROUP](GEARPUMP_STREAMING_GROUPBY_FUNCTION)(context.system).get,
    +      context, conf
    +    )
    +  }
    +
    +  private val groups: UnifiedMap[GROUP, WindowRunner[IN, OUT]] =
    +    new UnifiedMap[GROUP, WindowRunner[IN, OUT]]
    +
    +  override def onNext(message: Message): Unit = {
    +    val input = message.value.asInstanceOf[IN]
    +    val group = groupBy(input)
    +
    +    if (!groups.containsKey(group)) {
    +      groups.put(group,
    +        userConfig.getValue[WindowRunner[IN, OUT]](
    +          GEARPUMP_STREAMING_OPERATOR)(taskContext.system).get)
    +    }
    +
    +    groups.get(group).process(input, message.timestamp)
    +  }
    +
    +  override def onWatermarkProgress(watermark: Instant): Unit = {
    +    groups.values.forEach(new Consumer[WindowRunner[IN, OUT]] {
    --- End diff --
    
    It looks like you only create a Consumer here, but will it actually be called?
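
On the question above: assuming `groups.values` is a `java.util.Collection` (as it is for any `java.util.Map`), `forEach` comes from `java.lang.Iterable` and calls `accept` on the supplied `Consumer` once per element before returning, so the anonymous consumer is invoked rather than merely constructed. A minimal sketch of that behavior, using `java.util.HashMap` and a hypothetical `Runner` class as stand-ins for `UnifiedMap` and `WindowRunner` (the `trigger` method and its `Long` argument are illustrative assumptions, not Gearpump's API):

```scala
import java.util.function.Consumer
import java.util.{HashMap => JHashMap}

object ForEachConsumerSketch {

  // Hypothetical stand-in for WindowRunner: it only records the last
  // watermark it was triggered with.
  final class Runner(val name: String) {
    var lastTrigger: Long = -1L
    def trigger(watermark: Long): Unit = lastTrigger = watermark
  }

  // Mirrors the shape of the call in the diff: an anonymous Consumer is
  // passed to forEach, which calls accept once for every value in the map.
  def triggerAll(groups: JHashMap[String, Runner], watermark: Long): Unit = {
    groups.values.forEach(new Consumer[Runner] {
      override def accept(runner: Runner): Unit = runner.trigger(watermark)
    })
  }

  def main(args: Array[String]): Unit = {
    val groups = new JHashMap[String, Runner]
    groups.put("a", new Runner("a"))
    groups.put("b", new Runner("b"))

    triggerAll(groups, 42L)

    // Every runner has seen the watermark, so accept was called for each.
    assert(groups.get("a").lastTrigger == 42L)
    assert(groups.get("b").lastTrigger == 42L)
  }
}
```

The explicit anonymous class compiles on Scala versions that lack SAM conversion for Java functional interfaces; on Scala 2.12 or later a lambda could be passed instead.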


> Don't enforce groupBy after window
> ----------------------------------
>
>                 Key: GEARPUMP-316
>                 URL: https://issues.apache.org/jira/browse/GEARPUMP-316
>             Project: Apache Gearpump
>          Issue Type: Sub-task
>          Components: streaming
>            Reporter: Manu Zhang
>            Assignee: Manu Zhang
>
> Return a normal Stream instead of a WindowStream from the window function. A window function defines a boundary (window) for elements, and the operations that follow should fall within the corresponding boundaries. The boundary should not change until a new window function is defined. If none is defined, the default boundary is {{GlobalWindows}}.
> This means there will be a window context for each underlying task. Elements are emitted according to the trigger semantics.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
