Return-Path: X-Original-To: apmail-flink-user-archive@minotaur.apache.org Delivered-To: apmail-flink-user-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id ABCDB17F9E for ; Wed, 14 Jan 2015 12:15:18 +0000 (UTC) Received: (qmail 45118 invoked by uid 500); 14 Jan 2015 12:15:20 -0000 Delivered-To: apmail-flink-user-archive@flink.apache.org Received: (qmail 45051 invoked by uid 500); 14 Jan 2015 12:15:20 -0000 Mailing-List: contact user-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: user@flink.apache.org Delivered-To: mailing list user@flink.apache.org Received: (qmail 45041 invoked by uid 99); 14 Jan 2015 12:15:20 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 14 Jan 2015 12:15:20 +0000 X-ASF-Spam-Status: No, hits=-0.7 required=5.0 tests=RCVD_IN_DNSWL_LOW X-Spam-Check-By: apache.org Received-SPF: error (athena.apache.org: local policy) Received: from [212.27.42.1] (HELO smtp1-g21.free.fr) (212.27.42.1) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 14 Jan 2015 12:15:13 +0000 Received: from vaio-sb (unknown [92.225.63.152]) (Authenticated sender: benoit.hanotte) by smtp1-g21.free.fr (Postfix) with ESMTPSA id BCC78940060 for ; Wed, 14 Jan 2015 13:12:42 +0100 (CET) Content-Type: text/plain; charset=iso-8859-15; format=flowed; delsp=yes To: user@flink.apache.org Date: Wed, 14 Jan 2015 13:16:02 +0100 Subject: PotsgreSQL JDBC Sink, "writeRecord() failed" and "Batch element cancelled" on upsert MIME-Version: 1.0 Content-Transfer-Encoding: Quoted-Printable From: =?iso-8859-15?Q?Beno=EEt_Hanotte?= Message-ID: User-Agent: Opera Mail/1.0 (Win32) X-Antivirus: avast! (VPS 150113-2, 13/01/2015), Outbound message X-Antivirus-Status: Clean X-Virus-Checked: Checked by ClamAV on apache.org Hi, I'm trying to use a spatial PostgreSQL with PostGIS database as a Sink f= or = my data. It works perfectly when performing an INSERT with my data, but when tryi= ng = to perform an UPSERT it fails with the following errors: 15/01/14 12:52:08 ERROR operators.DataSinkTask: Error in user code: = writeRecord() failed: = DataSink(org.apache.flink.api.java.io.jdbc.JDBCOutputFormat@77a8fb80) (4= /4) java.lang.IllegalArgumentException: writeRecord() failed at = org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutpu= tFormat.java:132) at = org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutpu= tFormat.java:41) at = org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java= :173) at = org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnviron= ment.java:235) at java.lang.Thread.run(Thread.java:744) Caused by: java.sql.BatchUpdateException: L'=E9l=E9ment du batch 0 SELEC= T = upsert_nsp_db(-42.48112678527832,-73.76546859741211,8.0,0.0) a =E9t=E9 a= nnul=E9. = Appeler getNextException pour en conna=EEtre la cause. at = org.postgresql.jdbc2.AbstractJdbc2Statement$BatchResultHandler.handleErr= or(AbstractJdbc2Statement.java:2743) at = org.postgresql.jdbc2.AbstractJdbc2Statement$BatchResultHandler.handleRes= ultRows(AbstractJdbc2Statement.java:2692) at = org.postgresql.core.v3.QueryExecutorImpl$ErrorTrackingResultHandler.hand= leResultRows(QueryExecutorImpl.java:333) at = org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImp= l.java:1853) at = org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.jav= a:1130) at = org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:= 396) at = org.postgresql.jdbc2.AbstractJdbc2Statement.executeBatch(AbstractJdbc2St= atement.java:2892) at = org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutpu= tFormat.java:127) ... 4 more 15/01/14 12:52:08 WARN operators.DataSinkTask: Error closing the ouput = format.: = DataSink(org.apache.flink.api.java.io.jdbc.JDBCOutputFormat@77a8fb80) (4= /4) java.lang.IllegalArgumentException: close() failed at = org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.close(JDBCOutputForma= t.java:188) at = org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java= :202) at = org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnviron= ment.java:235) at java.lang.Thread.run(Thread.java:744) Caused by: org.postgresql.util.PSQLException: Ce statement a =E9t=E9 fer= m=E9. = (Translates into This statement has been closed) at = org.postgresql.jdbc2.AbstractJdbc2Statement.checkClosed(AbstractJdbc2Sta= tement.java:2634) at = org.postgresql.jdbc2.AbstractJdbc2Statement.executeBatch(AbstractJdbc2St= atement.java:2832) at = org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.close(JDBCOutputForma= t.java:185) ... 3 more My java code is var.output( // build and configure OutputFormat JDBCOutputFormat .buildJDBCOutputFormat() .setDrivername("org.postgresql.Driver") .setDBUrl("jdbc:postgresql://127.0.0.1:5432/test") .setUsername("postgres") .setPassword("") .setQuery("SELECT upsert_nsp_db(?,?,?);") // lat, lng, val .finish() ); My upsert function is the following: CREATE FUNCTION upsert_db(lat DOUBLE PRECISION, lng DOUBLE PRECISION, m = = FLOAT) RETURNS VOID AS $$ BEGIN LOOP -- first try to update the key UPDATE nsp_db SET mean =3D m WHERE ST_Equals(location, = ST_SetSRID(ST_MakePoint(lng,lat),26918)); IF found THEN RETURN; END IF; -- not there, so try to insert the key -- if someone else inserts the same key concurrently, -- we could get a unique-key failure BEGIN INSERT INTO nsp_db (location, mean) VALUES = (ST_SetSRID(ST_MakePoint(lng,lat),26918), m); RETURN; EXCEPTION WHEN unique_violation THEN -- do nothing, and loop to try the UPDATE again END; END LOOP; END; $$ LANGUAGE plpgsql; I suspect it is some kind of timeout, but is there any way I could solve= = this issue? Thanks!