From: Robert Metzger
Date: Thu, 15 Jan 2015 09:54:26 +0100
Subject: Re: PostgreSQL JDBC Sink, "writeRecord() failed" and "Batch element cancelled" on upsert
To: user@flink.apache.org

Hi,

Sorry for the late reply. I'm not a PostgreSQL expert, but I'll try my best. Also, I'm not aware of many users of the JDBC output format, so I guess it's quite likely that there are open issues with it.

The exceptions have been thrown by the PostgreSQL JDBC driver.
As far as I can see, there are two exceptions: one in the writeRecord() method call and one in the close() call. I think we can ignore the exception in close(), because it's somewhat expected that no further queries can be executed once part of the batch has failed.
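
The BatchUpdateException in the trace only reports that batch entry 0 was aborted; the driver's own message says the underlying reason is available through getNextException(). Below is a minimal sketch for logging that chain (the class name SqlErrorChain is hypothetical), which might reveal the actual error behind the failed batch entry:

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: collects the messages of an SQLException and of every
// exception chained to it via getNextException(). The PostgreSQL driver asks
// you to call getNextException() to see why a batch entry was aborted.
final class SqlErrorChain {

    private SqlErrorChain() {}

    static List<String> messages(SQLException first) {
        List<String> result = new ArrayList<>();
        // Walk the JDBC-specific "next exception" chain until it ends.
        for (SQLException e = first; e != null; e = e.getNextException()) {
            result.add(e.getClass().getSimpleName() + ": " + e.getMessage());
        }
        return result;
    }

    // Possible usage around a batch execution:
    //   try { statement.executeBatch(); }
    //   catch (java.sql.BatchUpdateException e) {
    //       SqlErrorChain.messages(e).forEach(System.err::println);
    //   }
}
```

Printing these messages when executeBatch() fails should give more detail than the top-level exception alone.
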
I suspect that the batched inserts performed by the OutputFormat are not compatible with your upsert function. (Have a look at the code to see how we do batched inserts: https://github.com/apache/flink/blob/b904b0041cf97b2c6181b1985afc457ed01cf626/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java)
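
The stack trace points in the same direction: executeBatch() is called from JDBCOutputFormat.writeRecord() (and again from close()), and the driver raises the error while handling the result rows of a batch entry (BatchResultHandler.handleResultRows). The JDBC documentation for Statement.executeBatch() states that it throws BatchUpdateException when a command in the batch attempts to return a result set, and SELECT upsert_nsp_db(...) returns a row even though the function is declared RETURNS VOID. The sketch below is a simplified model of that batching pattern in plain JDBC, not Flink's actual code; the class name BatchingWriter and the batchInterval threshold are assumptions.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical, simplified batching writer (not Flink's JDBCOutputFormat).
// Each record is queued with addBatch(); the queue is sent with executeBatch()
// every `batchInterval` records, and any remainder is sent on close().
// executeBatch() expects update counts, so a batched "SELECT some_function(...)"
// that returns a row makes it throw BatchUpdateException.
final class BatchingWriter implements AutoCloseable {
    private final PreparedStatement statement;
    private final int batchInterval;
    private int pending = 0;

    BatchingWriter(PreparedStatement statement, int batchInterval) {
        if (batchInterval <= 0) {
            throw new IllegalArgumentException("batchInterval must be > 0");
        }
        this.statement = statement;
        this.batchInterval = batchInterval;
    }

    void write(Object... values) throws SQLException {
        for (int i = 0; i < values.length; i++) {
            statement.setObject(i + 1, values[i]); // JDBC parameters are 1-based
        }
        statement.addBatch(); // only queued here, nothing is sent yet
        if (++pending >= batchInterval) {
            flush();
        }
    }

    void flush() throws SQLException {
        if (pending > 0) {
            statement.executeBatch(); // the driver rejects row-returning entries here
            pending = 0;
        }
    }

    @Override
    public void close() throws SQLException {
        try {
            flush(); // send whatever is still queued
        } finally {
            statement.close();
        }
    }
}
```

With this pattern, addBatch() accepts the SELECT; the driver only rejects it when executeBatch() sends the queued entries.
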

I would recommend prototyping a new OutputFormat that performs regular (non-batched) queries, to see whether that works with upserts.
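
A minimal sketch of the non-batching approach, assuming plain JDBC: each record is sent immediately with PreparedStatement.execute(), which, unlike executeBatch(), allows a statement to return a result set. The class ImmediateWriter is hypothetical; a prototype OutputFormat could create it in open(), call write() from writeRecord() and close it in close().

```java
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Hypothetical non-batching writer: every record is executed right away, so a
// row-returning call such as "SELECT upsert_nsp_db(?,?,?)" is allowed.
final class ImmediateWriter implements AutoCloseable {
    private final PreparedStatement statement;

    ImmediateWriter(PreparedStatement statement) {
        this.statement = statement;
    }

    void write(Object... values) throws SQLException {
        for (int i = 0; i < values.length; i++) {
            statement.setObject(i + 1, values[i]); // JDBC parameters are 1-based
        }
        // execute() returns true when the first result is a ResultSet.
        if (statement.execute()) {
            ResultSet rs = statement.getResultSet();
            if (rs != null) {
                rs.close(); // the single row of a VOID function is not needed
            }
        }
    }

    @Override
    public void close() throws SQLException {
        statement.close();
    }
}
```

The trade-off is one round trip per record, so this is likely slower than batching; it is meant only to test whether the upsert works without batching.
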

Robert





On Wed, Jan 14, 2015 at 1:16 PM, Benoît Hanotte <benoit.jc.hanotte@campus.tu-berlin.de> wrote:
Hi,

I'm trying to use a spatial PostgreSQL database with PostGIS as a sink for my data.
It works perfectly when performing an INSERT with my data, but when trying to perform an UPSERT it fails with the following errors:

15/01/14 12:52:08 ERROR operators.DataSinkTask: Error in user code: writeRecord() failed: DataSink(org.apache.flink.api.java.io.jdbc.JDBCOutputFormat@77a8fb80) (4/4)
java.lang.IllegalArgumentException: writeRecord() failed
        at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:132)
        at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:41)
        at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:173)
        at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.sql.BatchUpdateException: Batch entry 0 SELECT upsert_nsp_db(-42.48112678527832,-73.76546859741211,8.0,0.0) was aborted. Call getNextException to see the cause.
        at org.postgresql.jdbc2.AbstractJdbc2Statement$BatchResultHandler.handleError(AbstractJdbc2Statement.java:2743)
        at org.postgresql.jdbc2.AbstractJdbc2Statement$BatchResultHandler.handleResultRows(AbstractJdbc2Statement.java:2692)
        at org.postgresql.core.v3.QueryExecutorImpl$ErrorTrackingResultHandler.handleResultRows(QueryExecutorImpl.java:333)
        at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1853)
        at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1130)
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:396)
        at org.postgresql.jdbc2.AbstractJdbc2Statement.executeBatch(AbstractJdbc2Statement.java:2892)
        at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:127)
        ... 4 more
15/01/14 12:52:08 WARN operators.DataSinkTask: Error closing the ouput format.: DataSink(org.apache.flink.api.java.io.jdbc.JDBCOutputFormat@77a8fb80) (4/4)
java.lang.IllegalArgumentException: close() failed
        at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.close(JDBCOutputFormat.java:188)
        at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:202)
        at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
        at java.lang.Thread.run(Thread.java:744)
Caused by: org.postgresql.util.PSQLException: This statement has been closed.
        at org.postgresql.jdbc2.AbstractJdbc2Statement.checkClosed(AbstractJdbc2Statement.java:2634)
        at org.postgresql.jdbc2.AbstractJdbc2Statement.executeBatch(AbstractJdbc2Statement.java:2832)
        at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.close(JDBCOutputFormat.java:185)
        ... 3 more



My Java code is:


var.output(
        // build and configure OutputFormat
        JDBCOutputFormat
                .buildJDBCOutputFormat()
                .setDrivername("org.postgresql.Driver")
                .setDBUrl("jdbc:postgresql://127.0.0.1:5432/test")
                .setUsername("postgres")
                .setPassword("")
                .setQuery("SELECT upsert_nsp_db(?,?,?);") // lat, lng, val
                .finish()
        );



My upsert function is the following:


CREATE FUNCTION upsert_db(lat DOUBLE PRECISION, lng DOUBLE PRECISION, m FLOAT) RETURNS VOID AS
$$
BEGIN
    LOOP
        -- first try to update the key
        UPDATE nsp_db SET mean = m WHERE ST_Equals(location, ST_SetSRID(ST_MakePoint(lng,lat),26918));
        IF found THEN
            RETURN;
        END IF;
        -- not there, so try to insert the key
        -- if someone else inserts the same key concurrently,
        -- we could get a unique-key failure
        BEGIN
            INSERT INTO nsp_db (location, mean) VALUES (ST_SetSRID(ST_MakePoint(lng,lat),26918), m);
            RETURN;
        EXCEPTION WHEN unique_violation THEN
            -- do nothing, and loop to try the UPDATE again
        END;
    END LOOP;
END;
$$
LANGUAGE plpgsql;


I suspect it is some kind of timeout, but is there any way I could solve this issue?

Thanks!
