flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "mayangyang02"<mayangyan...@imdada.cn>
Subject Re: flink wordcount中 sum是在什么时候,哪个地方调用的?
Date Thu, 28 Feb 2019 08:41:27 GMT
@thinktothings
这个sum其实是维护在WindowOperator的state里的。
你看下WindowedStream#sum()会发现最终会调用到WindowedStream#reduce()。
而对reduce来说,这个state的实现类是HeapReducingState,对这个state来说,调用其add方法是就会实现聚合(对sum来说就是相加)。在window
emit时,就会将这个维护的state发送出去。




原始邮件
发件人:Yaoting Gongfall.for.you.ffm@gmail.com
收件人:user-zhuser-zh@flink.apache.org
发送时间:2019年2月28日(周四) 15:29
主题:Re: flink wordcount中 sum是在什么时候,哪个地方调用的?


@thinktothings 不知道是否我理解正确,我觉得你可以对flink的一些计算流程不是很清楚。
SumAggregator内的reduce 方法就可以计算出需要的“sum”结果。 你一直问什么时候调用sum,是指代码中的“sum("count")”吗?这个在构建steamgraph的时候已经调用了,目的就是获取内部返回的SumAggregator对象。
有个文章可以了解下。看下第一次即可:https://www.jianshu.com/p/13070729289c
如果我理解的不对,请忽略 On Thu, Feb 28, 2019 at 2:57 PM thinktothings@yahoo.com.invalid
wrote:  --------------------------------------   ).本地环境: scala WordCount ,程序在附件中
SocketWindowWordCountLocal.scala  ).输入数据:  a b a  ).设置的 timeWindow(Time.seconds(20))
 ).[问题]想调试Flink源码中具体在哪一步进行sum操作  -------------------------------------------------
  调试:  ).RecordWriter.emit(),这个时候,数据是已经flatMap,map之后的值,
 函数中会一条一条数据发送(a,1),(b,1),(a,1)  ).调用 StreamSink.processElement
函数打印输出结果  ).没明白地方,是在调用StreamSink.processElement之前,在哪个地方调用了sum,对相同key进行了聚合操作
   On Thursday, 28 February 2019, 2:47:39 pm GMT+8, Yaoting Gong   fall.for.you.ffm@gmail.com
wrote:    你好。  我不知道你的是什么项目的代码。我从flink 官方的样例代码
SocketWindowWordCount.scala找到。  从sum跟进去,最终能找到一个SumAggregator。
  On Thu, Feb 28, 2019 at 2:34 PM thinktothings@yahoo.com.invalid wrote:    Flink wordCount
   本地程序,代码如下,想调下代码,没找到Window,的时间Trigger结束时,在哪个地方进行的sum,统计结果,我想关注这个点的问题,请问能详细的说明下吗?-------------------------------------package
  com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc     import org.apache.flink.configuration.Configuration
  import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment   import org.apache.flink.streaming.api.windowing.time.Time
    /**   * nc -lk 1234 输入数据   */   object SocketWindowWordCountLocal {     def main(args:
Array[String]): Unit = {       val port = 1234   // get the execution environment   // val
env: StreamExecutionEnvironment =   StreamExecutionEnvironment.getExecutionEnvironment   
   val configuration : Configuration = new Configuration()   val timeout = "100000 s"   val
timeoutHeartbeatPause = "1000000 s"   configuration.setString("akka.ask.timeout",timeout)
  configuration.setString("akka.lookup.timeout",timeout)   configuration.setString("akka.tcp.timeout",timeout)
  configuration.setString("akka.transport.heartbeat.interval",timeout)      configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
  configuration.setString("akka.watch.heartbeat.pause",timeout)   configuration.setInteger("heartbeat.interval",10000000)
  configuration.setInteger("heartbeat.timeout",50000000)   val env:StreamExecutionEnvironment
=   StreamExecutionEnvironment.createLocalEnvironment(1,configuration)             // get
input data by connecting to the socket   val dataStream = env.socketTextStream("localhost",
port, '\n')         import org.apache.flink.streaming.api.scala._   val textResult = dataStream.flatMap(
w = w.split("\\s") ).map( w =   WordWithCount(w,1))   .keyBy("word")   /**   * 每20秒刷新一次,相当于重新开始计数,
  * 好处,不需要一直拿所有的数据统计   * 只需要在指定时间间隔内的增量数据,减少了数据规模
  */   .timeWindow(Time.seconds(20))   //.countWindow(3)   //.countWindow(3,1)   //.countWindowAll(3)
      .sum("count" )     textResult.print().setParallelism(1)         if(args == null || args.size
==0){   env.execute("默认作业")     //执行计划   //println(env.getExecutionPlan) 
 //StreamGraph   //println(env.getStreamGraph.getStreamingPlanAsJSON)         //JsonPlanGenerator.generatePlan(jobGraph)
    }else{   env.execute(args(0))   }     println("结束")     }       // Data type for words
with count   case class WordWithCount(word: String, count: Long)     }       On Thursday,
28 February 2019, 2:08:00 pm GMT+8, Yaoting Gong    fall.for.you.ffm@gmail.com wrote:    
@Yuan Yifan     *不能贴图的。*     On Thu, Feb 28, 2019 at 2:03 PM Yuan Yifan tsingjyujing@163.com
wrote:         你说的应该是这里的代码:             http://flink-cn.shinonomelab.com/quickstart/setup_quickstart.html#read-the-code
      其实SUM应该会在每一条数据来的时候调用的,但是输出结果只有在最后FireAndPurge的时候。
      本质上,sum是执行了一个Sum类型的Aggregate:    其AggregateFunction是:
         org.apache.flink.streaming.api.functions.aggregation.SumAggregator#SumAggregator(int,
   org.apache.flink.api.common.typeinfo.TypeInformationT,    org.apache.flink.api.common.ExecutionConfig)
         其中实现了reduce方法:       所以你可以不必关心究竟是在何时计算的,有可能在多个地方计算以后再合并,但是如论如何,Reduce计算的性质保证,结果一定是对的。
            在 2019-02-28 13:04:59," " thinktothings@yahoo.com.INVALID 写道:    请问:
flink wordcount中 sum是在什么时候,哪个地方调用的?
Mime
  • Unnamed multipart/alternative (inline, 8-Bit, 0 bytes)
View raw message