flink-user mailing list archives

From Robert Metzger <rmetz...@apache.org>
Subject Re: PostgreSQL JDBC Sink, "writeRecord() failed" and "Batch element cancelled" on upsert
Date Thu, 15 Jan 2015 08:54:26 GMT
Hi,

sorry for the late reply. I'm not a PostgreSQL expert, but I'll try my
best. Also, I'm not aware of many users of the JDBC output format, so I
guess it's quite likely that there are open issues with it.

The exceptions were thrown by the PostgreSQL JDBC driver.
As far as I can see, there are two exceptions: one in the writeRecord()
method call and one in the close() call. I think we can ignore the
exception in the close() call, because it's expected that we cannot
execute further queries once part of the batch has failed.
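The driver's own message ("Appeler getNextException pour en connaître la cause", i.e. call getNextException to see the cause) says where the real error is: JDBC links the reasons for a failed batch through SQLException.getNextException(), which the cause chain printed in the log does not show. Below is a minimal sketch, assuming a hypothetical helper class SqlExceptionChain, that finds the BatchUpdateException underneath Flink's IllegalArgumentException and lists every chained message. Since the trace fails inside BatchResultHandler.handleResultRows, the chained message is likely to complain about an unexpected result row, but that is an inference from the trace, not something the log confirms.

```java
import java.sql.BatchUpdateException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical debugging helper (not part of Flink or the PostgreSQL driver).
 * Batch failures are chained with setNextException()/getNextException(),
 * which is separate from the getCause() chain a logged stack trace shows.
 */
final class SqlExceptionChain {
    private SqlExceptionChain() {}

    /** Returns "ClassName: message" for the exception and every getNextException() link. */
    static List<String> messages(SQLException first) {
        List<String> result = new ArrayList<>();
        for (SQLException e = first; e != null; e = e.getNextException()) {
            result.add(e.getClass().getSimpleName() + ": " + e.getMessage());
        }
        return result;
    }

    /** Walks getCause() to find a BatchUpdateException, e.g. under an IllegalArgumentException wrapper. */
    static BatchUpdateException findBatchFailure(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof BatchUpdateException) {
                return (BatchUpdateException) c;
            }
        }
        return null; // no batch failure in this cause chain
    }
}
```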
I suspect that the batched inserts performed by the OutputFormat are not
compatible with the upsert function. (Have a look at the code to see how we do
batched inserts:
https://github.com/apache/flink/blob/b904b0041cf97b2c6181b1985afc457ed01cf626/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
)
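To make the batching concrete, here is a simplified sketch of the generic JDBC batch pattern; it is not Flink's source, and BatchingUpsertSketch and batchInterval are illustrative names. Records are only queued by addBatch() and are sent when executeBatch() runs, every batchInterval records and once more for the remainder in close(). The JDBC contract for Statement.executeBatch() is that it throws a BatchUpdateException if a command fails or attempts to return a result set, and SELECT upsert_nsp_db(...) returns one row per call even though the function is declared VOID, which would fit the writeRecord() error in the quoted trace.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * Illustrative sketch of JDBC batching (hypothetical class, not Flink's code).
 * Nothing reaches the database until executeBatch() is called.
 */
final class BatchingUpsertSketch implements AutoCloseable {
    private final PreparedStatement stmt;
    private final int batchInterval;
    private int pending = 0;

    BatchingUpsertSketch(Connection conn, String sql, int batchInterval) throws SQLException {
        this.stmt = conn.prepareStatement(sql);
        this.batchInterval = batchInterval;
    }

    void writeRecord(double lat, double lng, double val) throws SQLException {
        stmt.setDouble(1, lat);
        stmt.setDouble(2, lng);
        stmt.setDouble(3, val);
        stmt.addBatch();                 // queued only
        if (++pending >= batchInterval) {
            stmt.executeBatch();         // a command returning rows makes this throw
            pending = 0;
        }
    }

    @Override
    public void close() throws SQLException {
        try {
            if (pending > 0) {
                stmt.executeBatch();     // flush the remainder
            }
        } finally {
            stmt.close();
        }
    }
}
```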

I would recommend prototyping a new OutputFormat that performs
regular (non-batched) queries, to see whether that works with upserts.
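A minimal sketch of such a prototype, assuming the three-parameter upsert_nsp_db(lat, lng, val) call from the question; SingleQueryUpsertWriter is a hypothetical name, and Flink itself is not used. It prepares the statement once and runs executeQuery() per record, which expects the single row that a SELECT of a VOID function returns. Its open()/writeRecord()/close() methods mirror the lifecycle of Flink's OutputFormat, so the body could be moved into a custom OutputFormat implementation.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Hypothetical non-batched writer: one query per record instead of a JDBC batch.
 */
final class SingleQueryUpsertWriter {
    private final String sql;
    private PreparedStatement stmt;

    SingleQueryUpsertWriter(String sql) {
        this.sql = sql;
    }

    /** Prepares the statement once per task, as an OutputFormat.open() would. */
    void open(Connection connection) throws SQLException {
        this.stmt = connection.prepareStatement(sql);
    }

    /** Runs one upsert immediately, so a failure is reported for this record. */
    void writeRecord(double lat, double lng, double val) throws SQLException {
        stmt.setDouble(1, lat);
        stmt.setDouble(2, lng);
        stmt.setDouble(3, val);
        // executeQuery() accepts the one-row result of SELECT upsert_nsp_db(...).
        try (ResultSet rs = stmt.executeQuery()) {
            // The VOID function's row carries no data; closing it is enough.
        }
    }

    void close() throws SQLException {
        if (stmt != null) {
            stmt.close();
        }
    }
}
```

The trade-off is one database round trip per record instead of per batch, which is slower for large data sets, but a failing record surfaces immediately with its own error.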

Robert





On Wed, Jan 14, 2015 at 1:16 PM, Benoît Hanotte <
benoit.jc.hanotte@campus.tu-berlin.de> wrote:

> Hi,
>
> I'm trying to use a PostgreSQL database with the PostGIS spatial extension
> as a sink for my data.
> It works perfectly when performing an INSERT with my data, but when trying
> to perform an UPSERT it fails with the following errors:
>
> 15/01/14 12:52:08 ERROR operators.DataSinkTask: Error in user code: writeRecord() failed:  DataSink(org.apache.flink.api.java.io.jdbc.JDBCOutputFormat@77a8fb80) (4/4)
> java.lang.IllegalArgumentException: writeRecord() failed
>         at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:132)
>         at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:41)
>         at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:173)
>         at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>         at java.lang.Thread.run(Thread.java:744)
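> Caused by: java.sql.BatchUpdateException: L'élément du batch 0 SELECT upsert_nsp_db(-42.48112678527832,-73.76546859741211,8.0,0.0) a été annulé. Appeler getNextException pour en connaître la cause.
> (Translates into: Batch entry 0 SELECT upsert_nsp_db(-42.48112678527832,-73.76546859741211,8.0,0.0) was aborted. Call getNextException to see the cause.)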
>         at org.postgresql.jdbc2.AbstractJdbc2Statement$BatchResultHandler.handleError(AbstractJdbc2Statement.java:2743)
>         at org.postgresql.jdbc2.AbstractJdbc2Statement$BatchResultHandler.handleResultRows(AbstractJdbc2Statement.java:2692)
>         at org.postgresql.core.v3.QueryExecutorImpl$ErrorTrackingResultHandler.handleResultRows(QueryExecutorImpl.java:333)
>         at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1853)
>         at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1130)
>         at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:396)
>         at org.postgresql.jdbc2.AbstractJdbc2Statement.executeBatch(AbstractJdbc2Statement.java:2892)
>         at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:127)
>         ... 4 more
> 15/01/14 12:52:08 WARN operators.DataSinkTask: Error closing the ouput format.:  DataSink(org.apache.flink.api.java.io.jdbc.JDBCOutputFormat@77a8fb80) (4/4)
> java.lang.IllegalArgumentException: close() failed
>         at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.close(JDBCOutputFormat.java:188)
>         at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:202)
>         at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235)
>         at java.lang.Thread.run(Thread.java:744)
> Caused by: org.postgresql.util.PSQLException: Ce statement a été fermé.
> (Translates into: This statement has been closed.)
>         at org.postgresql.jdbc2.AbstractJdbc2Statement.checkClosed(AbstractJdbc2Statement.java:2634)
>         at org.postgresql.jdbc2.AbstractJdbc2Statement.executeBatch(AbstractJdbc2Statement.java:2832)
>         at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.close(JDBCOutputFormat.java:185)
>         ... 3 more
>
>
>
> My Java code is:
>
>
> var.output(
>         // build and configure OutputFormat
>         JDBCOutputFormat
>                 .buildJDBCOutputFormat()
>                 .setDrivername("org.postgresql.Driver")
>                 .setDBUrl("jdbc:postgresql://127.0.0.1:5432/test")
>                 .setUsername("postgres")
>                 .setPassword("")
>                 .setQuery("SELECT upsert_nsp_db(?,?,?);") // lat, lng, val
>                 .finish()
>         );
>
>
>
> My upsert function is the following:
>
>
> CREATE FUNCTION upsert_nsp_db(lat DOUBLE PRECISION, lng DOUBLE PRECISION, m FLOAT) RETURNS VOID AS
> $$
> BEGIN
>     LOOP
>         -- first try to update the key
>         UPDATE nsp_db SET mean = m WHERE ST_Equals(location, ST_SetSRID(ST_MakePoint(lng,lat),26918));
>         IF found THEN
>             RETURN;
>         END IF;
>         -- not there, so try to insert the key
>         -- if someone else inserts the same key concurrently,
>         -- we could get a unique-key failure
>         BEGIN
>             INSERT INTO nsp_db (location, mean) VALUES (ST_SetSRID(ST_MakePoint(lng,lat),26918), m);
>             RETURN;
>         EXCEPTION WHEN unique_violation THEN
>             -- do nothing, and loop to try the UPDATE again
>         END;
>     END LOOP;
> END;
> $$
> LANGUAGE plpgsql;
>
>
> I suspect it is some kind of timeout, but is there any way I could solve
> this issue?
>
> Thanks!
>
