flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From 李佟 <lit...@iie.ac.cn>
Subject Flink1.9.1的SQL向前不兼容的问题
Date Thu, 12 Dec 2019 01:55:16 GMT
近期进行Flink升级，将原来的程序从老的集群（1.8.0，运行正常）迁移到新的集群（1.9.1）中。在部署程序的时候发现，在1.9.1的集群中，原来运行正常的Flink SQL程序无法执行，异常如下：




org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)


at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:90)




跟踪到反编译的scala文件并设置断点,发现下图中的红框部分没有执行,直接跳过。




功能很简单,就是在某个时间窗对数值求和。测试用例如下:




package org.flowmatrix.isp.traffic.accounting.test;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.scala.typeutils.Types;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
import org.apache.flink.types.Row;
import org.junit.Test;

import javax.annotation.Nullable;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TestSql {
    @Test
    public void testAccountingSql() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        try {
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            SimpleTableSource source = new SimpleTableSource();
            Table t = tableEnv.fromTableSource(source);

            String interval = "5"; //5 second
            System.out.println("source schema is " + source.getTableSchema());

            Table sqlResult = tableEnv.sqlQuery("SELECT " +
                    " TUMBLE_START(UserActionTime, INTERVAL '" + interval + "' SECOND) as
rowTime, " +
                    " Username," +
                    " SUM(Data) as Data " +
                    " FROM  " + t +
                    " GROUP BY TUMBLE(UserActionTime, INTERVAL '" + interval + "' SECOND),Username");


            String[] fieldNames = {
                    "rowTime",
                    "Username", "Data"};
            TypeInformation[] fieldTypes = {
                    TypeInformation.of(Timestamp.class),
                    TypeInformation.of(String.class),
                    TypeInformation.of(Long.class)};

            TableSink sink1 = new CsvTableSink("/tmp/data.log", ",");
            sink1 = sink1.configure(fieldNames, fieldTypes);
            tableEnv.registerTableSink("EsSinkTable", sink1);
            System.out.println("sql result schema is " + sqlResult.getSchema());

            tableEnv.sqlUpdate("insert into EsSinkTable select  " +
                    "rowTime,Username,Data from " + sqlResult + "");

            env.execute("test");
        } catch (Exception e) {
            e.printStackTrace();
            System.err.println("start program error. FlowMatrix --zookeeper <zookeeperAdress>
--config <configpath>" +
                    " --name <jobName> --interval <intervalInMinute> --indexName
<indexName>");
            System.err.println(e.toString());
            return;
        }
    }

    public static class SimpleTableSource implements StreamTableSource<Row>, DefinedRowtimeAttributes
{
        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
            return env.fromCollection(genertateData()).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Row>()
{
                private long lastWaterMarkMillSecond = -1;
                private long waterMarkPeriodMillSecond = 1000;
                @Nullable
                @Override
                public Watermark checkAndGetNextWatermark(Row lastElement, long extractedTimestamp)
{
                    if(extractedTimestamp - lastWaterMarkMillSecond >= waterMarkPeriodMillSecond){
                        lastWaterMarkMillSecond = extractedTimestamp;
                        return new Watermark(extractedTimestamp);
                    }
                    return null;
                }

                @Override
                public long extractTimestamp(Row element, long previousElementTimestamp) {
                    return ((Long)element.getField(0))*1000;
                }
            });
        }

        @Override
        public TableSchema getTableSchema() {
            TableSchema schema = TableSchema.builder()
                    .field("Username", Types.STRING())
                    .field("Data", Types.LONG())
                    .field("UserActionTime", Types.SQL_TIMESTAMP())
                    .build();
            return schema;
        }

        @Override
        public TypeInformation<Row> getReturnType() {
            String[] names = new String[]{"Username", "Data", "UserActionTime"};
            TypeInformation[] types =
                    new TypeInformation[]{Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()};
            return Types.ROW(names, types);
        }


        @Override
        public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
            RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
                    "UserActionTime",
                    new ExistingField("UserActionTime"),
                    new AscendingTimestamps());
            List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
            return listRowtimeAttrDescr;
        }


        private static List<Row> genertateData() {
            List<Row> rows = new ArrayList<>();
            long startTime = System.currentTimeMillis() / 1000 - 10000;
            for (int i = 0; i < 10000; i++) {
                rows.add(buildRecord(startTime, i));
            }
            return rows;
        }

        private static Row buildRecord(long startTime, int i) {
            Row row = new Row(3);
            row.setField(0, "fox"); //Username
            row.setField(1, Math.random()); //Data
            row.setField(2, startTime + i); //UserActionTime
            return row;
        }
    }
}





Mime
  • Unnamed multipart/related (inline, None, 0 bytes)
View raw message