hawq-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From denalex <...@git.apache.org>
Subject [GitHub] incubator-hawq pull request #1353: HAWQ-1605. Support INSERT in PXF JDBC plu...
Date Tue, 28 Aug 2018 21:40:06 GMT
Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1353#discussion_r213481546
  
    --- Diff: pxf/pxf-jdbc/src/main/java/org/apache/hawq/pxf/plugins/jdbc/JdbcAccessor.java
---
    @@ -0,0 +1,437 @@
    +package org.apache.hawq.pxf.plugins.jdbc;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.WriteAccessor;
    +import org.apache.hawq.pxf.api.UserDataException;
    +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +import org.apache.hawq.pxf.plugins.jdbc.writercallable.WriterCallable;
    +import org.apache.hawq.pxf.plugins.jdbc.writercallable.WriterCallableFactory;
    +
    +import java.util.List;
    +import java.util.LinkedList;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.Future;
    +
    +import java.io.IOException;
    +import java.text.ParseException;
    +import java.sql.Statement;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.SQLTimeoutException;
    +import java.sql.BatchUpdateException;
    +import java.sql.Connection;
    +import java.sql.DatabaseMetaData;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +/**
    + * JDBC tables accessor
    + *
    + * The SELECT queries are processed by {@link java.sql.Statement}
    + *
    + * The INSERT queries are processed by {@link java.sql.PreparedStatement} and
    + * built-in JDBC batches of arbitrary size
    + */
    +public class JdbcAccessor extends JdbcPlugin implements ReadAccessor, WriteAccessor {
    +    /**
    +     * Class constructor.
    +     *
    +     * The constructor only forwards {@code inputData} to the {@link JdbcPlugin}
    +     * constructor; it does no work of its own.
    +     *
    +     * @param inputData the PXF request data, passed unchanged to {@link JdbcPlugin}
    +     * @throws UserDataException propagated from the {@link JdbcPlugin} constructor
    +     */
    +    public JdbcAccessor(InputData inputData) throws UserDataException {
    +        super(inputData);
    +    }
    +
    +    /**
    +     * openForRead() implementation
    +     * Build the SELECT query, open a JDBC connection, execute the query and
    +     * store its result in resultSetRead
    +     *
    +     * If statementRead is already open, nothing is done: the existing statement
    +     * and result set are kept and the query is not re-executed.
    +     *
    +     * @return always {@code true}
    +     * @throws SQLException if a database access error occurs
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the JDBC driver was not found
    +     */
    +    @Override
    +    public boolean openForRead() throws SQLException, SQLTimeoutException, ParseException,
ClassNotFoundException {
    +        // An earlier call already opened the statement: keep it and its result set
    +        if (statementRead != null && !statementRead.isClosed()) {
    +            return true;
    +        }
    +
    +        Connection connection = super.getConnection();
    +
    +        // The target database's metadata is handed to the SELECT query builder
    +        queryRead = buildSelectQuery(connection.getMetaData());
    +        statementRead = connection.createStatement();
    +        resultSetRead = statementRead.executeQuery(queryRead);
    +
    +        return true;
    +    }
    +
    +    /**
    +     * readNextObject() implementation
    +     * Retrieve the next tuple from resultSetRead and return it
    +     *
    +     * resultSetRead is set by openForRead(), which must have been called first.
    +     *
    +     * NOTE(review): the returned OneRow wraps resultSetRead itself (the cursor),
    +     * not a copy of the current row, so presumably the row must be consumed
    +     * before the next call advances the cursor -- confirm against JdbcResolver.
    +     *
    +     * @return a {@link OneRow} wrapping resultSetRead positioned on the next row,
    +     *         or {@code null} when there are no more rows
    +     * @throws SQLException if a problem in resultSet occurs
    +     */
    +    @Override
    +    public OneRow readNextObject() throws SQLException {
    +        if (resultSetRead.next()) {
    +            return new OneRow(resultSetRead);
    +        }
    +        return null;
    +    }
    +
    +    /**
    +     * closeForRead() implementation
    +     *
    +     * Closes statementRead through {@link JdbcPlugin#closeStatement}.
    +     * NOTE(review): whether this also releases resultSetRead and the underlying
    +     * Connection depends on JdbcPlugin.closeStatement, which is not shown here.
    +     */
    +    @Override
    +    public void closeForRead() {
    +        JdbcPlugin.closeStatement(statementRead);
    +    }
    +
    +    /**
    +     * openForWrite() implementation
    +     * Build the INSERT query template, open a JDBC connection, prepare
    +     * statementWrite and create the first writer callable
    +     *
    +     * This method may adjust the plugin's settings. batchSize is reset to 0 when the
    +     * database does not support batch updates. A poolSize below 1 is replaced by the
    +     * number of available processors. When poolSize is greater than 1, a fixed thread
    +     * pool of poolSize threads (executorServiceWrite) and an empty poolTasks list are
    +     * created.
    +     *
    +     * @return always {@code true}
    +     * @throws SQLException if a database access error occurs, or if statementWrite is
    +     *         already open
    +     * @throws SQLTimeoutException if a problem with the connection occurs
    +     * @throws ParseException if the SQL statement provided in PXF InputData is incorrect
    +     * @throws ClassNotFoundException if the JDBC driver was not found
    +     * @throws IllegalArgumentException if the provided or generated combination of user-defined
    +     *         parameters cannot be processed
    +     */
    +    @Override
    +    public boolean openForWrite() throws SQLException, SQLTimeoutException, ParseException,
ClassNotFoundException, IllegalArgumentException {
    +        // Unlike openForRead(), a second open while statementWrite is still open is an error
    +        if (statementWrite != null && !statementWrite.isClosed()) {
    +            throw new SQLException("The connection to an external database is already
open.");
    +        }
    +
    +        Connection connection = super.getConnection();
    +
    +        queryWrite = buildInsertQuery();
    +        statementWrite = super.getPreparedStatement(connection, queryWrite);
    +
    +        // Process batchSize: fall back to non-batched inserts if the database cannot batch
    +        if ((batchSize != 0) && (!connection.getMetaData().supportsBatchUpdates()))
{
    +            LOG.warn(
    +                "The database '" +
    +                connection.getMetaData().getDatabaseProductName() +
    +                "' does not support batch updates. The current request will be handled
without batching"
    +            );
    +            batchSize = 0;
    +        }
    +
    +        // Process poolSize: a value below 1 means one thread per available processor
    +        if (poolSize < 1) {
    +            poolSize = Runtime.getRuntime().availableProcessors();
    +            LOG.info(
    +                "The POOL_SIZE is set to the number of CPUs available (" + Integer.toString(poolSize)
+ ")"
    +            );
    +        }
    +        // An executor is created only for more than one thread; with poolSize == 1,
    +        // writeNextObject() calls the writer callable directly
    +        if (poolSize > 1) {
    +            executorServiceWrite = Executors.newFixedThreadPool(poolSize);
    +            poolTasks = new LinkedList<>();
    +        }
    +
    +        // Setup WriterCallableFactory
    +        writerCallableFactory = new WriterCallableFactory();
    +        writerCallableFactory.setPlugin(this);
    +        writerCallableFactory.setQuery(queryWrite);
    +        writerCallableFactory.setBatchSize(batchSize);
    +        // Only the non-pooled callable is given statementWrite.
    +        // NOTE(review): pooled callables presumably prepare their own statements
    +        // via the plugin -- confirm in WriterCallableFactory
    +        if (poolSize == 1) {
    +            writerCallableFactory.setStatement(statementWrite);
    +        }
    +
    +        // First callable; with pooling, writeNextObject() replaces it after each submission
    +        writerCallable = writerCallableFactory.get();
    +
    +        return true;
    +    }
    +
    +	/**
    +     * writeNextObject() implementation
    +     *
    +     * If batchSize is not 0 or 1, add a tuple to the batch of statementWrite
    +     * Otherwise, execute an INSERT query immediately
    +     *
    +     * In both cases, a {@link java.sql.PreparedStatement} is used
    +     *
    +     * @throws SQLException if a database access error occurs
    +     * @throws IOException if the data provided by {@link JdbcResolver} is corrupted
    +     * @throws ClassNotFoundException if pooling is used and the JDBC driver was not
found
    +     * @throws IllegalStateException if writerCallableFactory was not properly initialized
    +     * @throws Exception if it happens in writerCallable.call()
    +     */
    +    @Override
    +    public boolean writeNextObject(OneRow row) throws Exception {
    +        if (writerCallable == null) {
    +            throw new IllegalStateException("The JDBC connection was not properly initialized
(writerCallable is null)");
    +        }
    +
    +        SQLException rollbackException = null;
    +
    +        writerCallable.supply(row);
    +        if (writerCallable.isCallRequired()) {
    +            if (poolSize > 1) {
    +                // Pooling is used. Create new writerCallable
    +                poolTasks.add(executorServiceWrite.submit(writerCallable));
    +                writerCallable = writerCallableFactory.get();
    +            }
    +            else {
    +                // Pooling is not used
    +                try {
    +                    rollbackException = writerCallable.call();
    +                }
    +                catch (SQLException e) {
    +                    rollbackException = e;
    +                }
    +                catch (Exception e) {
    +                    // This is not expected
    +                    throw e;
    +                }
    +            }
    +        }
    +
    +        if (rollbackException != null) {
    +            if (LOG.isDebugEnabled()) {
    +                LOG.debug("Rollback is now required");
    +            }
    +            rollbackException = tryRollback(statementWrite.getConnection(), rollbackException);
    --- End diff --
    
    We cannot guarantee transactional semantics for this profile anyway, as a single INSERT
will be executed by multiple segments and multiple PXF threads, and there is no way for one
of the threads to know the status of the others, so a rollback in one thread cannot undo rows
already written by the others. It was suggested before to remove the rollback logic altogether,
as it serves no practical purpose and just complicates the code and the processing
logic. @leskin-in @kapustor -- What do you think?


---

Mime
View raw message