flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Apache Phoenix integration
Date Wed, 06 Sep 2017 14:24:12 GMT
Maybe this should also be well documented... is there a dedicated page for
Flink and JDBC connectors?

On Wed, Sep 6, 2017 at 4:12 PM, Fabian Hueske <fhueske@gmail.com> wrote:

> Great!
>
> If you want to, you can open a PR that adds
>
> if (!conn.getAutoCommit()) {
>   conn.setAutoCommit(true);
> }
>
> to JDBCOutputFormat.open().
>
> Cheers, Fabian
>
>
>
> 2017-09-06 15:55 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>
>> Hi Fabian,
>> thanks for the detailed answer. Obviously you are right :)
>> As stated at https://phoenix.apache.org/tuning.html, auto-commit is
>> disabled by default in Phoenix, but it can easily be enabled by appending
>> AutoCommit=true to the connection URL or, equivalently, by setting the
>> corresponding property in the conf object passed to the Phoenix
>> QueryUtil.getConnectionUrl method, which autogenerates the connection URL,
>> i.e.:
>>
>> ----------------------
>> Job job = Job.getInstance(HBaseConfiguration.create(), "phoenix-mr-job");
>> // Hadoop configuration of the job; the extra JDBC argument enables auto-commit
>> final org.apache.hadoop.conf.Configuration jobConf = job.getConfiguration();
>> jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
>>     PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
>> final Properties phoenixProps =
>>     PropertiesUtil.extractProperties(new Properties(), jobConf);
>> String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);
>> ----------------------
>>
>> Now my job also works with the standard Flink JDBCOutputFormat.
>> To help other people who want to experiment with Phoenix and HBase, I'm
>> pasting my simple test job below:
>>
>> @Test
>>   public void testPhoenixOutputFormat() throws Exception {
>>
>>     final StreamExecutionEnvironment senv = getStreamingExecutionEnv();
>>     senv.enableCheckpointing(5000);
>>     DataStream<String> testStream = senv.fromElements("1,aaa,XXX",
>> "2,bbb,YYY", "3,ccc,ZZZ");
>>
>>     // Set the target Phoenix table and the columns
>>     DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {
>>
>>       private static final long serialVersionUID = 1L;
>>
>>       @Override
>>       public Row map(String str) throws Exception {
>>         String[] split = str.split(Pattern.quote(","));
>>         Row ret = new Row(3);
>>         ret.setField(0, split[0]);
>>         ret.setField(1, split[1]);
>>         ret.setField(2, split[2]);
>>         return ret;
>>       }
>>     }).returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
>>         BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
>>
>>     Job job = Job.getInstance(HBaseConfiguration.create(),
>> "phoenix-mr-job");
>>     PhoenixMapReduceUtil.setOutput(job, "MY_TABLE",
>> "FIELD_1,FIELD2,FIELD_3");
>>     final org.apache.hadoop.conf.Configuration jobConf =
>> job.getConfiguration();
>>     jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
>> PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
>>     final String upsertStatement =
>>         PhoenixConfigurationUtil.getUpsertStatement(jobConf);
>>     final Properties phoenixProps = PropertiesUtil.extractProperties(new
>> Properties(), jobConf);
>>     String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);
>>
>>     rows.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()
>>         .setDrivername(org.apache.phoenix.jdbc.PhoenixDriver.class
>>             .getCanonicalName())
>>         .setDBUrl(connUrl)
>>         .setQuery(upsertStatement)
>>         .setSqlTypes(new int[]{Types.VARCHAR, Types.VARCHAR,
>> Types.VARCHAR})
>>         .finish());
>>
>>     senv.execute();
>>   }
>>
>> Best,
>> Flavio
>>
>> On Wed, Sep 6, 2017 at 3:26 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> According to the JavaDocs of java.sql.Connection, commit() will throw an
>>> exception if the connection is in auto-commit mode, which should be the
>>> default.
>>> So adding this change to the JDBCOutputFormat seems a bit risky.
>>>
>>> Maybe the Phoenix JDBC connector does not enable auto-commit by default
>>> (or doesn't support it). Can you check that, Flavio?
>>> If the Phoenix connector supports auto-commit but does not activate it by
>>> default, we can enable it in JDBCOutputFormat.open().
>>> If auto-commit is not supported, we can add a check after execute()
>>> and call commit() only if Connection.getAutoCommit() returns false.
>>>
>>> Best, Fabian
>>>
>>>
>>> 2017-09-06 11:38 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>
>>>> Hi to all,
>>>> I'm writing a job that uses Apache Phoenix.
>>>>
>>>> At first I used the PhoenixOutputFormat as a (Hadoop) OutputFormat, but
>>>> it's not well suited to the Table API because it cannot handle
>>>> generic objects like Rows (it needs a DBWritable object that must
>>>> already be present at compile time). So I looked into the code of the
>>>> PhoenixOutputFormat, and it's basically nothing more than a
>>>> JDBCOutputFormat.
>>>>
>>>> However, to make it work I had to slightly modify the Flink
>>>> JDBCOutputFormat, adding a dbConn.commit() after the executeBatch() on the
>>>> PreparedStatement, e.g.:
>>>>
>>>>     upload.executeBatch();
>>>>     dbConn.commit();
>>>>
>>>> For the moment I've just created a dedicated PhoenixJdbcOutputFormat,
>>>> copying the code of the JDBCOutputFormat and adding these 2 lines
>>>> (it couldn't be extended in this case because all its fields
>>>> are private).
>>>>
>>>> What do you think about this? Should I open a ticket to add a
>>>> connection commit after executeBatch (in order to be compatible with
>>>> Phoenix) or something else (e.g. create a Phoenix connector that basically
>>>> extends JDBCOutputConnector and overrides 2 methods, also changing the
>>>> visibility of its fields to protected)?
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>>
>>>
>>
>>
>
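
For readers following the thread: the check Fabian proposes (call commit()
after the batch only when Connection.getAutoCommit() returns false) can be
sketched with plain java.sql interfaces as below. This is a minimal sketch
under stated assumptions, not the actual Flink JDBCOutputFormat code. The
class name ConditionalCommitSketch, the flush() helper, and the
recordingStub() proxy that stands in for a real driver are all hypothetical
names introduced for illustration.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class ConditionalCommitSketch {

    /** Executes the batch, then commits only if the connection is NOT in auto-commit mode. */
    static void flush(Connection conn, PreparedStatement stmt) throws SQLException {
        stmt.executeBatch();
        // commit() throws on an auto-commit connection, so guard the call.
        if (!conn.getAutoCommit()) {
            conn.commit();
        }
    }

    /** Hypothetical stand-in for a real driver: records the name of every method invoked. */
    static <T> T recordingStub(Class<T> type, boolean autoCommit, List<String> calls) {
        Object proxy = Proxy.newProxyInstance(
            ConditionalCommitSketch.class.getClassLoader(),
            new Class<?>[] {type},
            (p, method, methodArgs) -> {
                calls.add(method.getName());
                switch (method.getName()) {
                    case "getAutoCommit": return autoCommit;
                    case "executeBatch": return new int[0];
                    default: return null; // commit() and other void methods
                }
            });
        return type.cast(proxy);
    }

    /** Runs flush() against the stubs and returns the recorded JDBC calls in order. */
    static List<String> demo(boolean autoCommit) throws SQLException {
        List<String> calls = new ArrayList<>();
        flush(recordingStub(Connection.class, autoCommit, calls),
              recordingStub(PreparedStatement.class, autoCommit, calls));
        return calls;
    }

    public static void main(String[] args) throws SQLException {
        System.out.println(demo(false)); // [executeBatch, getAutoCommit, commit]
        System.out.println(demo(true));  // [executeBatch, getAutoCommit]
    }
}
```

With a driver that leaves auto-commit off by default (as Phoenix does,
according to the thread) the batch is committed explicitly; with auto-commit
on, commit() is never called, which avoids the exception Fabian mentions.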
