From: Leonard Xu
Date: Thu, 23 Apr 2020 16:23:34 +0800
Subject: Re: Question about java.sql.SQLException: No value specified for parameter 1 when writing a RetractStream to MySQL
To: user-zh
Mailing-List: user-zh@flink.apache.org

Hi,

I tried to reproduce this locally. With 1.10.0 your SQL works fine and the results are as expected ☺️, see [1] below.
I saw that you opened a JIRA issue, so let's continue following up there.

Best,
Leonard Xu

[1]
mysql> select * from order_state_cnt;
+------------+--------------+------+
| order_date | product_code | cnt  |
+------------+--------------+------+
| 2020-04-01 | product1     |    3 |
| 2020-04-01 | product2     |    5 |
| 2020-04-01 | product1     |    5 |
| 2020-04-01 | product2     |    9 |
+------------+--------------+------+
4 rows in set (0.00 sec)

mysql> select * from order_state_cnt;
+------------+--------------+------+
| order_date | product_code | cnt  |
+------------+--------------+------+
| 2020-04-01 | product1     |    3 |
| 2020-04-01 | product2     |    5 |
| 2020-04-01 | product1     |    5 |
| 2020-04-01 | product2     |    9 |
| 2020-04-01 | product1     |    2 |
| 2020-04-01 | product2     |    4 |
+------------+--------------+------+
6 rows in set (0.00 sec)
> On April 23, 2020, at 10:48, 1101300123 wrote:
>
> Let me give you some data and code! They produce the same error as my real scenario.
>
> Main order table: orders
> Two records at 13:00. order_state is the status: 0 = cancelled, 1 = pending payment.
> {"order_no":"order1","order_state":1,"pay_time":"","create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"}
> {"order_no":"order2","order_state":1,"pay_time":"","create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"}
>
> At 13:15 a new record arrives that cancels the order:
> {"order_no":"order1","order_state":0,"pay_time":"","create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:15:00"}
>
> Order detail table: order_detail
> 4 records:
> {"order_no":"order1","product_code":"product1","quantity":3,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"}
> {"order_no":"order1","product_code":"product2","quantity":5,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"}
> {"order_no":"order2","product_code":"product1","quantity":2,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"}
> {"order_no":"order2","product_code":"product2","quantity":4,"create_time":"2020-04-01 13:00:00","update_time":"2020-04-01 13:00:00"}
>
> The requirement: as soon as an order is created, we count the product quantities belonging to that order; when the order status changes to cancelled, we subtract the product quantities belonging to that order.
>
> Code:
> package Learn.kafkasql;
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
>
> public class SqlCount {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>         StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);
>
>         tenv.sqlUpdate("CREATE TABLE orders " +
>             " (" +
>             " order_no string," +
>             " order_state int," +
>             " pay_time string," +
>             " create_time string," +
>             " update_time string" +
>             " ) " +
>             " WITH (" +
>             " 'connector.type' = 'kafka', " +
>             " 'connector.version' = 'universal', " + // Kafka version
>             " 'connector.topic' = 'tp_orders'," + // Kafka topic
>             " 'connector.properties.zookeeper.connect' = '192.168.179.120:2181', " +
>             " 'connector.properties.bootstrap.servers' = '192.168.179.120:9092'," +
>             " 'connector.properties.group.id' = 'testGroup'," +
>             " 'connector.startup-mode' = 'latest-offset'," +
>             " 'format.type' = 'json' " + // data is in JSON format
>             " )");
>         tenv.sqlUpdate("CREATE TABLE order_detail " +
>             " (" +
>             " order_no string," +
>             " product_code string," +
>             " quantity int," +
>             " create_time string," +
>             " update_time string" +
>             " ) " +
>             " WITH (" +
>             " 'connector.type' = 'kafka', " +
>             " 'connector.version' = 'universal', " + // Kafka version
>             " 'connector.topic' = 'tp_order_detail'," + // Kafka topic
>             " 'connector.properties.zookeeper.connect' = '192.168.179.120:2181', " +
>             " 'connector.properties.bootstrap.servers' = '192.168.179.120:9092'," +
>             " 'connector.properties.group.id' = 'testGroup'," +
>             " 'connector.startup-mode' = 'latest-offset'," +
>             " 'format.type' = 'json' " + // data is in JSON format
>             " )");
>
>         tenv.sqlUpdate("CREATE TABLE product_sale" +
>             " (" +
>             " order_date string," +
>             " product_code string," +
>             " cnt int" +
>             " ) " +
>             " WITH (" +
>             " 'connector.type' = 'jdbc', " +
>             " 'connector.url' = 'jdbc:mysql://192.168.179.120:3306/flink?serverTimezone=UTC&useSSL=true', " +
>             " 'connector.table' = 'order_state_cnt', " +
>             " 'connector.driver' = 'com.mysql.jdbc.Driver', " +
>             " 'connector.username' = 'root'," +
>             " 'connector.password' = '123456'," +
>             " 'connector.write.flush.max-rows' = '1'," + // default flushes every 5000 rows; lowered for testing
>             " 'connector.write.flush.interval' = '2s'," + // flush interval
>             " 'connector.write.max-retries' = '3'" +
>             " )");
>         tenv.sqlUpdate("insert into product_sale " +
>             "select create_date,product_code,sum(quantity)" +
>             "from (select t1.order_no," +
>             " t1.create_date," +
>             " t2.product_code," +
>             " t2.quantity" +
>             " from (select order_no," +
>             " order_state," +
>             " substring(create_time,1,10) create_date," +
>             " update_time ," +
>             " row_number() over(partition by order_no order by update_time desc) as rn" +
>             " from orders" +
>             " )t1" +
>             " left join order_detail t2" +
>             " on t1.order_no=t2.order_no" +
>             " where t1.rn=1" + // keep only the latest order state record
>             " and t1.order_state<>0" + // exclude cancelled orders
>             " )t3" +
>             " group by create_date,product_code");
>
>         Table table = tenv.sqlQuery("select create_date,product_code,sum(quantity)" +
>             "from (select t1.order_no," +
>             " t1.create_date," +
>             " t2.product_code," +
>             " t2.quantity" +
>             " from (select order_no," +
>             " order_state," +
>             " substring(create_time,1,10) create_date," +
>             " update_time ," +
>             " row_number() over(partition by order_no order by update_time desc) as rn" +
>             " from orders" +
>             " )t1" +
>             " left join order_detail t2" +
>             " on t1.order_no=t2.order_no" +
>             " where t1.rn=1" +
>             " and t1.order_state<>0" +
>             " )t3" +
>             " group by create_date,product_code");
>         tenv.toRetractStream(table, Row.class).print();
>         tenv.execute("count");
>     }
> }
>
> MySQL table creation statement:
> CREATE TABLE `order_state_cnt` (
>   `order_date` varchar(12) ,
>   `product_code` varchar(12) ,
>   `cnt` int
> ) ENGINE=InnoDB DEFAULT CHARSET=utf8
>
> I sent the data one record at a time with the Kafka command-line producer.
>
> The error comes mainly from the deleteStatement.executeBatch(); call:
> @Override
> public void executeBatch() throws SQLException {
>     if (keyToRows.size() > 0) {
>         for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
>             Row pk = entry.getKey();
>             Tuple2<Boolean, Row> tuple = entry.getValue();
>             if (tuple.f0) {
>                 processOneRowInBatch(pk, tuple.f1);
>             } else {
>                 setRecordToStatement(deleteStatement, pkTypes, pk);
>                 deleteStatement.addBatch();
>             }
>         }
>         internalExecuteBatch();
>         deleteStatement.executeBatch();
>         keyToRows.clear();
>     }
> }
>
> On April 23, 2020, at 00:21, Leonard Xu wrote:
> Thanks for the detailed analysis!
>
> I could not reproduce the problem you describe. The last step of your analysis looks slightly off. I checked the MySQL JDBC implementation at
> com/mysql/jdbc/PreparedStatement.java#executeBatchInternal(), line 1289:
> it checks the size of the batchedArgs list and returns immediately, so nothing should be executed. You could debug further to confirm this.
> ```
> if (this.batchedArgs == null || this.batchedArgs.size() == 0) {
>     return new long[0];
> }
> ```
>
> Best,
> Leonard Xu
>
> On April 22, 2020, at 21:58, 1101300123 wrote:
>
> After a SQL join I write the result to MySQL and get the error "No value specified for parameter 1". Why?
> My version is 1.10.0, and the code is as follows:
> JDBCUpsertTableSink build = JDBCUpsertTableSink.builder()
>     .setTableSchema(results.getSchema())
>     .setOptions(JDBCOptions.builder()
>         .setDBUrl("。。。。MultiQueries=true&useUnicode=true&characterEncoding=UTF-8")
>         .setDriverName("com.mysql.jdbc.Driver")
>         .setUsername("jczx_cjch")
>         .setPassword("jczx_cjch2")
>         .setTableName("xkf_join_result")
>         .build())
>     .setFlushIntervalMills(1000)
>     .setFlushMaxSize(100)
>     .setMaxRetryTimes(3)
>     .build();
>
> DataStream<Tuple2<Boolean, Row>> retract = bsTableEnv.toRetractStream(results, Row.class);
> retract.print();
> build.emitDataStream(retract);
>
> The following error then occurs:
> java.sql.SQLException: No value specified for parameter 1
>     at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
>     at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
>     at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
>     at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
>     at com.mysql.jdbc.PreparedStatement.checkAllParametersSet(PreparedStatement.java:2211)
>     at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2191)
>     at com.mysql.jdbc.PreparedStatement.fillSendPacket(PreparedStatement.java:2121)
>     at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1162)
>     at org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:118)
>     at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
>     at org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.snapshotState(JDBCUpsertSinkFunction.java:56)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)
>
> My output data looks like this: (true,2020-04-22 21:34:00,2020-04-22 21:34:15,20200422213541465568468)
>
> Looking at the source code, I found the following.
> First, the writeRecord method of JDBCUpsertOutputFormat is called, which adds an element to the map held as a member variable of UpsertWriter:
> @Override
> public synchronized void writeRecord(Tuple2<Boolean, Row> tuple2) throws IOException {
>     checkFlushException();
>     try {
>         jdbcWriter.addRecord(tuple2);
>         batchCount++;
>         if (batchCount >= flushMaxSize) {
>             flush();
>         }
>     } catch (Exception e) {
>         throw new RuntimeException("Writing records to JDBC failed.", e);
>     }
> }
>
> Then flush() is called, which invokes the executeBatch method of UpsertWriter:
> public synchronized void flush() throws Exception {
>     checkFlushException();
>     for (int i = 1; i <= maxRetryTimes; i++) {
>         try {
>             jdbcWriter.executeBatch();
>             batchCount = 0;
>             break;
>         } catch (SQLException e) {
>             LOG.error("JDBC executeBatch error, retry times = {}", i, e);
>             if (i >= maxRetryTimes) {
>                 throw e;
>             }
>             Thread.sleep(1000 * i);
>         }
>     }
> }
>
> Next, UpsertWriter (which implements JDBCWriter) runs its executeBatch method. It first checks whether the map is empty and then loops over the map. If the first element of the 2-tuple is true, it calls the inner class to process the row; otherwise it deletes the row. In my case there is only one record each time, so the map has size 1 and the first element of the 2-tuple is true. When the loop ends, deleteStatement.executeBatch(); is executed and fails, because the placeholders of the delete statement have not been filled in yet.
>
> @Override
> public void executeBatch() throws SQLException {
>     if (keyToRows.size() > 0) {
>         for (Map.Entry<Row, Tuple2<Boolean, Row>> entry : keyToRows.entrySet()) {
>             Row pk = entry.getKey();
>             Tuple2<Boolean, Row> tuple = entry.getValue();
>             if (tuple.f0) {
>                 processOneRowInBatch(pk, tuple.f1);
>             } else {
>                 setRecordToStatement(deleteStatement, pkTypes, pk);
>                 deleteStatement.addBatch();
>             }
>         }
>         internalExecuteBatch();
>         deleteStatement.executeBatch();
>         keyToRows.clear();
>     }
> }
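
The flow debated in this thread can be sketched with plain Java collections. This is only an illustrative model, not Flink's or Connector/J's actual code: the class names UpsertBatchSketch, FakeBatchStatement and FakeUpsertWriter are hypothetical, rows and keys are simplified to strings, and the empty-batch guard is modelled on the snippet quoted from the MySQL driver above. It shows that when only upsert records (flag true) are buffered, nothing is added to the delete batch, so a driver with that guard returns an empty result instead of executing the DELETE.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UpsertBatchSketch {

    /** Hypothetical stand-in for a prepared DELETE statement; not the Connector/J class. */
    static final class FakeBatchStatement {
        private final List<String> batchedArgs = new ArrayList<>();

        void addBatch(String key) {
            batchedArgs.add(key);
        }

        /** Modelled on the guard quoted in the thread: an empty batch returns at once. */
        long[] executeBatch() {
            if (batchedArgs.isEmpty()) {
                return new long[0];
            }
            long[] counts = new long[batchedArgs.size()];
            Arrays.fill(counts, 1L); // pretend every batched DELETE touched one row
            batchedArgs.clear();
            return counts;
        }
    }

    /** Buffers the latest change per key, then routes it to the upsert or delete path. */
    static final class FakeUpsertWriter {
        // key -> (true = upsert, false = delete) together with the row content
        private final Map<String, SimpleEntry<Boolean, String>> keyToRows = new LinkedHashMap<>();
        final List<String> upserted = new ArrayList<>();
        final FakeBatchStatement deleteStatement = new FakeBatchStatement();

        void addRecord(boolean isUpsert, String key, String row) {
            keyToRows.put(key, new SimpleEntry<>(isUpsert, row));
        }

        /** Returns the per-row results of the delete batch (empty if nothing was deleted). */
        long[] executeBatch() {
            for (Map.Entry<String, SimpleEntry<Boolean, String>> entry : keyToRows.entrySet()) {
                if (entry.getValue().getKey()) {
                    upserted.add(entry.getValue().getValue());
                } else {
                    deleteStatement.addBatch(entry.getKey());
                }
            }
            long[] deleted = deleteStatement.executeBatch();
            keyToRows.clear();
            return deleted;
        }
    }

    public static void main(String[] args) {
        FakeUpsertWriter writer = new FakeUpsertWriter();
        // A single upsert record, as in the reported scenario (map size 1, flag true).
        writer.addRecord(true, "2020-04-01|product1", "2020-04-01,product1,3");
        long[] deleted = writer.executeBatch();
        System.out.println("upserts=" + writer.upserted.size() + " deletes=" + deleted.length);
    }
}
```

Running main prints `upserts=1 deletes=0`. Whether the real driver reaches such a guard depends on the driver version and on which execute method is actually called, which is the point to confirm by debugging or in the JIRA issue.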