flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "方如" <1624209...@qq.com>
Subject 窗口中的数据无法发送到下游
Date Thu, 27 Feb 2020 06:34:55 GMT
代码如下:
&nbsp; &nbsp; &nbsp; &nbsp; //将json转化为LogBean
&nbsp; &nbsp; &nbsp; SingleOutputStreamOperator<LogBean&gt; data = filter.map(new
Json2LogBean());

&nbsp; &nbsp; &nbsp; KeyedStream<Tuple3<String, String, Integer&gt;,
String&gt; tuple3StringKeyedStream = data.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<LogBean&gt;()
{
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public long extractAscendingTimestamp(LogBean
element) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; LocalDateTime
parse = LocalDateTime.parse(element.getOperTime(), DateTimeFormatter.ofPattern("yyyy-MM-dd
HH:mm:ss"));
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; long
eventTime = parse.toEpochSecond(ZoneOffset.of("+8"));
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println(eventTime);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return
eventTime;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; }).map(new MapFunction<LogBean, Tuple3<String,
String, Integer&gt;&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public Tuple3<String,
String, Integer&gt; map(LogBean value) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //获取用户id做分组
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return
new Tuple3<&gt;(value.getNickname(), value.toString(), 1);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; }).keyBy(new KeySelector<Tuple3<String,
String, Integer&gt;, String&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public String getKey(Tuple3<String,
String, Integer&gt; value) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return
value.f0;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; });


&nbsp; &nbsp; &nbsp; &nbsp; WindowedStream<Tuple3<String, String, Integer&gt;,
String, TimeWindow&gt; window = tuple3StringKeyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)));


&nbsp; &nbsp; &nbsp; &nbsp; window.sum(2).print();

在用sum前用的是reduce,在reduce可以打印出数据,但是reduce之后的结果数据始终没有,打印写文件都是空的


拜谢!代码如下:
&nbsp; &nbsp; &nbsp; &nbsp; //将json转化为LogBean
&nbsp; &nbsp; &nbsp; SingleOutputStreamOperator<LogBean&gt; data = filter.map(new
Json2LogBean());

&nbsp; &nbsp; &nbsp; KeyedStream<Tuple3<String, String, Integer&gt;,
String&gt; tuple3StringKeyedStream = data.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<LogBean&gt;()
{
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public long extractAscendingTimestamp(LogBean
element) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; LocalDateTime
parse = LocalDateTime.parse(element.getOperTime(), DateTimeFormatter.ofPattern("yyyy-MM-dd
HH:mm:ss"));
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; long
eventTime = parse.toEpochSecond(ZoneOffset.of("+8"));
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println(eventTime);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return
eventTime;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; }).map(new MapFunction<LogBean, Tuple3<String,
String, Integer&gt;&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public Tuple3<String,
String, Integer&gt; map(LogBean value) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //获取用户id做分组
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return
new Tuple3<&gt;(value.getNickname(), value.toString(), 1);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; }).keyBy(new KeySelector<Tuple3<String,
String, Integer&gt;, String&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public String getKey(Tuple3<String,
String, Integer&gt; value) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return
value.f0;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; });


&nbsp; &nbsp; &nbsp; &nbsp; WindowedStream<Tuple3<String, String, Integer&gt;,
String, TimeWindow&gt; window = tuple3StringKeyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)));


&nbsp; &nbsp; &nbsp; &nbsp; window.sum(2).print();

在用sum前用的是reduce,在reduce可以打印出数据,但是reduce之后的结果数据始终没有,打印写文件都是空的


拜谢!
Mime
  • Unnamed multipart/alternative (inline, 8-Bit, 0 bytes)
View raw message