nifi-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (NIFI-3432) ExecuteSQL Should Support Multiple ResultSets
Date Tue, 14 Feb 2017 18:52:41 GMT

    [ https://issues.apache.org/jira/browse/NIFI-3432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15866388#comment-15866388 ]

ASF GitHub Bot commented on NIFI-3432:
--------------------------------------

Github user patricker commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1471#discussion_r101112040
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java ---
    @@ -192,31 +192,57 @@ public void process(InputStream in) throws IOException {
             try (final Connection con = dbcpService.getConnection();
                 final Statement st = con.createStatement()) {
                 st.setQueryTimeout(queryTimeout); // timeout in seconds
    -            final AtomicLong nrOfRows = new AtomicLong(0L);
    -            if (fileToProcess == null) {
    -                fileToProcess = session.create();
    -            }
    -            fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
    -                @Override
    -                public void process(final OutputStream out) throws IOException {
    -                    try {
    -                        logger.debug("Executing query {}", new Object[]{selectQuery});
    -                        final ResultSet resultSet = st.executeQuery(selectQuery);
    -                        nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, convertNamesForAvro));
    -                    } catch (final SQLException e) {
    -                        throw new ProcessException(e);
    -                    }
    +
    +
    +            logger.debug("Executing query {}", new Object[]{selectQuery});
    +            boolean results = st.execute(selectQuery);
    +            int resultCount = 0;
    +            while(results){
    +                FlowFile resultSetFF;
    +                if(fileToProcess==null)
    +                    resultSetFF = session.create();
    +                else {
    +                    resultSetFF = session.create(fileToProcess);
    +                    resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
                     }
    -            });
     
    -            // set attribute how many rows were selected
    -            fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
    +                final AtomicLong nrOfRows = new AtomicLong(0L);
    +
    +                resultSetFF = session.write(resultSetFF, new OutputStreamCallback() {
    +                    @Override
    +                    public void process(final OutputStream out) throws IOException {
    +                        try {
    +                            ResultSet resultSet = st.getResultSet();
    +                            nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, convertNamesForAvro));
    +                        } catch (final SQLException e) {
    +                            throw new ProcessException(e);
    +                        }
    +                    }
    +                });
    +
    +                // set attribute how many rows were selected
    +                resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
    +
    +                logger.info("{} contains {} Avro records; transferring to 'success'",
    +                        new Object[]{resultSetFF, nrOfRows.get()});
    +                session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows",
    +                        stopWatch.getElapsed(TimeUnit.MILLISECONDS));
    +                session.transfer(resultSetFF, REL_SUCCESS);
    +                resultCount++;
    +
    +                // are there anymore result sets?
    +                results = st.getMoreResults();
    +            }
     
    -            logger.info("{} contains {} Avro records; transferring to 'success'",
    -                    new Object[]{fileToProcess, nrOfRows.get()});
    -            session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows",
    -                    stopWatch.getElapsed(TimeUnit.MILLISECONDS));
    -            session.transfer(fileToProcess, REL_SUCCESS);
    +            //If we had at least one result then it's OK to drop the original file, but if we had no results then
    +            //  pass the original flow file down the line to trigger downstream processors
    +            if(fileToProcess != null) {
    +                if (resultCount > 0) {
    +                    session.remove(fileToProcess);
    +                } else {
    +                    session.transfer(fileToProcess, REL_SUCCESS);
    --- End diff --
    
    I ran the original code base to check: a query that returns no results produces an empty
    Avro schema. I replicated this behavior in my code and added a test case.


> ExecuteSQL Should Support Multiple ResultSets
> ---------------------------------------------
>
>                 Key: NIFI-3432
>                 URL: https://issues.apache.org/jira/browse/NIFI-3432
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Core Framework
>    Affects Versions: 1.2.0
>            Reporter: Peter Wicks
>            Assignee: Peter Wicks
>            Priority: Minor
>             Fix For: 1.2.0
>
>
> ExecuteSQL processor only supports processing a single resultset. If a query/stored procedure call returns multiple resultsets then only one is kept.
> ExecuteSQL should be updated to support handling multiple resultsets. When multiple resultsets exist a flow file should be created for each resultset.
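
For readers unfamiliar with how JDBC exposes multiple results, here is a minimal sketch of the general iteration pattern. It is not the code from the pull request: the class `MultiResultDrain` and the method `describeResults` are hypothetical names used only for illustration, and the sketch records a short description per result instead of writing Avro to a flow file. It relies only on the documented `java.sql.Statement` contract: `execute` and `getMoreResults` return `true` when the current result is a `ResultSet`, and there are no more results only when `getMoreResults()` is `false` and `getUpdateCount()` is `-1`. The loop in the diff above continues only while each result is a `ResultSet`, so this sketch may differ from it when a statement interleaves update counts with result sets.

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of NiFi or the pull request.
final class MultiResultDrain {

    private MultiResultDrain() {
    }

    /**
     * Executes the given SQL and returns one description per result, in order.
     * A result is either a ResultSet (described by its row count) or an update count.
     */
    static List<String> describeResults(final Statement st, final String sql) throws SQLException {
        final List<String> described = new ArrayList<>();

        // true means the first result is a ResultSet; false means it is an
        // update count or there are no results at all.
        boolean isResultSet = st.execute(sql);

        while (true) {
            if (isResultSet) {
                // In a processor, this is roughly where each result set would be
                // written out to its own flow file.
                try (ResultSet rs = st.getResultSet()) {
                    long rows = 0;
                    while (rs.next()) {
                        rows++;
                    }
                    described.add("result set: " + rows + " rows");
                }
            } else {
                final int updateCount = st.getUpdateCount();
                if (updateCount == -1) {
                    // Not a ResultSet and no update count: nothing is left.
                    break;
                }
                described.add("update count: " + updateCount);
            }
            // Advance to the next result, whatever kind it is.
            isResultSet = st.getMoreResults();
        }
        return described;
    }
}
```

A caller would pass a `Statement` obtained from a live `Connection`. A query that returns a single empty result set yields a one-element list reporting zero rows, which corresponds to the "no results" case discussed in the review comment.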



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
