flink-user-zh mailing list archives

From "Yuan Yifan" <tsingjyuj...@163.com>
Subject Re:Re: In Flink WordCount, when and where is sum called?
Date Thu, 28 Feb 2019 06:46:17 GMT
If you only want to know where the SUM is computed, look at the reduce function of this class:
org.apache.flink.streaming.api.functions.aggregation.SumAggregator
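
To make the idea concrete, here is a minimal plain-Scala sketch of a sum expressed as a reduce over records of one key. It is not Flink's SumAggregator code. The names IncrementalSumSketch and accumulate are hypothetical, and WordWithCount is only a stand-in for the case class in the quoted job.

```scala
// Hypothetical stand-in for the job's WordWithCount; not Flink code.
case class WordWithCount(word: String, count: Long)

object IncrementalSumSketch {
  // Same shape as a ReduceFunction: combine two values of one key into one.
  def reduce(a: WordWithCount, b: WordWithCount): WordWithCount =
    WordWithCount(a.word, a.count + b.count)

  // Fold each arriving record into the per-key accumulator immediately,
  // so only one running value per key is kept.
  def accumulate(records: Seq[WordWithCount]): Map[String, WordWithCount] =
    records.foldLeft(Map.empty[String, WordWithCount]) { (state, r) =>
      val updated = state.get(r.word).map(reduce(_, r)).getOrElse(r)
      state.updated(r.word, updated)
    }

  def main(args: Array[String]): Unit = {
    val input = Seq("a", "b", "a", "a").map(WordWithCount(_, 1))
    accumulate(input).values.foreach(println)
  }
}
```

A breakpoint inside a reduce function of this shape would be hit once per incoming record after the first one for each key, which is why the sum does not appear as a single step at the end of the window.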


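But if you want to know when the SUM result is emitted, that is much more complicated, because a Trigger may be attached afterwards, and FireAndPurge may also be invoked "manually" when some condition is met, so the result is not necessarily emitted when the window time ends.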
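
The separation between updating the sum and emitting it can be sketched in plain Scala as follows. This is a simplified model, not Flink's Trigger API: the Decision type, the SumWindow class, and the "fire when the sum reaches 3" rule are all assumptions made up for illustration.

```scala
object TriggerSketch {
  // Hypothetical, simplified trigger outcomes; Flink's real trigger result type has more cases.
  sealed trait Decision
  case object Continue extends Decision
  case object FireAndPurge extends Decision

  // One keyed window holding a single running sum.
  final class SumWindow(trigger: Long => Decision) {
    private var acc: Option[Long] = None

    // Called per record: the sum is always updated, but a value is
    // returned (emitted) only when the trigger decides to fire.
    def onElement(value: Long): Option[Long] = {
      val sum = acc.getOrElse(0L) + value
      trigger(sum) match {
        case Continue     => acc = Some(sum); None
        case FireAndPurge => acc = None; Some(sum) // emit, then clear the window state
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Assumed rule for illustration: fire as soon as the running sum reaches 3.
    val window = new SumWindow(sum => if (sum >= 3) FireAndPurge else Continue)
    val emitted = Seq(1L, 1L, 1L, 1L).flatMap(v => window.onElement(v))
    println(emitted)
  }
}
```

In this model the emission point depends entirely on the function passed as the trigger, which mirrors the point above: with a custom trigger, output can happen before or independently of the window's end time.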


For details, see the code here:
org.apache.flink.streaming.api.datastream.WindowedStream#reduce(org.apache.flink.api.common.functions.ReduceFunction<T>,
org.apache.flink.streaming.api.functions.windowing.WindowFunction<T,R,K,W>, org.apache.flink.api.common.typeinfo.TypeInformation<R>)






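At 2019-02-28 14:34:40, " " <thinktothings@yahoo.com.INVALID> wrote:
> This is a local Flink WordCount program; the code is below. I tried stepping through it but could not find where the sum is computed when the window's time Trigger fires. That is the point I want to focus on. Could you explain it in detail?
> -------------------------------------
>package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc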
>
>import org.apache.flink.configuration.Configuration
>import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>import org.apache.flink.streaming.api.windowing.time.Time
>
>/**
>  * Feed input data with: nc -lk 1234
>  */
>object SocketWindowWordCountLocal {
>
>  def main(args: Array[String]): Unit = {
>
>
>    val port = 1234
>    // get the execution environment
>   // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
>
>
>    val configuration : Configuration = new Configuration()
>    val timeout = "100000 s"
>    val timeoutHeartbeatPause = "1000000 s"
>    configuration.setString("akka.ask.timeout",timeout)
>    configuration.setString("akka.lookup.timeout",timeout)
>    configuration.setString("akka.tcp.timeout",timeout)
>    configuration.setString("akka.transport.heartbeat.interval",timeout)
>    configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
>    configuration.setString("akka.watch.heartbeat.pause",timeout)
>    configuration.setInteger("heartbeat.interval",10000000)
>    configuration.setInteger("heartbeat.timeout",50000000)
>    val env:StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1,configuration)
>
>
>
>
>
>    // get input data by connecting to the socket
>    val dataStream = env.socketTextStream("localhost", port, '\n')
>
>
>
>    import org.apache.flink.streaming.api.scala._
>    val textResult = dataStream.flatMap( w => w.split("\\s") ).map( w => WordWithCount(w,1))
>      .keyBy("word")
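>      /**
>        * Refresh every 20 seconds, which effectively restarts the count.
>        * Advantage: there is no need to aggregate all of the data every time;
>        * only the incremental data within the given interval is needed, which reduces the data volume.
>        */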
>      .timeWindow(Time.seconds(20))
>      //.countWindow(3)
>      //.countWindow(3,1)
>      //.countWindowAll(3)
>
>
>      .sum("count" )
>
>    textResult.print().setParallelism(1)
>
>
>
>    if(args == null || args.size ==0){
>      env.execute("default job")
>
>      // execution plan
>      //println(env.getExecutionPlan)
>      //StreamGraph
>     //println(env.getStreamGraph.getStreamingPlanAsJSON)
>
>
>
>      //JsonPlanGenerator.generatePlan(jobGraph)
>
>    }else{
>      env.execute(args(0))
>    }
>
>    println("done")
>
>  }
>
>
>  // Data type for words with count
>  case class WordWithCount(word: String, count: Long)
>
>}
>
>
>    On Thursday, 28 February 2019, 2:08:00 pm GMT+8, Yaoting Gong <fall.for.you.ffm@gmail.com>
wrote:  
> 
> @Yuan Yifan
>
>*Images cannot be attached to this list.*
>
>On Thu, Feb 28, 2019 at 2:03 PM Yuan Yifan <tsingjyujing@163.com> wrote:
>
>>
>> You are probably referring to the code here:
>>
>>
>> http://flink-cn.shinonomelab.com/quickstart/setup_quickstart.html#read-the-code
>>
>> In fact, SUM should be called as each record arrives, but the result is only output at the final FireAndPurge.
>>
>> Essentially, sum executes an Aggregate of type Sum.
>> Its AggregateFunction is:
>>
>> org.apache.flink.streaming.api.functions.aggregation.SumAggregator#SumAggregator(int,
>> org.apache.flink.api.common.typeinfo.TypeInformation<T>,
>> org.apache.flink.api.common.ExecutionConfig)
>>
>>
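>> which implements the reduce method.
>>
>> So you do not need to care exactly when the computation happens. It may be computed in several places and then merged, but in any case the properties of the Reduce computation guarantee that the result is correct.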
>>
>>
>>
>> At 2019-02-28 13:04:59, " " <thinktothings@yahoo.com.INVALID> wrote:
>> >Question: in Flink WordCount, when and where is sum called?
>>
>>
>>
>>
>>
>  