flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Apache Phenix integration
Date Wed, 06 Sep 2017 13:55:29 GMT
Hi Fabian,
thanks for the detailed answer. Obviously you are right :)
As stated by https://phoenix.apache.org/tuning.html auto-commit is disabled
by default in Phoenix, but it can be easily enabled just appending
AutoCommit=true to the connection URL or, equivalently, setting the proper
property in the conf object passed to the Phoenix
QueryUtil.getConnectionUrl method that autogenerate the connection URL,
i.e.:

----------------------
Job job = Job.getInstance(HBaseConfiguration.create(), "phoenix-mr-job");
jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
final Properties phoenixProps = PropertiesUtil.extractProperties(new
Properties(), jobConf);
String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);
----------------------

Now my job works also with the standard Flink JDBCOutputformat.
Just to help other people willing to play with Phoenix and HBase I paste
below my simple test job:

@Test
  public void testPhoenixOutputFormat() throws Exception {

    final StreamExecutionEnvironment senv = getStreamingExecutionEnv();
    senv.enableCheckpointing(5000);
    DataStream<String> testStream = senv.fromElements("1,aaa,XXX",
"2,bbb,YYY", "3,ccc,ZZZ");

    // Set the target Phoenix table and the columns
    DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {

      private static final long serialVersionUID = 1L;

      @Override
      public Row map(String str) throws Exception {
        String[] split = str.split(Pattern.quote(","));
        Row ret = new Row(3);
        ret.setField(0, split[0]);
        ret.setField(1, split[1]);
        ret.setField(2, split[2]);
        return ret;
      }
    }).returns(new
RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO));

    Job job = Job.getInstance(HBaseConfiguration.create(),
"phoenix-mr-job");
    PhoenixMapReduceUtil.setOutput(job, "MY_TABLE",
"FIELD_1,FIELD2,FIELD_3");
    final org.apache.hadoop.conf.Configuration jobConf =
job.getConfiguration();
    jobConf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
PhoenixRuntime.AUTO_COMMIT_ATTRIB + "=true");
    final String upsertStatement =
PhoenixConfigurationUtil.getUpsertStatement(jobConf);
    final Properties phoenixProps = PropertiesUtil.extractProperties(new
Properties(), jobConf);
    String connUrl = QueryUtil.getConnectionUrl(phoenixProps, jobConf);

    rows.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()

.setDrivername(org.apache.phoenix.jdbc.PhoenixDriver.class.getCanonicalName())
        .setDBUrl(connUrl)
        .setQuery(upsertStatement)
        .setSqlTypes(new int[]{Types.VARCHAR, Types.VARCHAR, Types.VARCHAR})
        .finish());

    senv.execute();
  }

Best,
Flavio

On Wed, Sep 6, 2017 at 3:26 PM, Fabian Hueske <fhueske@gmail.com> wrote:

> Hi,
>
> According to the JavaDocs of java.sql.Connection, commit() will throw an
> exception if the connection is in auto commit mode which should be the
> default.
> So adding this change to the JdbcOutputFormat seems a bit risky.
>
> Maybe the Phoenix JDBC connector does not enable auto commits by default
> (or doesn't support it). Can you check that Flavio?
> If the Phoenix connector supports but not activates auto commits by
> default, we can enable it in JdbcOutputFormat.open().
> If auto commits are not supported, we can add a check after execute() and
> call commit() only if Connection.getAutoCommit() returns false.
>
> Best, Fabian
>
>
> 2017-09-06 11:38 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>
>> Hi to all,
>> I'm writing a job that uses Apache Phoenix.
>>
>> At first I used the PhoenixOutputFormat as (hadoop) OutputFormat but it's
>> not well suited to work with Table API because it cannot handle generic
>> objects like Rows (it need a DBWritable Object that should be already
>> present at compile time). So I've looked into the code of the
>> PhoenixOutputFormat and it's nothing else than a JDBCOutputFormat
>> (basically).
>>
>> However, to make it work I had to slightly modify the Flink
>> JDBCOutputformat, adding a dbConn.commit() after the executeBatch() on the
>> PreparedStatement. E.g:
>>
>>     upload.executeBatch();
>>     dbConn.commit();
>>
>> For the moment I've just created a dedicated PhoenixJdbcOutpuFormat where
>> I've added these 2 lines of code starting from the code of the
>> JDBCOutputformat (it couldn't be extended in this case because all fields
>> are private).
>>
>> What do you think about this? Should I open a ticket to add a connection
>> commit after executeBatch (in order to be compatible with Phoenix) or
>> something else (e.g. create a Phoenix connector that basically extend
>> JDBCOutputConnector and ovewrite 2 methods, changing also the visibility of
>> its fields to protected)?
>>
>> Best,
>> Flavio
>>
>>
>

Mime
View raw message