flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: [jira] [Created] (FLINK-8356) JDBCAppendTableSink does not work for Hbase Phoenix Driver
Date Wed, 03 Jan 2018 21:54:00 GMT
I had a similar problem with batch API...the problem is that you have to
enable autocommit in the connection URL. Thr jdbc connector should better
handle this specific case as well (IMHO).

See https://issues.apache.org/jira/browse/FLINK-7605

On 3 Jan 2018 22:25, "Paul Wu (JIRA)" <jira@apache.org> wrote:

> Paul Wu created FLINK-8356:
> ------------------------------
>
>              Summary: JDBCAppendTableSink does not work for Hbase Phoenix
> Driver
>                  Key: FLINK-8356
>                  URL: https://issues.apache.org/jira/browse/FLINK-8356
>              Project: Flink
>           Issue Type: Bug
>           Components: Table API & SQL
>     Affects Versions: 1.4.0
>             Reporter: Paul Wu
>
>
> The following code runs without errors, but the data is not inserted into
> the HBase table. However, it does work for MySQL (see the commented out
> code). The Phoenix driver is from https://mvnrepository.com/
> artifact/org.apache.phoenix/phoenix/4.7.0-HBase-1.1
>
> String query = "select CURRENT_DATE SEGMENTSTARTTIME, CURRENT_DATE
> SEGMENTENDTIME, cast (imsi as varchar) imsi, cast(imei as varchar) imei
> from ts ";
>
>         Table table = ste.sqlQuery(query);
>         JDBCAppendTableSinkBuilder jdbc = JDBCAppendTableSink.builder();
>         jdbc.setDrivername("org.apache.phoenix.jdbc.PhoenixDriver");
>         jdbc.setDBUrl("jdbc:phoenix:hosts:2181:/hbase-unsecure");
>         jdbc.setQuery("upsert INTO GEO_ANALYTICS_STREAMING_DATA
> (SEGMENTSTARTTIME,SEGMENTENDTIME, imsi, imei) values (?,?,?, ?)");
> //     JDBCAppendTableSinkBuilder jdbc = JDBCAppendTableSink.builder();
> //        jdbc.setDrivername("com.mysql.jdbc.Driver");
> //        jdbc.setDBUrl("jdbc:mysql://localhost/test");
> //        jdbc.setUsername("root").setPassword("");
> //        jdbc.setQuery("insert INTO GEO_ANALYTICS_STREAMING_DATA
> (SEGMENTSTARTTIME,SEGMENTENDTIME, imsi, imei) values (?,?,?, ?)");
> //        jdbc.setBatchSize(1);
>         jdbc.setParameterTypes(Types.SQL_DATE, Types.SQL_DATE,
> Types.STRING, Types.STRING);
>         JDBCAppendTableSink sink = jdbc.build();
>         table.writeToSink(sink);
>
>
>
> --
> This message was sent by Atlassian JIRA
> (v6.4.14#64029)
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message