flink-user-zh mailing list archives

From Leonard Xu <xbjt...@gmail.com>
Subject Re: ddl es error
Date Tue, 24 Mar 2020 09:53:25 GMT
Hi, 出发

It looks like you are missing the dependency for the ES connector [1], so only the built-in filesystem
connector can be found. The built-in filesystem connector currently only supports the csv format, which is why you get this error.
Add the missing dependency to your project; if you are using the SQL CLI, you also need to put the dependency
jars into Flink's lib directory. (See the sketches after the dependency snippet below.)

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-elasticsearch6_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
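
For context, the "matching candidates" part of the error comes from Flink's table factory discovery: without the ES connector jar, the only factory on the classpath that can serve this table is the CSV/filesystem one, so the requested properties cannot be matched. A minimal sketch of that lookup, assuming the Flink 1.10 TableFactoryService API (property map abbreviated):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSourceFactory;

public class FactoryLookupSketch {

    public static void main(String[] args) {
        // The same properties the DDL is translated into (abbreviated here).
        Map<String, String> props = new HashMap<>();
        props.put("connector.type", "elasticsearch");
        props.put("connector.version", "6");
        props.put("format.type", "json");
        props.put("update-mode", "append");
        // ... schema.* and the remaining connector.* properties ...

        // Without the ES connector jar on the classpath this throws
        // NoMatchingTableFactoryException and lists
        // CsvAppendTableSourceFactory as the only candidate.
        TableFactoryService.find(TableSourceFactory.class, props);
    }
}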
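
After adding the dependency (or copying the jar into lib), you can sanity-check that the connector is actually on the classpath before submitting the job. A minimal sketch, assuming the factory class name shipped with the ES 6 connector in Flink 1.10 (the name is an assumption and may differ across versions):

public class EsConnectorCheck {

    public static void main(String[] args) {
        // Assumed factory class from flink-connector-elasticsearch6 (Flink 1.10);
        // adjust the name if your version packages it differently.
        String factory =
            "org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6UpsertTableSinkFactory";
        try {
            Class.forName(factory);
            System.out.println("ES connector found on the classpath.");
        } catch (ClassNotFoundException e) {
            System.out.println("ES connector missing: add the dependency or copy the jar into flink/lib.");
        }
    }
}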

Best,
Leonard
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector


> On Mar 23, 2020, at 23:30, 出发 <573693104@qq.com> wrote:
> 
> 
> The source code is as follows:
> CREATE TABLE buy_cnt_per_hour ( 
>     hour_of_day BIGINT,
>     buy_cnt BIGINT
> ) WITH (
>     'connector.type' = 'elasticsearch',
>     'connector.version' = '6',
>     'connector.hosts' = 'http://localhost:9200',
>     'connector.index' = 'buy_cnt_per_hour',
>     'connector.document-type' = 'user_behavior',
>     'connector.bulk-flush.max-actions' = '1',
>     'format.type' = 'json',
>     'update-mode' = 'append'
> )
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
> 
> public class ESTest {
> 
>     public static void main(String[] args) throws Exception {
> 
>         // 2. Set up the execution environment
>         StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>         EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
>         streamEnv.setParallelism(1);
>         String sinkDDL = " CREATE TABLE test_es ( hour_of_day BIGINT,  buy_cnt BIGINT "
>                 + ") WITH ( 'connector.type' = 'elasticsearch',  'connector.version' = '6',"
>                 + "    'connector.hosts' = 'http://localhost:9200',  'connector.index' = 'buy_cnt_per_hour',"
>                 + "    'connector.document-type' = 'user_behavior',"
>                 + "    'connector.bulk-flush.max-actions' = '1',\n" + "    'format.type' = 'json',"
>                 + "    'update-mode' = 'append' )";
>         tableEnv.sqlUpdate(sinkDDL);
>         Table table = tableEnv.sqlQuery("select * from test_es ");
>         tableEnv.toRetractStream(table, Row.class).print();
>         streamEnv.execute("");
>     }
> 
> }
> The exact error:
> The matching candidates:
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> Mismatched properties:
> 'connector.type' expects 'filesystem', but is 'elasticsearch'
> 'format.type' expects 'csv', but is 'json'
> 
> The following properties are requested:
> connector.bulk-flush.max-actions=1
> connector.document-type=user_behavior
> connector.hosts=http://localhost:9200
> connector.index=buy_cnt_per_hour
> connector.type=elasticsearch
> connector.version=6
> format.type=json
> schema.0.data-type=BIGINT
> schema.0.name=hour_of_day
> schema.1.data-type=BIGINT
> schema.1.name=buy_cnt
> update-mode=append

