flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From   <thinktothi...@yahoo.com.INVALID>
Subject Re: flink wordcount中 sum是在什么时候,哪个地方调用的?
Date Thu, 28 Feb 2019 09:31:42 GMT
  我明白了,感谢大家).RecordWriter.emit(),这个时候,数据是已经flatMap,map之后的值,
函数中会一条一条数据发送(a,1),(a,1),(a,1)).WindowOperator.processElement()函数中,收到数据后,调用
windowState.add(element.value), 其实调的是 HeapReducingState.add()函数,这个state值在WindowOperator.windowState.stateTable.primaryTable.state
这个里边存着(key,value)  HeapReducingState.add()中调用transform,最终调用ReduceTransformation.apply,该函数会调用reduce函数,在同一次window中,每来一个相同key,就更新一次,实现累加 
     public V apply(V previousState, V value) throws Exception { return previousState !=
null ? reduceFunction.reduce(previousState, value) : value; }

    On Thursday, 28 February 2019, 4:42:00 pm GMT+8, mayangyang02 <mayangyang02@imdada.cn>
wrote:  
 
 @thinktothings
这个sum其实是维护在WindowOperator的state里的。
你看下WindowedStream#sum()会发现最终会调用到WindowedStream#reduce()。
而对reduce来说,这个state的实现类是HeapReducingState,对这个state来说,调用其add方法是就会实现聚合(对sum来说就是相加)。在window
emit时,就会将这个维护的state发送出去。




原始邮件
发件人:Yaoting Gongfall.for.you.ffm@gmail.com
收件人:user-zhuser-zh@flink.apache.org
发送时间:2019年2月28日(周四) 15:29
主题:Re: flink wordcount中 sum是在什么时候,哪个地方调用的?


@thinktothings 不知道是否我理解正确,我觉得你可以对flink的一些计算流程不是很清楚。
SumAggregator内的reduce 方法就可以计算出需要的“sum”结果。 你一直问什么时候调用sum,是指代码中的“sum("count")”吗?这个在构建steamgraph的时候已经调用了,目的就是获取内部返回的SumAggregator对象。
有个文章可以了解下。看下第一次即可:https://www.jianshu.com/p/13070729289c
如果我理解的不对,请忽略 On Thu, Feb 28, 2019 at 2:57 PM thinktothings@yahoo.com.invalid
wrote:  --------------------------------------  ).本地环境: scala WordCount ,程序在附件中
SocketWindowWordCountLocal.scala  ).输入数据:  a b a  ).设置的 timeWindow(Time.seconds(20)) 
).[问题]想调试Flink源码中具体在哪一步进行sum操作  ------------------------------------------------- 
调试:  ).RecordWriter.emit(),这个时候,数据是已经flatMap,map之后的值, 
函数中会一条一条数据发送(a,1),(b,1),(a,1)  ).调用 StreamSink.processElement
函数打印输出结果  ).没明白地方,是在调用StreamSink.processElement之前,在哪个地方调用了sum,对相同key进行了聚合操作 
  On Thursday, 28 February 2019, 2:47:39 pm GMT+8, Yaoting Gong  fall.for.you.ffm@gmail.com
wrote:    你好。  我不知道你的是什么项目的代码。我从flink 官方的样例代码
SocketWindowWordCount.scala找到。  从sum跟进去,最终能找到一个SumAggregator。 
On Thu, Feb 28, 2019 at 2:34 PM thinktothings@yahoo.com.invalid wrote:    Flink wordCount 
  本地程序,代码如下,想调下代码,没找到Window,的时间Trigger结束时,在哪个地方进行的sum,统计结果,我想关注这个点的问题,请问能详细的说明下吗?-------------------------------------package 
com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc    import org.apache.flink.configuration.Configuration 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment  import org.apache.flink.streaming.api.windowing.time.Time 
  /**  * nc -lk 1234 输入数据  */  object SocketWindowWordCountLocal {    def main(args:
Array[String]): Unit = {      val port = 1234  // get the execution environment  // val
env: StreamExecutionEnvironment =  StreamExecutionEnvironment.getExecutionEnvironment   
  val configuration : Configuration = new Configuration()  val timeout = "100000 s"  val
timeoutHeartbeatPause = "1000000 s"  configuration.setString("akka.ask.timeout",timeout) 
configuration.setString("akka.lookup.timeout",timeout)  configuration.setString("akka.tcp.timeout",timeout) 
configuration.setString("akka.transport.heartbeat.interval",timeout)      configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause) 
configuration.setString("akka.watch.heartbeat.pause",timeout)  configuration.setInteger("heartbeat.interval",10000000) 
configuration.setInteger("heartbeat.timeout",50000000)  val env:StreamExecutionEnvironment
=  StreamExecutionEnvironment.createLocalEnvironment(1,configuration)            //
get input data by connecting to the socket  val dataStream = env.socketTextStream("localhost",
port, '\n')        import org.apache.flink.streaming.api.scala._  val textResult = dataStream.flatMap(
w = w.split("\\s") ).map( w =  WordWithCount(w,1))  .keyBy("word")  /**  * 每20秒刷新一次,相当于重新开始计数, 
* 好处,不需要一直拿所有的数据统计  * 只需要在指定时间间隔内的增量数据,减少了数据规模 
*/  .timeWindow(Time.seconds(20))  //.countWindow(3)  //.countWindow(3,1)  //.countWindowAll(3) 
    .sum("count" )    textResult.print().setParallelism(1)        if(args == null
|| args.size ==0){  env.execute("默认作业")    //执行计划  //println(env.getExecutionPlan) 
//StreamGraph  //println(env.getStreamGraph.getStreamingPlanAsJSON)        //JsonPlanGenerator.generatePlan(jobGraph) 
  }else{  env.execute(args(0))  }    println("结束")    }      // Data type for
words with count  case class WordWithCount(word: String, count: Long)    }      On Thursday,
28 February 2019, 2:08:00 pm GMT+8, Yaoting Gong    fall.for.you.ffm@gmail.com wrote: 
  @Yuan Yifan    *不能贴图的。*    On Thu, Feb 28, 2019 at 2:03 PM Yuan Yifan tsingjyujing@163.com
wrote:        你说的应该是这里的代码:            http://flink-cn.shinonomelab.com/quickstart/setup_quickstart.html#read-the-code
      其实SUM应该会在每一条数据来的时候调用的,但是输出结果只有在最后FireAndPurge的时候。 
    本质上,sum是执行了一个Sum类型的Aggregate:    其AggregateFunction是: 
        org.apache.flink.streaming.api.functions.aggregation.SumAggregator#SumAggregator(int, 
  org.apache.flink.api.common.typeinfo.TypeInformationT,    org.apache.flink.api.common.ExecutionConfig) 
        其中实现了reduce方法:      所以你可以不必关心究竟是在何时计算的,有可能在多个地方计算以后再合并,但是如论如何,Reduce计算的性质保证,结果一定是对的。 
          在 2019-02-28 13:04:59," " thinktothings@yahoo.com.INVALID 写道:   
请问: flink wordcount中 sum是在什么时候,哪个地方调用的?  
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message