hawq-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From leskin-in <...@git.apache.org>
Subject [GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...
Date Fri, 31 Aug 2018 16:28:27 GMT
Github user leskin-in commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r214408051
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java
---
    @@ -0,0 +1,437 @@
    +package org.apache.hawq.pxf.plugins.jdbc;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.WriteAccessor;
    +import org.apache.hawq.pxf.api.UserDataException;
    +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +import org.apache.hawq.pxf.plugins.jdbc.writercallable.WriterCallable;
    +import org.apache.hawq.pxf.plugins.jdbc.writercallable.WriterCallableFactory;
    +
    +import java.util.List;
    +import java.util.LinkedList;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +
    +import java.io.IOException;
    +import java.text.ParseException;
    +import java.sql.Statement;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.SQLTimeoutException;
    +import java.sql.BatchUpdateException;
    +import java.sql.Connection;
    +import java.sql.DatabaseMetaData;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +/**
    + * JDBC tables accessor
    + *
    + * The SELECT queries are processed by {@link java.sql.Statement}
    + *
    + * The INSERT queries are processed by {@link java.sql.PreparedStatement} and
    + * built-in JDBC batches of arbitrary size
    + */
    +public class JdbcAccessor extends JdbcPlugin implements ReadAccessor, WriteAccessor {
    +    /**
    +     * Class constructor
    +     */
    +    public JdbcAccessor(InputData inputData) throws UserDataException {
    +        super(inputData);
    +    }
    +
    +    /**
    +     * openForRead() implementation
    +     * Create query, open JDBC connection, execute query and store the result into resultSet
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if th SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the JDBC driver was not found
    +     */
    +    @Override
    +    public boolean openForRead() throws SQLException, SQLTimeoutException, ParseException,
ClassNotFoundException {
    +        if (statementRead != null && !statementRead.isClosed()) {
    +            return true;
    +        }
    +
    +        Connection connection = super.getConnection();
    +
    +        queryRead = buildSelectQuery(connection.getMetaData());
    +        statementRead = connection.createStatement();
    +        resultSetRead = statementRead.executeQuery(queryRead);
    +
    +        return true;
    +    }
    +
    +    /**
    +     * readNextObject() implementation
    +     * Retreive the next tuple from resultSet and return it
    +     *
    +     * @throws SQLException if a problem in resultSet occurs
    +     */
    +    @Override
    +    public OneRow readNextObject() throws SQLException {
    +        if (resultSetRead.next()) {
    +            return new OneRow(resultSetRead);
    +        }
    +        return null;
    +    }
    +
    +    /**
    +     * closeForRead() implementation
    +     */
    +    @Override
    +    public void closeForRead() {
    +        JdbcPlugin.closeStatement(statementRead);
    +    }
    +
    +    /**
    +     * openForWrite() implementation
    +     * Create query template and open JDBC connection
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the JDBC driver was not found
    +     * @throws IllegalArgumentException if the provided or generated combination of user-defined
parameters cannot be processed
    +     */
    +    @Override
    +    public boolean openForWrite() throws SQLException, SQLTimeoutException, ParseException,
ClassNotFoundException, IllegalArgumentException {
    +        if (statementWrite != null && !statementWrite.isClosed()) {
    +            throw new SQLException("The connection to an external database is already
open.");
    +        }
    +
    +        Connection connection = super.getConnection();
    +
    +        queryWrite = buildInsertQuery();
    +        statementWrite = super.getPreparedStatement(connection, queryWrite);
    +
    +        // Process batchSize
    +        if ((batchSize != 0) && (!connection.getMetaData().supportsBatchUpdates()))
{
    +            LOG.warn(
    +                "The database '" +
    +                connection.getMetaData().getDatabaseProductName() +
    +                "' does not support batch updates. The current request will be handled
without batching"
    +            );
    +            batchSize = 0;
    +        }
    +
    +        // Process poolSize
    +        if (poolSize < 1) {
    +            poolSize = Runtime.getRuntime().availableProcessors();
    +            LOG.info(
    +                "The POOL_SIZE is set to the number of CPUs available (" + Integer.toString(poolSize)
+ ")"
    +            );
    +        }
    +        if (poolSize > 1) {
    +            executorServiceWrite = Executors.newFixedThreadPool(poolSize);
    +            poolTasks = new LinkedList<>();
    +        }
    +
    +        // Setup WriterCallableFactory
    +        writerCallableFactory = new WriterCallableFactory();
    +        writerCallableFactory.setPlugin(this);
    +        writerCallableFactory.setQuery(queryWrite);
    +        writerCallableFactory.setBatchSize(batchSize);
    +        if (poolSize == 1) {
    +            writerCallableFactory.setStatement(statementWrite);
    +        }
    +
    +        writerCallable = writerCallableFactory.get();
    +
    +        return true;
    +    }
    +
    +	/**
    +     * writeNextObject() implementation
    +     *
    +     * If batchSize is not 0 or 1, add a tuple to the batch of statementWrite
    +     * Otherwise, execute an INSERT query immediately
    +     *
    +     * In both cases, a {@link java.sql.PreparedStatement} is used
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws IOException if the data provided by {@link JdbcResolver} is corrupted
    +     * @throws ClassNotFoundException if pooling is used and the JDBC driver was not
found
    +     * @throws IllegalStateException if writerCallableFactory was not properly initialized
    +     * @throws Exception if it happens in writerCallable.call()
    +     */
    +    @Override
    +    public boolean writeNextObject(OneRow row) throws Exception {
    +        if (writerCallable == null) {
    +            throw new IllegalStateException("The JDBC connection was not properly initialized
(writerCallable is null)");
    +        }
    +
    +        SQLException rollbackException = null;
    +
    +        writerCallable.supply(row);
    +        if (writerCallable.isCallRequired()) {
    +            if (poolSize > 1) {
    +                // Pooling is used. Create new writerCallable
    +                poolTasks.add(executorServiceWrite.submit(writerCallable));
    +                writerCallable = writerCallableFactory.get();
    +            }
    +            else {
    +                // Pooling is not used
    +                try {
    +                    rollbackException = writerCallable.call();
    +                }
    +                catch (SQLException e) {
    +                    rollbackException = e;
    +                }
    +                catch (Exception e) {
    +                    // This is not expected
    +                    throw e;
    +                }
    +            }
    +        }
    +
    +        if (rollbackException != null) {
    +            if (LOG.isDebugEnabled()) {
    +                LOG.debug("Rollback is now required");
    +            }
    +            rollbackException = tryRollback(statementWrite.getConnection(), rollbackException);
    --- End diff --
    
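    @denalex, you are right.
    `tryRollback()` works properly only in one scenario: a single PXF segment, a single thread, and an external database that supports transactions.
    In practice there is always more than one PXF segment. Each segment (and each pool thread) writes through its own connection, so a rollback on one connection cannot undo rows already committed through the others. There is therefore no guarantee that a rollback will work properly, which makes rollback of no use here.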


---

Mime
View raw message