From: "ASF GitHub Bot (JIRA)"
To: dev@phoenix.apache.org
Date: Wed, 23 Aug 2017 00:24:01 +0000 (UTC)
Subject: [jira] [Commented] (PHOENIX-2460) Implement scrutiny command to validate whether or not an index is in sync with the data table

    [ https://issues.apache.org/jira/browse/PHOENIX-2460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16137666#comment-16137666 ]

ASF GitHub Bot commented on PHOENIX-2460:
-----------------------------------------

Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/269#discussion_r134624315

    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java ---
    @@ -0,0 +1,349 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.mapreduce.index;
    +
    +import java.io.IOException;
    +import java.sql.Connection;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Properties;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapreduce.Mapper;
    +import org.apache.phoenix.mapreduce.PhoenixJobCounters;
    +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
    +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
    +import org.apache.phoenix.mapreduce.util.ConnectionUtil;
    +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
    +import org.apache.phoenix.parse.HintNode.Hint;
    +import org.apache.phoenix.schema.PTable;
    +import org.apache.phoenix.util.ColumnInfo;
    +import org.apache.phoenix.util.PhoenixRuntime;
    +import org.apache.phoenix.util.QueryUtil;
    +import org.apache.phoenix.util.SchemaUtil;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Joiner;
    +
    +/**
    + * Mapper that reads from the data table and checks the rows against the index table
    + */
    +public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, Text, Text> {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(IndexScrutinyMapper.class);
    +    private Connection connection;
    +    private List<ColumnInfo> targetTblColumnMetadata;
    +    private long batchSize;
    +    // holds a batch of rows from the table the mapper is iterating over
    +    private List<List<Object>> currentBatchValues = new ArrayList<>();
    +    private String targetTableQuery;
    +    private int numTargetPkCols;
    +    private boolean outputInvalidRows;
    +    private OutputFormat outputFormat = OutputFormat.FILE;
    +    private String qSourceTable;
    +    private String qTargetTable;
    +    private long executeTimestamp;
    +    private int numSourcePkCols;
    +    private final PhoenixIndexDBWritable indxWritable = new PhoenixIndexDBWritable();
    +    private List<ColumnInfo> sourceTblColumnMetadata;
    +
    +    // used to write results to the output table
    +    private Connection outputConn;
    +    private PreparedStatement outputUpsertStmt;
    +    private long outputMaxRows;
    +
    +    @Override
    +    protected void setup(final Context context) throws IOException, InterruptedException {
    +        super.setup(context);
    +        final Configuration configuration = context.getConfiguration();
    +        try {
    +            // get a connection with correct CURRENT_SCN (so incoming writes don't throw off the
    +            // scrutiny)
    +            final Properties overrideProps = new Properties();
    +            String scn = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
    +            overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
    +            connection =
    +                    ConnectionUtil.getOutputConnection(configuration, overrideProps);
    +            connection.setAutoCommit(false);
    +            batchSize = PhoenixConfigurationUtil.getScrutinyBatchSize(configuration);
    +            outputInvalidRows =
    +                    PhoenixConfigurationUtil.getScrutinyOutputInvalidRows(configuration);
    +            outputFormat = PhoenixConfigurationUtil.getScrutinyOutputFormat(configuration);
    +            executeTimestamp = PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(configuration);
    +
    +            // get the index table and column names
    +            String qDataTable = PhoenixConfigurationUtil.getScrutinyDataTableName(configuration);
    +            final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable);
    +            final String qIndexTable =
    +                    PhoenixConfigurationUtil.getScrutinyIndexTableName(configuration);
    +            final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable);
    +
    +            // set the target table based on whether we're running the MR over the data or index
    +            // table
    +            SourceTable sourceTable =
    +                    PhoenixConfigurationUtil.getScrutinySourceTable(configuration);
    +            SourceTargetColumnNames columnNames =
    +                    SourceTable.DATA_TABLE_SOURCE.equals(sourceTable)
    +                            ? new SourceTargetColumnNames.DataSourceColNames(pdataTable,
    +                                    pindexTable)
    +                            : new SourceTargetColumnNames.IndexSourceColNames(pdataTable,
    +                                    pindexTable);
    +            qSourceTable = columnNames.getQualifiedSourceTableName();
    +            qTargetTable = columnNames.getQualifiedTargetTableName();
    +            List<String> targetColNames = columnNames.getTargetColNames();
    +            List<String> sourceColNames = columnNames.getSourceColNames();
    +            List<String> targetPkColNames = columnNames.getTargetPkColNames();
    +            String targetPksCsv =
    +                    Joiner.on(",").join(SchemaUtil.getEscapedFullColumnNames(targetPkColNames));
    +            numSourcePkCols = columnNames.getSourcePkColNames().size();
    +            numTargetPkCols = targetPkColNames.size();
    +
    +            if (OutputFormat.TABLE.equals(outputFormat)) {
    +                outputConn = ConnectionUtil.getOutputConnection(configuration, new Properties());
    +                String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
    +                this.outputUpsertStmt = outputConn.prepareStatement(upsertQuery);
    +            }
    +            outputMaxRows = PhoenixConfigurationUtil.getScrutinyOutputMax(configuration);
    +
    +            // Create the query against the target table
    +            // Our query projection should be all the index column names (or their data
    +            // table equivalent name)
    +            targetTableQuery =
    +                    QueryUtil.constructSelectStatement(qTargetTable,
    +                        columnNames.getCastedTargetColNames(), targetPksCsv,
    +                        Hint.NO_INDEX, false) + " IN ";
    +            targetTblColumnMetadata =
    +                    PhoenixRuntime.generateColumnInfo(connection, qTargetTable, targetColNames);
    +            sourceTblColumnMetadata =
    +                    PhoenixRuntime.generateColumnInfo(connection, qSourceTable, sourceColNames);
    +            LOG.info("Base query against target table: " + targetTableQuery);
    +        } catch (SQLException e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    @Override
    +    protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
    +            throws IOException, InterruptedException {
    +        try {
    +            final List<Object> values = record.getValues();
    +
    +            context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
    +            currentBatchValues.add(values);
    +            if (context.getCounter(PhoenixJobCounters.INPUT_RECORDS).getValue() % batchSize != 0) {
    +                // if we haven't hit the batch size, just report progress and move on to the next record
    +                context.progress();
    +                return;
    +            } else {
    +                // otherwise, process the batch
    +                processBatch(context);
    +            }
    +            context.progress(); // Make sure progress is reported to Application Master.
    +        } catch (SQLException | IllegalArgumentException e) {
    +            LOG.error("Error while read/write of a record", e);
    +            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
    +            throw new IOException(e);
    +        }
    +    }
    +
    +    @Override
    +    protected void cleanup(Context context) throws IOException, InterruptedException {
    +        super.cleanup(context);
    +        if (connection != null) {
    +            try {
    +                processBatch(context);
    +                connection.close();
    +                if (outputConn != null) {
    +                    outputConn.close();
    +                }
    +            } catch (SQLException e) {
    +                LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e);
    +                throw new IOException(e);
    +            }
    +        }
    +    }
    +
    +    private void processBatch(Context context)
    +            throws SQLException, IOException, InterruptedException {
    +        if (currentBatchValues.size() == 0) return;
    +        context.getCounter(PhoenixScrutinyJobCounters.BATCHES_PROCESSED_COUNT).increment(1);
    +        // our query selection filter should be the PK columns of the target table (index or data
    +        // table)
    +        String inClause =
    +                QueryUtil.constructParameterizedInClause(numTargetPkCols,
    +                    currentBatchValues.size());
    +        String indexQuery = targetTableQuery + inClause;
    +        PreparedStatement targetStatement = connection.prepareStatement(indexQuery);
    +
    +        // while we build the PreparedStatement, we also maintain a hash of the target table PKs,
    +        // which we use to join against the results of the query on the target table
    +        Map<Integer, List<Object>> targetPkToSourceValues = buildTargetStatement(targetStatement);
    +
    +        // fetch results from the target table and output invalid rows
    +        queryTargetTable(context, targetStatement, targetPkToSourceValues);
    +
    +        // any source values we have left over are invalid (e.g. data table rows without
    +        // corresponding index row)
    +        context.getCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT)
    +                .increment(targetPkToSourceValues.size());
    +        if (outputInvalidRows) {
    +            for (List<Object> sourceRowWithoutTargetRow : targetPkToSourceValues.values()) {
    +                if (OutputFormat.FILE.equals(outputFormat)) {
    +                    context.write(new Text(Arrays.toString(sourceRowWithoutTargetRow.toArray())),
    +                        new Text("Target row not found"));
    +                } else if (OutputFormat.TABLE.equals(outputFormat)) {
    +                    writeToOutputTable(context, sourceRowWithoutTargetRow, null);
    +                }
    +            }
    +        }
    +        if (outputInvalidRows && OutputFormat.TABLE.equals(outputFormat)) {
    +            outputUpsertStmt.executeBatch(); // write out invalid rows to output table
    +            outputConn.commit();
    +        }
    +        currentBatchValues.clear();
    +    }
    +
    +    private Map<Integer, List<Object>> buildTargetStatement(PreparedStatement targetStatement)
    +            throws SQLException {
    +        Map<Integer, List<Object>> targetPkToSourceValues =
    +                new HashMap<>(currentBatchValues.size());
    +        int rsIndex = 1;
    +        for (List<Object> batchRow : currentBatchValues) {
    +            // our original query against the source table (which provided the batchRow) projected
    +            // with the data table PK cols first, so the first numTargetPkCols form the PK
    +            int targetPkHash = getPkHash(batchRow.subList(0, numTargetPkCols));
    +            targetPkToSourceValues.put(targetPkHash, batchRow);
    +            for (int i = 0; i < numTargetPkCols; i++) {
    +                ColumnInfo targetPkInfo = targetTblColumnMetadata.get(i);
    +                Object value = batchRow.get(i);
    +                if (value == null) {
    +                    targetStatement.setNull(rsIndex++, targetPkInfo.getSqlType());
    +                } else {
    +                    targetStatement.setObject(rsIndex++, value, targetPkInfo.getSqlType());
    +                }
    +            }
    +        }
    +        return targetPkToSourceValues;
    +    }
    +
    +    private void queryTargetTable(Context context, PreparedStatement targetStatement,
    +            Map<Integer, List<Object>> targetPkToSourceValues)
    +            throws SQLException, IOException, InterruptedException {
    +        ResultSet targetResultSet = targetStatement.executeQuery();
    +        while (targetResultSet.next()) {
    +            indxWritable.readFields(targetResultSet);
    +            List<Object> targetValues = indxWritable.getValues();
    +            // first grab the PK and try to join against the source input
    +            // the query is such that first numTargetPkCols of the resultSet is the PK
    +            List<Object> pkObjects = new ArrayList<>(numTargetPkCols);
    +            for (int i = 0; i < numTargetPkCols; i++) {
    +                Object pkPart = targetResultSet.getObject(i + 1);
    --- End diff --

    If you want to get at the timestamp of the underlying cell, you can do
    something like the following:

        targetResultSet.unwrap(PhoenixResultSet.class).getCurrentRow().get(0).getTimestamp();

    This would give you back the timestamp of the first Cell from the scan that
    was executed (we only return a single cell back, with the selected
    expressions packed into the value). Not sure if this is useful or not, but
    just wanted to pass it along.
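A minimal, self-contained sketch of the suggestion above, assuming org.apache.phoenix.jdbc.PhoenixResultSet and the HBase Cell API are on the classpath. Where the comment writes ".get(0)", the sketch uses Tuple.getValue(0), the accessor on Phoenix's Tuple interface; treat the exact method name as an assumption that may vary by Phoenix version:

    // Hedged sketch, not part of the patch above: reads the HBase cell timestamp
    // behind the current row of a Phoenix ResultSet, per the reviewer's suggestion.
    import java.sql.ResultSet;
    import java.sql.SQLException;

    import org.apache.phoenix.jdbc.PhoenixResultSet;

    public class CellTimestampSketch {

        // Phoenix packs the selected expressions into a single cell per row, so
        // the first cell's timestamp serves as the row's timestamp.
        static long currentRowTimestamp(ResultSet rs) throws SQLException {
            return rs.unwrap(PhoenixResultSet.class) // Phoenix-specific ResultSet
                    .getCurrentRow()                 // Tuple backing the row returned by next()
                    .getValue(0)                     // first (and only) Cell of the packed row
                    .getTimestamp();                 // HBase cell timestamp
        }
    }

Inside queryTargetTable, this could be called with targetResultSet, for example to compare source and target row timestamps while joining them.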
> Implement scrutiny command to validate whether or not an index is in sync with the data table
> ---------------------------------------------------------------------------------------------
>
>                 Key: PHOENIX-2460
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-2460
>             Project: Phoenix
>          Issue Type: Bug
>            Reporter: James Taylor
>            Assignee: Vincent Poon
>         Attachments: PHOENIX-2460.patch
>
>
> We should have a process that runs to verify that an index is valid against a data table and potentially fixes it if discrepancies are found. This could either be an MR job or a low-priority background task.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)