phoenix-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (PHOENIX-2460) Implement scrutiny command to validate whether or not an index is in sync with the data table
Date Wed, 23 Aug 2017 00:24:01 GMT

    [ https://issues.apache.org/jira/browse/PHOENIX-2460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137666#comment-16137666
] 

ASF GitHub Bot commented on PHOENIX-2460:
-----------------------------------------

Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/269#discussion_r134624315
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
---
    @@ -0,0 +1,349 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.mapreduce.index;
    +
    +import java.io.IOException;
    +import java.sql.Connection;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Properties;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapreduce.Mapper;
    +import org.apache.phoenix.mapreduce.PhoenixJobCounters;
    +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
    +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
    +import org.apache.phoenix.mapreduce.util.ConnectionUtil;
    +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
    +import org.apache.phoenix.parse.HintNode.Hint;
    +import org.apache.phoenix.schema.PTable;
    +import org.apache.phoenix.util.ColumnInfo;
    +import org.apache.phoenix.util.PhoenixRuntime;
    +import org.apache.phoenix.util.QueryUtil;
    +import org.apache.phoenix.util.SchemaUtil;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Joiner;
    +
    +/**
    + * Mapper that reads from the data table and checks the rows against the index table
    + */
    +public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWritable,
Text, Text> {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(IndexScrutinyMapper.class);
    +    private Connection connection;
    +    private List<ColumnInfo> targetTblColumnMetadata;
    +    private long batchSize;
    +    // holds a batch of rows from the table the mapper is iterating over
    +    private List<List<Object>> currentBatchValues = new ArrayList<>();
    +    private String targetTableQuery;
    +    private int numTargetPkCols;
    +    private boolean outputInvalidRows;
    +    private OutputFormat outputFormat = OutputFormat.FILE;
    +    private String qSourceTable;
    +    private String qTargetTable;
    +    private long executeTimestamp;
    +    private int numSourcePkCols;
    +    private final PhoenixIndexDBWritable indxWritable = new PhoenixIndexDBWritable();
    +    private List<ColumnInfo> sourceTblColumnMetadata;
    +
    +    // used to write results to the output table
    +    private Connection outputConn;
    +    private PreparedStatement outputUpsertStmt;
    +    private long outputMaxRows;
    +
    +    @Override
    +    protected void setup(final Context context) throws IOException, InterruptedException
{
    +        super.setup(context);
    +        final Configuration configuration = context.getConfiguration();
    +        try {
    +            // get a connection with correct CURRENT_SCN (so incoming writes don't throw
off the
    +            // scrutiny)
    +            final Properties overrideProps = new Properties();
    +            String scn = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
    +            overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
    +            connection = ConnectionUtil.getOutputConnection(configuration, overrideProps);
    +            connection.setAutoCommit(false);
    +            batchSize = PhoenixConfigurationUtil.getScrutinyBatchSize(configuration);
    +            outputInvalidRows =
    +                    PhoenixConfigurationUtil.getScrutinyOutputInvalidRows(configuration);
    +            outputFormat = PhoenixConfigurationUtil.getScrutinyOutputFormat(configuration);
    +            executeTimestamp = PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(configuration);
    +
    +            // get the index table and column names
    +            String qDataTable = PhoenixConfigurationUtil.getScrutinyDataTableName(configuration);
    +            final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable);
    +            final String qIndexTable =
    +                    PhoenixConfigurationUtil.getScrutinyIndexTableName(configuration);
    +            final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable);
    +
    +            // set the target table based on whether we're running the MR over the data
or index
    +            // table
    +            SourceTable sourceTable =
    +                    PhoenixConfigurationUtil.getScrutinySourceTable(configuration);
    +            SourceTargetColumnNames columnNames =
    +                    SourceTable.DATA_TABLE_SOURCE.equals(sourceTable)
    +                            ? new SourceTargetColumnNames.DataSourceColNames(pdataTable,
    +                                    pindexTable)
    +                            : new SourceTargetColumnNames.IndexSourceColNames(pdataTable,
    +                                    pindexTable);
    +            qSourceTable = columnNames.getQualifiedSourceTableName();
    +            qTargetTable = columnNames.getQualifiedTargetTableName();
    +            List<String> targetColNames = columnNames.getTargetColNames();
    +            List<String> sourceColNames = columnNames.getSourceColNames();
    +            List<String> targetPkColNames = columnNames.getTargetPkColNames();
    +            String targetPksCsv =
    +                    Joiner.on(",").join(SchemaUtil.getEscapedFullColumnNames(targetPkColNames));
    +            numSourcePkCols = columnNames.getSourcePkColNames().size();
    +            numTargetPkCols = targetPkColNames.size();
    +
    +            if (OutputFormat.TABLE.equals(outputFormat)) {
    +                outputConn = ConnectionUtil.getOutputConnection(configuration, new Properties());
    +                String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
    +                this.outputUpsertStmt = outputConn.prepareStatement(upsertQuery);
    +            }
    +            outputMaxRows = PhoenixConfigurationUtil.getScrutinyOutputMax(configuration);
    +
    +            // Create the query against the target table
    +            // Our query projection should be all the index column names (or their data
table
    +            // equivalent
    +            // name)
    +            targetTableQuery =
    +                    QueryUtil.constructSelectStatement(qTargetTable, columnNames.getCastedTargetColNames(),
targetPksCsv,
    +                        Hint.NO_INDEX, false) + " IN ";
    +            targetTblColumnMetadata =
    +                    PhoenixRuntime.generateColumnInfo(connection, qTargetTable, targetColNames);
    +            sourceTblColumnMetadata =
    +                    PhoenixRuntime.generateColumnInfo(connection, qSourceTable, sourceColNames);
    +            LOG.info("Base query against target table: " + targetTableQuery);
    +        } catch (SQLException e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    @Override
    +    protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
    +            throws IOException, InterruptedException {
    +        try {
    +            final List<Object> values = record.getValues();
    +
    +            context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
    +            currentBatchValues.add(values);
    +            if (context.getCounter(PhoenixJobCounters.INPUT_RECORDS).getValue() % batchSize
!= 0) {
    +                // if we haven't hit the batch size, just report progress and move on
to next record
    +                context.progress();
    +                return;
    +            } else {
    +                // otherwise, process the batch
    +                processBatch(context);
    +            }
    +            context.progress(); // Make sure progress is reported to Application Master.
    +        } catch (SQLException | IllegalArgumentException e) {
    +            LOG.error(" Error while read/write of a record ", e);
    +            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
    +            throw new IOException(e);
    +        }
    +    }
    +
    +    @Override
    +    protected void cleanup(Context context) throws IOException, InterruptedException
{
    +        super.cleanup(context);
    +        if (connection != null) {
    +            try {
    +                processBatch(context);
    +                connection.close();
    +                if (outputConn != null) {
    +                    outputConn.close();
    +                }
    +            } catch (SQLException e) {
    +                LOG.error("Error while closing connection in the PhoenixIndexMapper class
", e);
    +                throw new IOException(e);
    +            }
    +        }
    +    }
    +
    +    private void processBatch(Context context)
    +            throws SQLException, IOException, InterruptedException {
    +        if (currentBatchValues.size() == 0) return;
    +        context.getCounter(PhoenixScrutinyJobCounters.BATCHES_PROCESSED_COUNT).increment(1);
    +        // our query selection filter should be the PK columns of the target table (index
or data
    +        // table)
    +        String inClause =
    +                QueryUtil.constructParameterizedInClause(numTargetPkCols,
    +                    currentBatchValues.size());
    +        String indexQuery = targetTableQuery + inClause;
    +        PreparedStatement targetStatement = connection.prepareStatement(indexQuery);
    +
    +        // while we build the PreparedStatement, we also maintain a hash of the target
table PKs,
    +        // which we use to join against the results of the query on the target table
    +        Map<Integer, List<Object>> targetPkToSourceValues = buildTargetStatement(targetStatement);
    +
    +        // fetch results from the target table and output invalid rows
    +        queryTargetTable(context, targetStatement, targetPkToSourceValues);
    +
    +        // any source values we have left over are invalid (e.g. data table rows without
    +        // corresponding index row)
    +        context.getCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT)
    +                .increment(targetPkToSourceValues.size());
    +        if (outputInvalidRows) {
    +            for (List<Object> sourceRowWithoutTargetRow : targetPkToSourceValues.values())
{
    +                if (OutputFormat.FILE.equals(outputFormat)) {
    +                    context.write(new Text(Arrays.toString(sourceRowWithoutTargetRow.toArray())),
    +                        new Text("Target row not found"));
    +                } else if (OutputFormat.TABLE.equals(outputFormat)) {
    +                    writeToOutputTable(context, sourceRowWithoutTargetRow, null);
    +                }
    +            }
    +        }
    +        if (outputInvalidRows && OutputFormat.TABLE.equals(outputFormat)) {
    +            outputUpsertStmt.executeBatch(); // write out invalid rows to output table
    +            outputConn.commit();
    +        }
    +        currentBatchValues.clear();
    +    }
    +
    +    private Map<Integer, List<Object>> buildTargetStatement(PreparedStatement
targetStatement)
    +            throws SQLException {
    +        Map<Integer, List<Object>> targetPkToSourceValues =
    +                new HashMap<>(currentBatchValues.size());
    +        int rsIndex = 1;
    +        for (List<Object> batchRow : currentBatchValues) {
    +            // our original query against the source table (which provided the batchRow)
projected
    +            // with the data table PK cols first, so the first numTargetPkCols form the
PK
    +            int targetPkHash = getPkHash(batchRow.subList(0, numTargetPkCols));
    +            targetPkToSourceValues.put(targetPkHash, batchRow);
    +            for (int i = 0; i < numTargetPkCols; i++) {
    +                ColumnInfo targetPkInfo = targetTblColumnMetadata.get(i);
    +                Object value = batchRow.get(i);
    +                if (value == null) {
    +                    targetStatement.setNull(rsIndex++, targetPkInfo.getSqlType());
    +                } else {
    +                    targetStatement.setObject(rsIndex++, value, targetPkInfo.getSqlType());
    +                }
    +            }
    +        }
    +        return targetPkToSourceValues;
    +    }
    +
    +    private void queryTargetTable(Context context, PreparedStatement targetStatement,
    +            Map<Integer, List<Object>> targetPkToSourceValues)
    +            throws SQLException, IOException, InterruptedException {
    +        ResultSet targetResultSet = targetStatement.executeQuery();
    +        while (targetResultSet.next()) {
    +            indxWritable.readFields(targetResultSet);
    +            List<Object> targetValues = indxWritable.getValues();
    +            // first grab the PK and try to join against the source input
    +            // the query is such that first numTargetPkCols of the resultSet is the PK
    +            List<Object> pkObjects = new ArrayList<>(numTargetPkCols);
    +            for (int i = 0; i < numTargetPkCols; i++) {
    +                Object pkPart = targetResultSet.getObject(i + 1);
    --- End diff --
    
    If you want to get at the time stamp of the underlying cell, you can do something like
the following: 
    
       targetResultSet.unwrap(PhoenixResultSet.class).getCurrentRow().get(0).getTimestamp();
    
    This would give you back the time stamp of the first Cell from the scan that was executed
(we only return a single cell back with the selected expressions packed into the value). Not
sure if this is useful or not, but just wanted to pass it along.
    
        


> Implement scrutiny command to validate whether or not an index is in sync with the data
table
> ---------------------------------------------------------------------------------------------
>
>                 Key: PHOENIX-2460
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-2460
>             Project: Phoenix
>          Issue Type: Bug
>            Reporter: James Taylor
>            Assignee: Vincent Poon
>         Attachments: PHOENIX-2460.patch
>
>
> We should have a process that runs to verify that an index is valid against a data table
and potentially fixes it if discrepancies are found. This could either be a MR job or a low
priority background task.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message