nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (NIFI-2156) Add ListDatabaseTables processor
Date Thu, 30 Jun 2016 22:57:10 GMT

    [ https://issues.apache.org/jira/browse/NIFI-2156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15358001#comment-15358001
] 

ASF GitHub Bot commented on NIFI-2156:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/599#discussion_r69224252
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
---
    @@ -0,0 +1,306 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.dbcp.DBCPService;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.util.StringUtils;
    +
    +import java.io.IOException;
    +import java.sql.Connection;
    +import java.sql.DatabaseMetaData;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.Statement;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    /**
     * A processor to retrieve a list of tables (and their metadata) from a database connection.
     * Each newly discovered table results in one zero-content FlowFile whose attributes describe it.
     */
    @TriggerSerially
    @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    @Tags({"sql", "list", "jdbc", "table", "database"})
    @CapabilityDescription("Generates a set of flow files, each containing attributes corresponding to metadata about a table from a database connection.")
    @WritesAttributes({
            @WritesAttribute(attribute = "db.table.name", description = "Contains the name of a database table from the connection"),
            @WritesAttribute(attribute = "db.table.catalog", description = "Contains the name of the catalog to which the table belongs "
                    + "(not written if the database reports no catalog for the table)"),
            @WritesAttribute(attribute = "db.table.schema", description = "Contains the name of the schema to which the table belongs "
                    + "(not written if the database reports no schema for the table)"),
            @WritesAttribute(attribute = "db.table.fullname", description = "Contains the fully-qualified table name (possibly including catalog, schema, etc.)"),
            @WritesAttribute(attribute = "db.table.type",
                    description = "Contains the type of the database table from the connection. Typical types are \"TABLE\", \"VIEW\", \"SYSTEM TABLE\", "
                            + "\"GLOBAL TEMPORARY\", \"LOCAL TEMPORARY\", \"ALIAS\", \"SYNONYM\""),
            @WritesAttribute(attribute = "db.table.remarks", description = "Contains the remarks (explanatory comment) the database reports for the table "
                    + "(not written if there are none)"),
            @WritesAttribute(attribute = "db.table.count", description = "Contains the number of rows in the table (only written when Include Count is true)")
    })
    @Stateful(scopes = {Scope.LOCAL}, description = "After performing a listing of tables, the time at which each table was listed is stored, "
            + "keyed by the table's fully-qualified name. "
            + "This allows the Processor to not re-list tables the next time that the Processor is run. Changing any of the properties that "
            + "determine which tables are listed (connection, catalog, schema pattern, table name pattern or table types) will "
            + "indicate that the processor should reset state and thus re-list the tables using the new configuration.")
    +public class ListDatabaseTables extends AbstractProcessor {
    +
    +    // Attribute names
    +    public static final String DB_TABLE_NAME = "db.table.name";
    +    public static final String DB_TABLE_CATALOG = "db.table.catalog";
    +    public static final String DB_TABLE_SCHEMA = "db.table.schema";
    +    public static final String DB_TABLE_FULLNAME = "db.table.name";
    +    public static final String DB_TABLE_TYPE = "db.table.type";
    +    public static final String DB_TABLE_REMARKS = "db.table.remarks";
    +    public static final String DB_TABLE_COUNT = "db.table.count";
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All FlowFiles that are received are routed to success")
    +            .build();
    +
    +    // Property descriptors
    +    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
    +            .name("list-db-tables-db-connection")
    +            .displayName("Database Connection Pooling Service")
    +            .description("The Controller Service that is used to obtain connection to
database")
    +            .required(true)
    +            .identifiesControllerService(DBCPService.class)
    +            .build();
    +
    +    public static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
    +            .name("list-db-tables-catalog")
    +            .displayName("Catalog")
    +            .description("The name of a catalog from which to list database tables. The
name must match the catalog name as it is stored in the database. "
    +                    + "If the property is not set, the catalog name will not be used
to narrow the search for tables. If the property is set to an empty string, "
    +                    + "tables without a catalog will be listed.")
    +            .required(false)
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final PropertyDescriptor SCHEMA_PATTERN = new PropertyDescriptor.Builder()
    +            .name("list-db-tables-schema-pattern")
    +            .displayName("Schema Pattern")
    +            .description("A pattern for matching schemas in the database. Within a pattern,
\"%\" means match any substring of 0 or more characters, "
    +                    + "and \"_\" means match any one character. The pattern must match
the schema name as it is stored in the database. "
    +                    + "If the property is not set, the schema name will not be used to
narrow the search for tables. If the property is set to an empty string, "
    +                    + "tables without a schema will be listed.")
    +            .required(false)
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final PropertyDescriptor TABLE_NAME_PATTERN = new PropertyDescriptor.Builder()
    +            .name("list-db-tables-name-pattern")
    +            .displayName("Table Name Pattern")
    +            .description("A pattern for matching tables in the database. Within a pattern,
\"%\" means match any substring of 0 or more characters, "
    +                    + "and \"_\" means match any one character. The pattern must match
the table name as it is stored in the database. "
    +                    + "If the property is not set, all tables will be retrieved.")
    +            .required(false)
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final PropertyDescriptor TABLE_TYPES = new PropertyDescriptor.Builder()
    +            .name("list-db-tables-types")
    +            .displayName("Table Types")
    +            .description("A comma-separated list of table types to include. For example,
some databases support TABLE and VIEW types. If the property is not set, "
    +                    + "tables of all types will be returned.")
    +            .required(false)
    +            .defaultValue("TABLE")
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final PropertyDescriptor INCLUDE_COUNT = new PropertyDescriptor.Builder()
    +            .name("list-db-include-count")
    +            .displayName("Include Count")
    +            .description("Whether to include the table's row count as a flow file attribute.
This affects performance as a database query will be generated "
    +                    + "for each table in the retrieved list.")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .build();
    +
    private static final List<PropertyDescriptor> propertyDescriptors;
    private static final Set<Relationship> relationships;

    // Raised by onPropertyModified() when a listing-defining property changes; setup() clears
    // the stored state and lowers it again the next time the processor is scheduled.
    private boolean resetState = false;

    /*
     * Builds the immutable property list (in display order) and the relationship set once,
     * when the class is loaded.
     */
    static {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        Collections.addAll(descriptors,
                DBCP_SERVICE, CATALOG, SCHEMA_PATTERN, TABLE_NAME_PATTERN, TABLE_TYPES, INCLUDE_COUNT);
        propertyDescriptors = Collections.unmodifiableList(descriptors);

        relationships = Collections.unmodifiableSet(new HashSet<>(Collections.singleton(REL_SUCCESS)));
    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void setup(ProcessContext context) {
    +        try {
    +            if (resetState) {
    +                context.getStateManager().clear(getScope());
    +                resetState = false;
    +            }
    +        } catch (IOException ioe) {
    +            throw new ProcessException(ioe);
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException
{
    +        final ComponentLog logger = getLogger();
    +        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
    +        final String catalog = context.getProperty(CATALOG).getValue();
    +        final String schemaPattern = context.getProperty(SCHEMA_PATTERN).getValue();
    +        final String tableNamePattern = context.getProperty(TABLE_NAME_PATTERN).getValue();
    +        final String[] tableTypes = context.getProperty(TABLE_TYPES).isSet()
    +                ? context.getProperty(TABLE_TYPES).getValue().split("\\s*,\\s*")
    +                : null;
    +        final boolean includeCount = context.getProperty(INCLUDE_COUNT).asBoolean();
    +
    +        final StateManager stateManager = context.getStateManager();
    +        final StateMap stateMap;
    +        final Map<String, String> stateMapProperties;
    +        try {
    +            stateMap = stateManager.getState(getScope());
    +            stateMapProperties = new HashMap<>(stateMap.toMap());
    +        } catch (IOException ioe) {
    +            throw new ProcessException(ioe);
    +        }
    +
    +        try (final Connection con = dbcpService.getConnection()) {
    +
    +            DatabaseMetaData dbMetaData = con.getMetaData();
    +            ResultSet rs = dbMetaData.getTables(catalog, schemaPattern, tableNamePattern,
tableTypes);
    +            while (rs.next()) {
    +                final String tableCatalog = rs.getString(1);
    +                final String tableSchema = rs.getString(2);
    +                final String tableName = rs.getString(3);
    +                final String tableType = rs.getString(4);
    +                final String tableRemarks = rs.getString(5);
    +
    +                // Build fully-qualified name
    +                String fqn = Stream.of(tableCatalog, tableSchema, tableName)
    +                        .filter(segment -> !StringUtils.isEmpty(segment))
    +                        .collect(Collectors.joining("."));
    +
    +                String fqTableName = stateMap.get(fqn);
    +                if (fqTableName == null) {
    +                    FlowFile flowFile = session.create();
    +                    logger.info("Found {}: {}", new Object[]{tableType, fqn});
    +                    if (includeCount) {
    +                        try (Statement st = con.createStatement()) {
    +                            final String countQuery = "SELECT COUNT(1) FROM " + fqn;
    +
    +                            logger.debug("Executing query: {}", new Object[]{countQuery});
    +                            ResultSet countResult = st.executeQuery(countQuery);
    +                            if (countResult.next()) {
    +                                flowFile = session.putAttribute(flowFile, DB_TABLE_COUNT,
Long.toString(countResult.getLong(1)));
    +                            }
    +                        } catch (SQLException se) {
    +                            logger.error("Couldn't get row count for {}", new Object[]{fqn});
    +                            session.remove(flowFile);
    +                            continue;
    +                        }
    +                    }
    +                    if (tableCatalog != null) {
    +                        flowFile = session.putAttribute(flowFile, DB_TABLE_CATALOG, tableCatalog);
    +                    }
    +                    if (tableSchema != null) {
    +                        flowFile = session.putAttribute(flowFile, DB_TABLE_SCHEMA, tableSchema);
    +                    }
    +                    flowFile = session.putAttribute(flowFile, DB_TABLE_NAME, tableName);
    +                    flowFile = session.putAttribute(flowFile, DB_TABLE_FULLNAME, fqn);
    +                    flowFile = session.putAttribute(flowFile, DB_TABLE_TYPE, tableType);
    +                    if (tableRemarks != null) {
    +                        flowFile = session.putAttribute(flowFile, DB_TABLE_REMARKS, tableRemarks);
    +                    }
    +
    +                    String transitUri;
    +                    try {
    +                        transitUri = dbMetaData.getURL();
    +                    } catch (SQLException sqle) {
    +                        transitUri = "<unknown>";
    +                    }
    +                    session.getProvenanceReporter().receive(flowFile, transitUri);
    +                    session.transfer(flowFile, REL_SUCCESS);
    +                    stateMapProperties.put(fqn, Long.toString(System.currentTimeMillis()));
    +                }
    +            }
    +            stateManager.replace(stateMap, stateMapProperties, getScope());
    +
    +        } catch (final SQLException | IOException e) {
    +            throw new ProcessException(e);
    +        }
    +    }
    +
    +    @Override
    +    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String
newValue) {
    +        super.onPropertyModified(descriptor, oldValue, newValue);
    +        // If any of the properties that define the retrieved list have changed, then
reset the state
    +        if (DBCP_SERVICE.equals(descriptor)
    +                || CATALOG.equals(descriptor)
    +                || SCHEMA_PATTERN.equals(descriptor)
    +                || TABLE_NAME_PATTERN.equals(descriptor)
    +                || TABLE_TYPES.equals(descriptor)) {
    +            resetState = true;
    +        }
    +    }
    +
    +    private Scope getScope() {
    +        return Scope.LOCAL;
    --- End diff --
    
    Yes, I made it a method early in development in case I needed logic or dependency injection.
I can change that at this point :)


> Add ListDatabaseTables processor
> --------------------------------
>
>                 Key: NIFI-2156
>                 URL: https://issues.apache.org/jira/browse/NIFI-2156
>             Project: Apache NiFi
>          Issue Type: Sub-task
>          Components: Extensions
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
>             Fix For: 1.0.0
>
>
> This processor would use a DatabaseConnectionPool controller service, call getTables(),
and if the (optional, defaulting-to-false) property "Include Row Count" is set, then a "SELECT
COUNT(1) from table" would be issued to the database. The table catalog, schema, name, type,
remarks (and its count if specified) will be included as attributes in a zero-content flow
file.
> It will also use State Management so that each table is listed only once. If new tables are added
(while the processor is running), flow files for the new tables will be generated. Changing any
property that could affect the list of returned tables (such as the DB Connection, catalog,
schema pattern, table name pattern, or table types) will reset the state, and all tables will
be fetched using the new criteria. The state can also be cleared manually using the standard
Clear State link on the View State dialog (available from the processor's context menu).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message