From user-zh-return-2585-archive-asf-public=cust-asf.ponee.io@flink.apache.org Wed Mar 25 05:52:09 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 6A02F18063D for ; Wed, 25 Mar 2020 06:52:09 +0100 (CET) Received: (qmail 84283 invoked by uid 500); 25 Mar 2020 05:52:08 -0000 Mailing-List: contact user-zh-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: user-zh@flink.apache.org Delivered-To: mailing list user-zh@flink.apache.org Received: (qmail 84271 invoked by uid 99); 25 Mar 2020 05:52:08 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 25 Mar 2020 05:52:08 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 89B6BC1DDE for ; Wed, 25 Mar 2020 05:52:07 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -0.199 X-Spam-Level: X-Spam-Status: No, score=-0.199 tagged_above=-999 required=6.31 tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, DKIM_VALID_AU=-0.1, DKIM_VALID_EF=-0.1, RCVD_IN_DNSWL_NONE=-0.0001, SPF_HELO_NONE=0.001, SPF_PASS=-0.001, URIBL_BLOCKED=0.001] autolearn=disabled Authentication-Results: spamd4-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=gmail.com Received: from mx1-he-de.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id 4HF0K7wYoePP for ; Wed, 25 Mar 2020 05:52:05 +0000 (UTC) Received-SPF: Pass (mailfrom) identity=mailfrom; client-ip=2607:f8b0:4864:20::535; helo=mail-pg1-x535.google.com; envelope-from=jinhai.me@gmail.com; receiver= Received: from mail-pg1-x535.google.com (mail-pg1-x535.google.com [IPv6:2607:f8b0:4864:20::535]) by mx1-he-de.apache.org (ASF Mail Server at mx1-he-de.apache.org) with ESMTPS id 95B787F6A7 for ; Wed, 25 Mar 2020 05:52:04 +0000 (UTC) Received: by mail-pg1-x535.google.com with SMTP id 7so640107pgr.2 for ; Tue, 24 Mar 2020 22:52:04 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=mime-version:subject:from:in-reply-to:date:cc :content-transfer-encoding:message-id:references:to; bh=S0levIIgCVL3eJy9sh5n4DENTQ8DJL/4khwgIdUmzME=; b=FKoMJrWNCSDZecBq8WqtvrKVJKrKRBFix95+i3XVsRgbwYD+Z+xSh6SdZzRPF4eqPF Gtx7boP/yByWpx5SVn0RcZZfP8POoAYrbgwF0NEz8axd/BZBhtz3Cc7vPmfzBvJqdGfp rXLXRY3JsEt//CoFTRu5VGODmHZJh/xAXPmzdDOFKEL/fSjP9qGmKUF0+Q+VTyp8pc+p 8ig0UhBOeiP+A83RBGOSwxUXTcK3nxPat1eDc+xz6clrWlDWAwp3l94hzMUbB/qn5zZv P9vGXdGu+XLnVBL4RJp+jhZ+XEmun5tPhQVZvi31yUlpBDPKx3br0m5QbwF9Ns2NfmWs us3Q== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:mime-version:subject:from:in-reply-to:date:cc :content-transfer-encoding:message-id:references:to; bh=S0levIIgCVL3eJy9sh5n4DENTQ8DJL/4khwgIdUmzME=; b=tAYRy0soG+wh/AIeHLvyiwfVbO0VkieQ7PdrHWxwPCZe1Wm82R+0eyIuM727pv0Xrm kyysEBEqbe5ufBftdz5e4EndaTKdy3FHpyWoHnvsRJws48M98k5NmJNDWOnLXHBlz6qF ORDg3Vfxp+GwGHkS3etjS6WwgwptgRvU4Uu0Ao87qXG9DSUxf8oXxCWRnpJ3LdNgfogh SfjM/ZSLDTqfWavaulmD6oclSo+K+OSEiMEEnKm8Cgnj7wlDfa+oJYGKeWlHc/IRptNl eV8MPzUMUR3SBEHLAr+seGXrIyU0aZ7RmQx1vedGTWjLF89NG4s5rwQRvfUpiksnBey/ ijWw== X-Gm-Message-State: ANhLgQ3y+MSk1KJSGHEX9Wi7P97R+f9CB36CGF94PcXIxgcBlKqLlgSK SGJcczOcSxCli/Qwzv5T6G+sXG66HwI= X-Google-Smtp-Source: ADFU+vtQozf1tqR+bEfwok3+InfuKLYjpqnEpmJ7MxHZyFl7IrZE+TbIxgQf3jG6Ee3ghZBjgklj2w== X-Received: by 2002:a63:8048:: with SMTP id j69mr1547017pgd.410.1585115522871; Tue, 24 Mar 2020 22:52:02 -0700 (PDT) Received: from [127.0.0.1] ([47.75.173.14]) by smtp.gmail.com with ESMTPSA id c19sm127853pfo.205.2020.03.24.22.52.01 (version=TLS1_2 cipher=ECDHE-RSA-AES128-GCM-SHA256 bits=128/128); Tue, 24 Mar 2020 22:52:02 -0700 (PDT) Content-Type: text/plain; charset=utf-8 Mime-Version: 1.0 (Mac OS X Mail 13.0 \(3608.60.0.2.5\)) Subject: =?utf-8?B?UmU6IGRkbCBlcyDmiqXplJk=?= From: jinhai wang In-Reply-To: Date: Wed, 25 Mar 2020 13:51:58 +0800 Cc: zhisheng2018@gmail.com Content-Transfer-Encoding: quoted-printable Message-Id: References: <959EB0A9-6E37-474A-85CF-3D8E01CB4297@gmail.com> To: user-zh@flink.apache.org X-Mailer: Apple Mail (2.3608.60.0.2.5) =E4=BC=98=E7=A7=80=EF=BC=81=E5=8F=AF=E4=BB=A5=E6=8F=90=E4=B8=AAimprove = issue Best Regards jinhai.me@gmail.com > 2020=E5=B9=B43=E6=9C=8825=E6=97=A5 =E4=B8=8B=E5=8D=881:40=EF=BC=8Czhishe= ng =E5=86=99=E9=81=93=EF=BC=9A >=20 > hi=EF=BC=8CLeonar Xu >=20 > =E5=AE=98=E6=96=B9 ES DDL =E7=8E=B0=E5=9C=A8=E4=B8=8D=E6=94=AF=E6=8C=81=E5= =A1=AB=E5=86=99 ES = =E9=9B=86=E7=BE=A4=E7=9A=84=E7=94=A8=E6=88=B7=E5=90=8D=E5=92=8C=E5=AF=86=E7= =A0=81=EF=BC=8C=E6=88=91=E7=8E=B0=E5=9C=A8=E5=9C=A8=E5=85=AC=E5=8F=B8=E5=B7= =B2=E7=BB=8F=E5=81=9A=E4=BA=86=E6=89=A9=E5=B1=95=EF=BC=8C=E5=8A=A0=E4=BA=86= =E8=BF=99=E4=B8=AA=E5=8A=9F=E8=83=BD=EF=BC=8C=E8=AF=B7=E9=97=AE=E7=A4=BE=E5= =8C=BA=E6=98=AF=E5=90=A6=E9=9C=80=E8=A6=81=E8=BF=99=E4=B8=AA=E5=8A=9F=E8=83= =BD=EF=BC=9F=E6=88=91=E8=AF=A5=E6=80=8E=E4=B9=88=E8=B4=A1=E7=8C=AE=E5=91=A2= =EF=BC=9F >=20 > = =E6=95=88=E6=9E=9C=E5=A6=82=E5=9B=BE=EF=BC=9Ahttp://zhisheng-blog.oss-cn-h= angzhou.aliyuncs.com/2020-03-25-053948.png >=20 > Best Wishes=EF=BC=81 >=20 > zhisheng >=20 > Leonard Xu =E4=BA=8E2020=E5=B9=B43=E6=9C=8824=E6=97=A5= =E5=91=A8=E4=BA=8C =E4=B8=8B=E5=8D=885:53=E5=86=99=E9=81=93=EF=BC=9A >=20 >> Hi, =E5=87=BA=E5=8F=91 >> =E7=9C=8B=E8=B5=B7=E6=9D=A5=E6=98=AF=E4=BD=A0=E7=BC=BA=E5=B0=91=E4=BA=86= =E4=BE=9D=E8=B5=96es = connector=E7=9A=84=E4=BE=9D=E8=B5=96[1]=EF=BC=8C=E6=89=80=E4=BB=A5=E5=8F=AA= =E8=83=BD=E6=89=BE=E5=88=B0=E5=86=85=E7=BD=AE=E7=9A=84filesystem = connector=EF=BC=8C=E7=9B=AE=E5=89=8D=E5=86=85=E7=BD=AE=E7=9A=84filesystem >> connector=E5=8F=AA=E6=94=AF=E6=8C=81csv = format=EF=BC=8C=E6=89=80=E4=BB=A5=E4=BC=9A=E6=9C=89=E8=BF=99=E4=B8=AA=E9=94= =99=E8=AF=AF=E3=80=82 >> =E5=9C=A8=E9=A1=B9=E7=9B=AE=E4=B8=AD=E5=8A=A0=E4=B8=8A=E7=BC=BA=E5=A4=B1= =E7=9A=84=E4=BE=9D=E8=B5=96=E5=8D=B3=E5=8F=AF=EF=BC=8C=E5=A6=82=E6=9E=9C=E4= =BD=BF=E7=94=A8SQL CLI=EF=BC=8C=E4=B9=9F=E9=9C=80=E8=A6=81=E5=B0=86=E4=BE=9D= =E8=B5=96=E7=9A=84jar=E5=8C=85=E6=94=BE=E5=88=B0 flink=E7=9A=84lib=E7=9B=AE= =E5=BD=95=E4=B8=8B=E3=80=82 >>=20 >> >> org.apache.flink >> flink-sql-connector-elasticsearch6_2.11 >> ${flink.version} >> >> >> org.apache.flink >> flink-json >> ${flink.version} >> >>=20 >> Best, >> Leonard >> [1] >> = https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.h= tml#elasticsearch-connector >> < >> = https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.h= tml#elasticsearch-connector >>>=20 >>=20 >>=20 >>> =E5=9C=A8 2020=E5=B9=B43=E6=9C=8823=E6=97=A5=EF=BC=8C23:30=EF=BC=8C=E5= =87=BA=E5=8F=91 <573693104@qq.com> =E5=86=99=E9=81=93=EF=BC=9A >>>=20 >>>=20 >>> =E6=BA=90=E7=A0=81=E5=A6=82=E4=B8=8B: >>> CREATE TABLE buy_cnt_per_hour ( >>> hour_of_day BIGINT, >>> buy_cnt BIGINT >>> ) WITH ( >>> 'connector.type' =3D 'elasticsearch', >>> 'connector.version' =3D '6', >>> 'connector.hosts' =3D 'http://localhost:9200', >>> 'connector.index' =3D 'buy_cnt_per_hour', >>> 'connector.document-type' =3D 'user_behavior', >>> 'connector.bulk-flush.max-actions' =3D '1', >>> 'format.type' =3D 'json', >>> 'update-mode' =3D 'append' >>> ) >>> import >> = org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >>> import org.apache.flink.table.api.EnvironmentSettings; >>> import org.apache.flink.table.api.Table; >>> import org.apache.flink.table.api.java.StreamTableEnvironment; >>> import org.apache.flink.types.Row; >>>=20 >>> public class ESTest { >>>=20 >>> public static void main(String[] args) throws Exception { >>>=20 >>> //2=E3=80=81=E8=AE=BE=E7=BD=AE=E8=BF=90=E8=A1=8C=E7=8E=AF=E5=A2= =83 >>> StreamExecutionEnvironment streamEnv =3D >> StreamExecutionEnvironment.getExecutionEnvironment(); >>> EnvironmentSettings settings =3D >> = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().buil= d(); >>> StreamTableEnvironment tableEnv =3D >> StreamTableEnvironment.create(streamEnv, settings); >>> streamEnv.setParallelism(1); >>> String sinkDDL =3D " CREATE TABLE test_es ( hour_of_day = BIGINT, >> buy_cnt BIGINT " >>> + ") WITH ( 'connector.type' =3D 'elasticsearch', >> 'connector.version' =3D '6'," >>> + " 'connector.hosts' =3D 'http://localhost:9200', >> 'connector.index' =3D 'buy_cnt_per_hour'," >>> + " 'connector.document-type' =3D = 'user_behavior'," >>> + " 'connector.bulk-flush.max-actions' =3D '1',\n" = + " >> 'format.type' =3D 'json'," >>> + " 'update-mode' =3D 'append' )"; >>> tableEnv.sqlUpdate(sinkDDL); >>> Table table =3D tableEnv.sqlQuery("select * from test_es "); >>> tableEnv.toRetractStream(table, Row.class).print(); >>> streamEnv.execute(""); >>> } >>>=20 >>> } >>> =E5=85=B7=E4=BD=93error >>> The matching candidates: >>> org.apache.flink.table.sources.CsvAppendTableSourceFactory >>> Mismatched properties: >>> 'connector.type' expects 'filesystem', but is 'elasticsearch' >>> 'format.type' expects 'csv', but is 'json' >>>=20 >>> The following properties are requested: >>> connector.bulk-flush.max-actions=3D1 >>> connector.document-type=3Duser_behavior >>> connector.hosts=3Dhttp://localhost:9200 >>> connector.index=3Dbuy_cnt_per_hour >>> connector.type=3Delasticsearch >>> connector.version=3D6 >>> format.type=3Djson >>> schema.0.data-type=3DBIGINT >>> schema.0.name=3Dhour_of_day >>> schema.1.data-type=3DBIGINT >>> schema.1.name=3Dbuy_cnt >>> update-mode=3Dappend >>=20 >>=20