phoenix-dev mailing list archives

From: JamesRTaylor <...@git.apache.org>
Subject: [GitHub] phoenix pull request #269: PHOENIX-2460 Implement scrutiny command to valida...
Date: Wed, 23 Aug 2017 00:23:49 GMT
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/269#discussion_r134624315
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java ---
    @@ -0,0 +1,349 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.mapreduce.index;
    +
    +import java.io.IOException;
    +import java.sql.Connection;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Properties;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapreduce.Mapper;
    +import org.apache.phoenix.mapreduce.PhoenixJobCounters;
    +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
    +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
    +import org.apache.phoenix.mapreduce.util.ConnectionUtil;
    +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
    +import org.apache.phoenix.parse.HintNode.Hint;
    +import org.apache.phoenix.schema.PTable;
    +import org.apache.phoenix.util.ColumnInfo;
    +import org.apache.phoenix.util.PhoenixRuntime;
    +import org.apache.phoenix.util.QueryUtil;
    +import org.apache.phoenix.util.SchemaUtil;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Joiner;
    +
    +/**
    + * Mapper that reads from the data table and checks the rows against the index table
    + */
    +public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, Text, Text> {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(IndexScrutinyMapper.class);
    +    private Connection connection;
    +    private List<ColumnInfo> targetTblColumnMetadata;
    +    private long batchSize;
    +    // holds a batch of rows from the table the mapper is iterating over
    +    private List<List<Object>> currentBatchValues = new ArrayList<>();
    +    private String targetTableQuery;
    +    private int numTargetPkCols;
    +    private boolean outputInvalidRows;
    +    private OutputFormat outputFormat = OutputFormat.FILE;
    +    private String qSourceTable;
    +    private String qTargetTable;
    +    private long executeTimestamp;
    +    private int numSourcePkCols;
    +    private final PhoenixIndexDBWritable indxWritable = new PhoenixIndexDBWritable();
    +    private List<ColumnInfo> sourceTblColumnMetadata;
    +
    +    // used to write results to the output table
    +    private Connection outputConn;
    +    private PreparedStatement outputUpsertStmt;
    +    private long outputMaxRows;
    +
    +    @Override
    +    protected void setup(final Context context) throws IOException, InterruptedException {
    +        super.setup(context);
    +        final Configuration configuration = context.getConfiguration();
    +        try {
    +            // get a connection with correct CURRENT_SCN (so incoming writes don't
    +            // throw off the scrutiny)
    +            final Properties overrideProps = new Properties();
    +            String scn = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
    +            overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
    +            connection = ConnectionUtil.getOutputConnection(configuration, overrideProps);
    +            connection.setAutoCommit(false);
    +            batchSize = PhoenixConfigurationUtil.getScrutinyBatchSize(configuration);
    +            outputInvalidRows =
    +                    PhoenixConfigurationUtil.getScrutinyOutputInvalidRows(configuration);
    +            outputFormat = PhoenixConfigurationUtil.getScrutinyOutputFormat(configuration);
    +            executeTimestamp = PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(configuration);
    +
    +            // get the index table and column names
    +            String qDataTable = PhoenixConfigurationUtil.getScrutinyDataTableName(configuration);
    +            final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable);
    +            final String qIndexTable =
    +                    PhoenixConfigurationUtil.getScrutinyIndexTableName(configuration);
    +            final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable);
    +
    +            // set the target table based on whether we're running the MR over the
    +            // data or index table
    +            SourceTable sourceTable =
    +                    PhoenixConfigurationUtil.getScrutinySourceTable(configuration);
    +            SourceTargetColumnNames columnNames =
    +                    SourceTable.DATA_TABLE_SOURCE.equals(sourceTable)
    +                            ? new SourceTargetColumnNames.DataSourceColNames(pdataTable,
    +                                    pindexTable)
    +                            : new SourceTargetColumnNames.IndexSourceColNames(pdataTable,
    +                                    pindexTable);
    +            qSourceTable = columnNames.getQualifiedSourceTableName();
    +            qTargetTable = columnNames.getQualifiedTargetTableName();
    +            List<String> targetColNames = columnNames.getTargetColNames();
    +            List<String> sourceColNames = columnNames.getSourceColNames();
    +            List<String> targetPkColNames = columnNames.getTargetPkColNames();
    +            String targetPksCsv =
    +                    Joiner.on(",").join(SchemaUtil.getEscapedFullColumnNames(targetPkColNames));
    +            numSourcePkCols = columnNames.getSourcePkColNames().size();
    +            numTargetPkCols = targetPkColNames.size();
    +
    +            if (OutputFormat.TABLE.equals(outputFormat)) {
    +                outputConn = ConnectionUtil.getOutputConnection(configuration, new Properties());
    +                String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
    +                this.outputUpsertStmt = outputConn.prepareStatement(upsertQuery);
    +            }
    +            outputMaxRows = PhoenixConfigurationUtil.getScrutinyOutputMax(configuration);
    +
    +            // Create the query against the target table
    +            // Our query projection should be all the index column names (or their
    +            // data table equivalent names)
    +            targetTableQuery =
    +                    QueryUtil.constructSelectStatement(qTargetTable,
    +                        columnNames.getCastedTargetColNames(), targetPksCsv,
    +                        Hint.NO_INDEX, false) + " IN ";
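    +            // a rough sketch of the resulting base query (shape assumed, with
    +            // hypothetical target PK columns PK1 and PK2):
    +            //   SELECT /*+ NO_INDEX */ CAST(...), ... FROM <target table>
    +            //   WHERE ("PK1","PK2") IN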
    +            targetTblColumnMetadata =
    +                    PhoenixRuntime.generateColumnInfo(connection, qTargetTable, targetColNames);
    +            sourceTblColumnMetadata =
    +                    PhoenixRuntime.generateColumnInfo(connection, qSourceTable, sourceColNames);
    +            LOG.info("Base query against target table: " + targetTableQuery);
    +        } catch (SQLException e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    @Override
    +    protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
    +            throws IOException, InterruptedException {
    +        try {
    +            final List<Object> values = record.getValues();
    +
    +            context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
    +            currentBatchValues.add(values);
    +            if (context.getCounter(PhoenixJobCounters.INPUT_RECORDS).getValue() % batchSize != 0) {
    +                // if we haven't hit the batch size, just report progress and move
    +                // on to the next record
    +                context.progress();
    +                return;
    +            } else {
    +                // otherwise, process the batch
    +                processBatch(context);
    +            }
    +            context.progress(); // Make sure progress is reported to Application Master.
    +        } catch (SQLException | IllegalArgumentException e) {
    +            LOG.error("Error while reading/writing a record", e);
    +            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
    +            throw new IOException(e);
    +        }
    +    }
    +
    +    @Override
    +    protected void cleanup(Context context) throws IOException, InterruptedException {
    +        super.cleanup(context);
    +        if (connection != null) {
    +            try {
    +                processBatch(context);
    +                connection.close();
    +                if (outputConn != null) {
    +                    outputConn.close();
    +                }
    +            } catch (SQLException e) {
    +                LOG.error("Error while closing connection in the IndexScrutinyMapper class", e);
    +                throw new IOException(e);
    +            }
    +        }
    +    }
    +
    +    private void processBatch(Context context)
    +            throws SQLException, IOException, InterruptedException {
    +        if (currentBatchValues.size() == 0) return;
    +        context.getCounter(PhoenixScrutinyJobCounters.BATCHES_PROCESSED_COUNT).increment(1);
    +        // our query selection filter should be the PK columns of the target table
    +        // (index or data table)
    +        String inClause =
    +                QueryUtil.constructParameterizedInClause(numTargetPkCols,
    +                    currentBatchValues.size());
    +        String indexQuery = targetTableQuery + inClause;
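    +        // e.g. with 2 target PK columns and a batch of 3 rows, the parameterized
    +        // clause is assumed to take the form ((?,?),(?,?),(?,?)), completing the
    +        // row-value-constructor IN predicate of the base query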
    +        PreparedStatement targetStatement = connection.prepareStatement(indexQuery);
    +
    +        // while we build the PreparedStatement, we also maintain a hash of the
    +        // target table PKs, which we use to join against the results of the query
    +        // on the target table
    +        Map<Integer, List<Object>> targetPkToSourceValues = buildTargetStatement(targetStatement);
    +
    +        // fetch results from the target table and output invalid rows
    +        queryTargetTable(context, targetStatement, targetPkToSourceValues);
    +
    +        // any source values we have left over are invalid (e.g. data table rows
    +        // without a corresponding index row)
    +        context.getCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT)
    +                .increment(targetPkToSourceValues.size());
    +        if (outputInvalidRows) {
    +            for (List<Object> sourceRowWithoutTargetRow : targetPkToSourceValues.values()) {
    +                if (OutputFormat.FILE.equals(outputFormat)) {
    +                    context.write(new Text(Arrays.toString(sourceRowWithoutTargetRow.toArray())),
    +                        new Text("Target row not found"));
    +                } else if (OutputFormat.TABLE.equals(outputFormat)) {
    +                    writeToOutputTable(context, sourceRowWithoutTargetRow, null);
    +                }
    +            }
    +        }
    +        if (outputInvalidRows && OutputFormat.TABLE.equals(outputFormat)) {
    +            outputUpsertStmt.executeBatch(); // write out invalid rows to output table
    +            outputConn.commit();
    +        }
    +        currentBatchValues.clear();
    +    }
    +
    +    private Map<Integer, List<Object>> buildTargetStatement(PreparedStatement targetStatement)
    +            throws SQLException {
    +        Map<Integer, List<Object>> targetPkToSourceValues =
    +                new HashMap<>(currentBatchValues.size());
    +        int rsIndex = 1;
    +        for (List<Object> batchRow : currentBatchValues) {
    +            // our original query against the source table (which provided the
    +            // batchRow) projected the data table PK cols first, so the first
    +            // numTargetPkCols values form the PK
    +            int targetPkHash = getPkHash(batchRow.subList(0, numTargetPkCols));
    +            targetPkToSourceValues.put(targetPkHash, batchRow);
    +            for (int i = 0; i < numTargetPkCols; i++) {
    +                ColumnInfo targetPkInfo = targetTblColumnMetadata.get(i);
    +                Object value = batchRow.get(i);
    +                if (value == null) {
    +                    targetStatement.setNull(rsIndex++, targetPkInfo.getSqlType());
    +                } else {
    +                    targetStatement.setObject(rsIndex++, value, targetPkInfo.getSqlType());
    +                }
    +            }
    +        }
    +        return targetPkToSourceValues;
    +    }
    +
    +    private void queryTargetTable(Context context, PreparedStatement targetStatement,
    +            Map<Integer, List<Object>> targetPkToSourceValues)
    +            throws SQLException, IOException, InterruptedException {
    +        ResultSet targetResultSet = targetStatement.executeQuery();
    +        while (targetResultSet.next()) {
    +            indxWritable.readFields(targetResultSet);
    +            List<Object> targetValues = indxWritable.getValues();
    +            // first grab the PK and try to join against the source input
    +            // the query is such that first numTargetPkCols of the resultSet is the PK
    +            List<Object> pkObjects = new ArrayList<>(numTargetPkCols);
    +            for (int i = 0; i < numTargetPkCols; i++) {
    +                Object pkPart = targetResultSet.getObject(i + 1);
    --- End diff --
    
    If you want to get at the time stamp of the underlying cell, you can do something like
    the following:
    
       targetResultSet.unwrap(PhoenixResultSet.class).getCurrentRow().get(0).getTimestamp();
    
    This would give you back the time stamp of the first Cell from the scan that was
    executed (we only return a single cell back with the selected expressions packed into
    the value). Not sure if this is useful or not, but just wanted to pass it along.
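
    A minimal sketch of how that accessor chain might be wrapped for use in queryTargetTable
    (names taken from the snippet above, assuming org.apache.phoenix.jdbc.PhoenixResultSet;
    getRowTimestamp is a hypothetical helper, not part of this patch):

        // Unwrap to the Phoenix-specific ResultSet to reach the underlying row. The scan
        // packs the selected expressions into a single Cell, so the first Cell carries
        // the timestamp of the target row as a whole.
        private static long getRowTimestamp(ResultSet rs) throws SQLException {
            return rs.unwrap(PhoenixResultSet.class).getCurrentRow().get(0).getTimestamp();
        }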
    
        


