phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [1/2] phoenix git commit: PHOENIX-2460 Implement scrutiny command to validate whether or not an index is in sync with the data table (Vincent Poon)
Date Mon, 28 Aug 2017 22:42:01 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 ce6810f42 -> 08e2a29f6


http://git-wip-us.apache.org/repos/asf/phoenix/blob/08e2a29f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
new file mode 100644
index 0000000..f3ff39e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.parse.HintNode.Hint;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * An MR job to verify that the index table is in sync with the data table.
+ * <p>
+ * For each requested direction (see {@link SourceTable}) a map-only job iterates over the rows
+ * of the source table and compares them to the other table. Invalid rows can optionally be
+ * written out either as text files or to an output table (see {@link OutputFormat}).
+ */
+public class IndexScrutinyTool extends Configured implements Tool {
+
+    private static final Logger LOG = LoggerFactory.getLogger(IndexScrutinyTool.class);
+
+    /** Default for {@code --output-max}: max number of invalid rows to output per mapper. */
+    private static final long DEFAULT_OUTPUT_MAX_ROWS = 1000000L;
+
+    /** Default for {@code --time}: how far, in millis, to look back from the current time. */
+    private static final long DEFAULT_TIMESTAMP_LAG_MILLIS = 60000L;
+
+    private static final Option SCHEMA_NAME_OPTION =
+            new Option("s", "schema", true, "Phoenix schema name (optional)");
+    private static final Option DATA_TABLE_OPTION =
+            new Option("dt", "data-table", true, "Data table name (mandatory)");
+    private static final Option INDEX_TABLE_OPTION =
+            new Option("it", "index-table", true,
+                    "Index table name (mandatory).");
+    private static final Option TIMESTAMP =
+            new Option("t", "time", true,
+                    "Timestamp in millis used to compare the index and data tables.  Defaults to current time minus 60 seconds");
+
+    private static final Option RUN_FOREGROUND_OPTION =
+            new Option("runfg", "run-foreground", false,
+                    "If specified, runs index scrutiny in Foreground. Default - Runs the scrutiny in background.");
+
+    // TODO check if this works: in createSubmittableJob the snapshot branch creates a snapshot,
+    // but the setInput call that would read from it is still commented out.
+    private static final Option SNAPSHOT_OPTION =
+            new Option("snap", "snapshot", false,
+                    "If specified, uses Snapshots for index scrutiny (optional)");
+
+    public static final Option BATCH_SIZE_OPTION =
+            new Option("b", "batch-size", true, "Number of rows to compare at a time");
+    public static final Option SOURCE_TABLE_OPTION =
+            new Option("src", "source", true,
+                    "Table to use as the source table, whose rows are iterated over and compared to the other table."
+                            + " Options are DATA_TABLE_SOURCE, INDEX_TABLE_SOURCE, BOTH."
+                            + "  Defaults to BOTH, which does two separate jobs to iterate over both tables");
+
+    private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
+
+    private static final Option OUTPUT_INVALID_ROWS_OPTION =
+            new Option("o", "output", false, "Whether to output invalid rows");
+    private static final Option OUTPUT_FORMAT_OPTION =
+            new Option("of", "output-format", true,
+                    "Format in which to output invalid rows.  Options are FILE, TABLE.  Defaults to TABLE");
+    private static final Option OUTPUT_PATH_OPTION =
+            new Option("op", "output-path", true, "Output path where the files are written");
+    private static final Option OUTPUT_MAX = new Option("om", "output-max", true, "Max number of invalid rows to output per mapper.  Defaults to 1M");
+    public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_SCRUTINY_[%s]_[%s]";
+
+    /**
+     * Which table to use as the source table
+     */
+    public static enum SourceTable {
+        /** Iterate over the data table and compare against the index table. */
+        DATA_TABLE_SOURCE,
+        /** Iterate over the index table and compare against the data table. */
+        INDEX_TABLE_SOURCE,
+        /**
+         * Runs two separate jobs to iterate over both tables
+         */
+        BOTH;
+    }
+
+    /**
+     * Format in which invalid rows are output when {@code --output} is given.
+     */
+    public static enum OutputFormat {
+        /** Text files under {@code --output-path}. */
+        FILE,
+        /** Rows upserted into the scrutiny output table. */
+        TABLE
+    }
+
+    private final List<Job> jobs = Lists.newArrayList();
+
+    private Options getOptions() {
+        final Options options = new Options();
+        options.addOption(SCHEMA_NAME_OPTION);
+        options.addOption(DATA_TABLE_OPTION);
+        options.addOption(INDEX_TABLE_OPTION);
+        options.addOption(RUN_FOREGROUND_OPTION);
+        options.addOption(OUTPUT_INVALID_ROWS_OPTION);
+        options.addOption(OUTPUT_FORMAT_OPTION);
+        options.addOption(OUTPUT_PATH_OPTION);
+        options.addOption(OUTPUT_MAX);
+        options.addOption(SNAPSHOT_OPTION);
+        options.addOption(HELP_OPTION);
+        options.addOption(TIMESTAMP);
+        options.addOption(BATCH_SIZE_OPTION);
+        options.addOption(SOURCE_TABLE_OPTION);
+        return options;
+    }
+
+    /**
+     * Parses the commandline arguments, throws IllegalStateException if mandatory arguments are
+     * missing. Prints help and exits the JVM on a parse error or when help is requested.
+     * @param args supplied command line arguments
+     * @return the parsed command line
+     */
+    private CommandLine parseOptions(String[] args) {
+        final Options options = getOptions();
+
+        CommandLineParser parser = new PosixParser();
+        CommandLine cmdLine = null;
+        try {
+            cmdLine = parser.parse(options, args);
+        } catch (ParseException e) {
+            printHelpAndExit("Error parsing command line options: " + e.getMessage(), options);
+        }
+
+        if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
+            printHelpAndExit(options, 0);
+        }
+
+        requireOption(cmdLine, DATA_TABLE_OPTION);
+        requireOption(cmdLine, INDEX_TABLE_OPTION);
+
+        return cmdLine;
+    }
+
+    /** Throws IllegalStateException if {@code option} is absent from {@code cmdLine}. */
+    private void requireOption(CommandLine cmdLine, Option option) {
+        if (!cmdLine.hasOption(option.getOpt())) {
+            throw new IllegalStateException(option.getLongOpt() + " is a mandatory parameter");
+        }
+    }
+
+    /** Prints {@code errorMessage} to stderr, then the help text, and exits with status 1. */
+    private void printHelpAndExit(String errorMessage, Options options) {
+        System.err.println(errorMessage);
+        printHelpAndExit(options, 1);
+    }
+
+    /** Prints the help text and exits the JVM with {@code exitCode}. */
+    private void printHelpAndExit(Options options, int exitCode) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("help", options);
+        System.exit(exitCode);
+    }
+
+    /**
+     * Builds the scrutiny MR jobs. Settings shared by all jobs (batch size, execute timestamp)
+     * are written into {@code configuration} once by the constructor; per-job settings are
+     * written by {@link #createSubmittableJob} before the Job is created from that
+     * configuration.
+     */
+    class JobFactory {
+        Connection connection;
+        Configuration configuration;
+        private final boolean useSnapshot;
+        private final long ts;
+        private final boolean outputInvalidRows;
+        private final OutputFormat outputFormat;
+        private final String basePath;
+        private final long scrutinyExecuteTime;
+        private final long outputMaxRows; // per mapper
+
+        public JobFactory(Connection connection, Configuration configuration, long batchSize,
+                boolean useSnapshot, long ts, boolean outputInvalidRows, OutputFormat outputFormat,
+                String basePath, long outputMaxRows) {
+            this.connection = connection;
+            this.configuration = configuration;
+            this.useSnapshot = useSnapshot;
+            this.ts = ts; // CURRENT_SCN to set
+            this.outputInvalidRows = outputInvalidRows;
+            this.outputFormat = outputFormat;
+            this.basePath = basePath;
+            this.outputMaxRows = outputMaxRows;
+            PhoenixConfigurationUtil.setScrutinyBatchSize(configuration, batchSize);
+            // Time at which scrutiny was run; the same for all jobs created from this factory.
+            this.scrutinyExecuteTime = EnvironmentEdgeManager.currentTimeMillis();
+            PhoenixConfigurationUtil.setScrutinyExecuteTimestamp(configuration,
+                scrutinyExecuteTime);
+        }
+
+        /**
+         * Creates (but does not submit) the scrutiny job for one direction.
+         *
+         * @param schemaName schema of both tables; may be null or empty
+         * @param indexTable index table name, without schema
+         * @param dataTable data table name, without schema
+         * @param sourceTable {@link SourceTable#DATA_TABLE_SOURCE} or
+         *            {@link SourceTable#INDEX_TABLE_SOURCE}
+         * @return the configured job
+         * @throws IllegalArgumentException if {@code sourceTable} is {@code BOTH} or null
+         */
+        public Job createSubmittableJob(String schemaName, String indexTable, String dataTable,
+                SourceTable sourceTable) throws Exception {
+            Preconditions.checkArgument(SourceTable.DATA_TABLE_SOURCE.equals(sourceTable)
+                    || SourceTable.INDEX_TABLE_SOURCE.equals(sourceTable));
+
+            final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
+            final String qIndexTable;
+            if (schemaName != null && !schemaName.isEmpty()) {
+                qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, indexTable);
+            } else {
+                qIndexTable = indexTable;
+            }
+            PhoenixConfigurationUtil.setScrutinyDataTable(configuration, qDataTable);
+            PhoenixConfigurationUtil.setScrutinyIndexTable(configuration, qIndexTable);
+            PhoenixConfigurationUtil.setScrutinySourceTable(configuration, sourceTable);
+            PhoenixConfigurationUtil.setScrutinyOutputInvalidRows(configuration, outputInvalidRows);
+            PhoenixConfigurationUtil.setScrutinyOutputMax(configuration, outputMaxRows);
+
+            final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable);
+            final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable);
+
+            // set CURRENT_SCN for our scan so that incoming writes don't throw off scrutiny
+            configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE, Long.toString(ts));
+
+            // set the source table to either data or index table
+            SourceTargetColumnNames columnNames =
+                    SourceTable.DATA_TABLE_SOURCE.equals(sourceTable)
+                            ? new SourceTargetColumnNames.DataSourceColNames(pdataTable,
+                                    pindexTable)
+                            : new SourceTargetColumnNames.IndexSourceColNames(pdataTable,
+                                    pindexTable);
+            String qSourceTable = columnNames.getQualifiedSourceTableName();
+            List<String> sourceColumnNames = columnNames.getSourceColNames();
+            List<String> sourceDynamicCols = columnNames.getSourceDynamicCols();
+            List<String> targetDynamicCols = columnNames.getTargetDynamicCols();
+
+            // Setup the select query against source - we either select the index columns from
+            // the index table, or select the data table equivalents of the index columns from
+            // the data table
+            final String selectQuery =
+                    QueryUtil.constructSelectStatement(qSourceTable, sourceColumnNames, null,
+                        Hint.NO_INDEX, true);
+            LOG.info("Query used on source table to feed the mapper: " + selectQuery);
+
+            PhoenixConfigurationUtil.setScrutinyOutputFormat(configuration, outputFormat);
+            // if outputting to table, setup the upsert to the output table
+            if (outputInvalidRows && OutputFormat.TABLE.equals(outputFormat)) {
+                String upsertStmt =
+                        IndexScrutinyTableOutput.constructOutputTableUpsert(sourceDynamicCols,
+                            targetDynamicCols, connection);
+                PhoenixConfigurationUtil.setUpsertStatement(configuration, upsertStmt);
+                LOG.info("Upsert statement used for output table: " + upsertStmt);
+            }
+
+            final String jobName =
+                    String.format(INDEX_JOB_NAME_TEMPLATE, qSourceTable,
+                        columnNames.getQualifiedTargetTableName());
+            final Job job = Job.getInstance(configuration, jobName);
+
+            if (!useSnapshot) {
+                PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, qDataTable,
+                    selectQuery);
+            } else {
+                // TODO check if using a snapshot works. A snapshot is created here, but the
+                // setInput call that would read from it is still commented out below, so no
+                // Phoenix input is configured for the job on this path.
+                HBaseAdmin admin = null;
+                String snapshotName;
+                try {
+                    final PhoenixConnection pConnection =
+                            connection.unwrap(PhoenixConnection.class);
+                    admin = pConnection.getQueryServices().getAdmin();
+                    String pdataTableName = pdataTable.getName().getString();
+                    snapshotName = new StringBuilder(pdataTableName).append("-Snapshot").toString();
+                    admin.snapshot(snapshotName, TableName.valueOf(pdataTableName));
+                } finally {
+                    if (admin != null) {
+                        admin.close();
+                    }
+                }
+                // root dir not a subdirectory of hbase dir
+                Path rootDir = new Path("hdfs:///index-snapshot-dir");
+                FSUtils.setRootDir(configuration, rootDir);
+                Path restoreDir = new Path(FSUtils.getRootDir(configuration), "restore-dir");
+
+                // set input for map reduce job using hbase snapshots
+                //PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, snapshotName,
+                //    qDataTable, restoreDir, selectQuery);
+            }
+            TableMapReduceUtil.initCredentials(job);
+            Path outputPath =
+                    getOutputPath(configuration, basePath,
+                        SourceTable.DATA_TABLE_SOURCE.equals(sourceTable) ? pdataTable
+                                : pindexTable);
+
+            return configureSubmittableJob(job, outputPath);
+        }
+
+        /**
+         * Applies the settings common to every scrutiny job: map-only execution with
+         * {@link IndexScrutinyMapper}, and text-file output only when invalid rows are output
+         * in {@link OutputFormat#FILE} format (otherwise output is discarded).
+         */
+        private Job configureSubmittableJob(Job job, Path outputPath) throws Exception {
+            Configuration conf = job.getConfiguration();
+            conf.setBoolean("mapreduce.job.user.classpath.first", true);
+            HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+            job.setJarByClass(IndexScrutinyTool.class);
+            job.setOutputFormatClass(NullOutputFormat.class);
+            if (outputInvalidRows && OutputFormat.FILE.equals(outputFormat)) {
+                job.setOutputFormatClass(TextOutputFormat.class);
+                FileOutputFormat.setOutputPath(job, outputPath);
+            }
+            job.setMapperClass(IndexScrutinyMapper.class);
+            job.setNumReduceTasks(0);
+            // Set the Output classes
+            job.setMapOutputKeyClass(Text.class);
+            job.setMapOutputValueClass(Text.class);
+            TableMapReduceUtil.addDependencyJars(job);
+            return job;
+        }
+
+        /**
+         * Resolves the output path for {@code table} under {@code basePath} and recursively
+         * deletes anything already at that path.
+         *
+         * @return the output path, or null if {@code basePath} is null
+         */
+        Path getOutputPath(final Configuration configuration, String basePath, PTable table)
+                throws IOException {
+            if (basePath == null) {
+                return null;
+            }
+            Path outputPath =
+                    CsvBulkImportUtil.getOutputPath(new Path(basePath),
+                        table.getPhysicalName().getString());
+            FileSystem fs = outputPath.getFileSystem(configuration);
+            fs.delete(outputPath, true);
+            return outputPath;
+        }
+    }
+
+    /**
+     * Parses the command line, creates one scrutiny job per requested direction and either
+     * submits them asynchronously (default) or runs them to completion ({@code --run-foreground}).
+     *
+     * @return 0 on success (or after a background submit), -1 if a job failed or an exception
+     *         was raised
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Connection connection = null;
+        try {
+            /** start - parse command line configs **/
+            CommandLine cmdLine = null;
+            try {
+                cmdLine = parseOptions(args);
+            } catch (IllegalStateException e) {
+                printHelpAndExit(e.getMessage(), getOptions());
+            }
+            final Configuration configuration = HBaseConfiguration.addHbaseResources(getConf());
+            connection = ConnectionUtil.getInputConnection(configuration);
+            final String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
+            final String dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
+            final String indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
+            final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
+            String basePath = cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
+            boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+            boolean useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
+            boolean outputInvalidRows = cmdLine.hasOption(OUTPUT_INVALID_ROWS_OPTION.getOpt());
+            // Parsed case-insensitively, like --output-format below.
+            SourceTable sourceTable =
+                    cmdLine.hasOption(SOURCE_TABLE_OPTION.getOpt())
+                            ? SourceTable.valueOf(cmdLine
+                                    .getOptionValue(SOURCE_TABLE_OPTION.getOpt()).toUpperCase())
+                            : SourceTable.BOTH;
+
+            long batchSize =
+                    cmdLine.hasOption(BATCH_SIZE_OPTION.getOpt())
+                            ? Long.parseLong(cmdLine.getOptionValue(BATCH_SIZE_OPTION.getOpt()))
+                            : PhoenixConfigurationUtil.DEFAULT_SCRUTINY_BATCH_SIZE;
+
+            long ts =
+                    cmdLine.hasOption(TIMESTAMP.getOpt())
+                            ? Long.parseLong(cmdLine.getOptionValue(TIMESTAMP.getOpt()))
+                            : EnvironmentEdgeManager.currentTimeMillis()
+                                    - DEFAULT_TIMESTAMP_LAG_MILLIS;
+
+            if (indexTable != null) {
+                if (!isValidIndexTable(connection, qDataTable, indexTable)) {
+                    throw new IllegalArgumentException(String
+                            .format(" %s is not an index table for %s ", indexTable, qDataTable));
+                }
+            }
+
+            String outputFormatOption = cmdLine.getOptionValue(OUTPUT_FORMAT_OPTION.getOpt());
+            OutputFormat outputFormat =
+                    outputFormatOption != null
+                            ? OutputFormat.valueOf(outputFormatOption.toUpperCase())
+                            : OutputFormat.TABLE;
+            long outputMaxRows =
+                    cmdLine.hasOption(OUTPUT_MAX.getOpt())
+                            ? Long.parseLong(cmdLine.getOptionValue(OUTPUT_MAX.getOpt()))
+                            : DEFAULT_OUTPUT_MAX_ROWS;
+            // FILE output needs a directory; without one FileOutputFormat.setOutputPath would
+            // be handed a null path when the job is configured.
+            if (outputInvalidRows && OutputFormat.FILE.equals(outputFormat) && basePath == null) {
+                throw new IllegalArgumentException(String.format("%s is required when %s is %s",
+                    OUTPUT_PATH_OPTION.getLongOpt(), OUTPUT_FORMAT_OPTION.getLongOpt(),
+                    OutputFormat.FILE));
+            }
+            /** end - parse command line configs **/
+
+            if (outputInvalidRows && OutputFormat.TABLE.equals(outputFormat)) {
+                // create the output table if it doesn't exist
+                try (Connection outputConn = ConnectionUtil.getOutputConnection(configuration)) {
+                    outputConn.createStatement().execute(IndexScrutinyTableOutput.OUTPUT_TABLE_DDL);
+                    outputConn.createStatement()
+                            .execute(IndexScrutinyTableOutput.OUTPUT_METADATA_DDL);
+                }
+            }
+
+            LOG.info(String.format(
+                "Running scrutiny [schemaName=%s, dataTable=%s, indexTable=%s, useSnapshot=%s, timestamp=%s, batchSize=%s, outputBasePath=%s, outputFormat=%s, outputMaxRows=%s]",
+                schemaName, dataTable, indexTable, useSnapshot, ts, batchSize, basePath,
+                outputFormat, outputMaxRows));
+            JobFactory jobFactory =
+                    new JobFactory(connection, configuration, batchSize, useSnapshot, ts,
+                            outputInvalidRows, outputFormat, basePath, outputMaxRows);
+            // If we are running the scrutiny with both tables as the source, run two separate
+            // jobs, one for each direction
+            if (SourceTable.BOTH.equals(sourceTable)) {
+                jobs.add(jobFactory.createSubmittableJob(schemaName, indexTable, dataTable,
+                    SourceTable.DATA_TABLE_SOURCE));
+                jobs.add(jobFactory.createSubmittableJob(schemaName, indexTable, dataTable,
+                    SourceTable.INDEX_TABLE_SOURCE));
+            } else {
+                jobs.add(jobFactory.createSubmittableJob(schemaName, indexTable, dataTable,
+                    sourceTable));
+            }
+
+            if (!isForeground) {
+                LOG.info("Running Index Scrutiny in Background - Submit async and exit");
+                for (Job job : jobs) {
+                    job.submit();
+                }
+                return 0;
+            }
+            LOG.info(
+                "Running Index Scrutiny in Foreground. Waits for the build to complete. This may take a long time!.");
+            boolean result = true;
+            for (Job job : jobs) {
+                // Run every job even if an earlier one failed: the directions are independent,
+                // and all jobs in the list are handed to writeJobResults below.
+                boolean jobSucceeded = job.waitForCompletion(true);
+                result = result && jobSucceeded;
+            }
+
+            // write the results to the output metadata table
+            if (outputInvalidRows && OutputFormat.TABLE.equals(outputFormat)) {
+                LOG.info("Writing results of jobs to output table "
+                        + IndexScrutinyTableOutput.OUTPUT_METADATA_TABLE_NAME);
+                IndexScrutinyTableOutput.writeJobResults(connection, args, jobs);
+            }
+
+            if (result) {
+                return 0;
+            } else {
+                LOG.error("IndexScrutinyTool job failed! Check logs for errors..");
+                return -1;
+            }
+        } catch (Exception ex) {
+            LOG.error("An exception occurred while performing the indexing job: "
+                    + ExceptionUtils.getMessage(ex) + " at:\n" + ExceptionUtils.getStackTrace(ex));
+            return -1;
+        } finally {
+            if (connection != null) {
+                try {
+                    connection.close();
+                } catch (SQLException sqle) {
+                    // Pass the throwable itself: the message has no {} placeholder, so a
+                    // String argument would be silently dropped by SLF4J.
+                    LOG.error("Failed to close connection ", sqle);
+                    throw new RuntimeException("Failed to close connection", sqle);
+                }
+            }
+        }
+    }
+
+    /** Returns the jobs created by the last {@link #run} call. */
+    @VisibleForTesting
+    public List<Job> getJobs() {
+        return jobs;
+    }
+
+    /**
+     * Checks whether {@code indexTable} is one of the indexes reported for {@code masterTable}
+     * by {@link DatabaseMetaData#getIndexInfo}. The name comparison ignores case.
+     * @param connection connection used to read the metadata
+     * @param masterTable qualified data table name
+     * @param indexTable index table name to look for
+     * @return true if an index with that name is reported for the data table
+     * @throws SQLException if the metadata lookup fails
+     */
+    private boolean isValidIndexTable(final Connection connection, final String masterTable,
+            final String indexTable) throws SQLException {
+        final DatabaseMetaData dbMetaData = connection.getMetaData();
+        final String schemaName = SchemaUtil.getSchemaNameFromFullName(masterTable);
+        final String tableName =
+                SchemaUtil.normalizeIdentifier(SchemaUtil.getTableNameFromFullName(masterTable));
+
+        try (ResultSet rs = dbMetaData.getIndexInfo(null, schemaName, tableName, false, false)) {
+            while (rs.next()) {
+                // column 6 of the getIndexInfo result is INDEX_NAME
+                final String indexName = rs.getString(6);
+                if (indexTable.equalsIgnoreCase(indexName)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    public static void main(final String[] args) throws Exception {
+        int result = ToolRunner.run(new IndexScrutinyTool(), args);
+        System.exit(result);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/08e2a29f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java
index 2be810a..e426390 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java
@@ -23,6 +23,7 @@ import java.sql.SQLException;
 import java.util.List;
 
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.util.ColumnInfo;
 
 import com.google.common.base.Preconditions;
@@ -40,6 +41,8 @@ public class PhoenixIndexDBWritable  implements DBWritable {
     private List<Object> values;
     
     private int columnCount = -1;
+
+    private long rowTs = -1;
     
     @Override
     public void write(PreparedStatement statement) throws SQLException {
@@ -63,7 +66,9 @@ public class PhoenixIndexDBWritable  implements DBWritable {
         if(columnCount == -1) {
             this.columnCount = resultSet.getMetaData().getColumnCount();
         }
-  
+        if (columnCount > 0) {
+            this.rowTs = resultSet.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp();
+        }
         values = Lists.newArrayListWithCapacity(columnCount);
         for(int i = 0 ; i < columnCount ; i++) {
             Object value = resultSet.getObject(i + 1);
@@ -88,4 +93,8 @@ public class PhoenixIndexDBWritable  implements DBWritable {
         this.values = values;
     }
 
+    /**
+     * Timestamp of the first cell of the row most recently read from a ResultSet.
+     *
+     * @return that cell's timestamp, or -1 until a row with at least one column has been read
+     */
+    public long getRowTs() {
+        return this.rowTs;
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/08e2a29f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java
new file mode 100644
index 0000000..3cf73fd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+/**
+ * Counters used for Index Scrutiny MR job
+ */
+public enum PhoenixScrutinyJobCounters {
+    /**
+     * number of rows in data table with a valid index row (or vice-versa)
+     */
+    VALID_ROW_COUNT,
+    /**
+     * number of rows in data table with an invalid index row (or vice-versa)
+     */
+    INVALID_ROW_COUNT,
+    /**
+     * Number of rows in the index table with an incorrect covered column value
+     */
+    BAD_COVERED_COL_VAL_COUNT,
+    /**
+     * Number of batches processed
+     */
+    BATCHES_PROCESSED_COUNT;
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/08e2a29f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/SourceTargetColumnNames.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/SourceTargetColumnNames.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/SourceTargetColumnNames.java
new file mode 100644
index 0000000..1c7991f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/SourceTargetColumnNames.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.util.List;
+
+import org.apache.phoenix.mapreduce.util.IndexColumnNames;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.SchemaUtil;
+
+/**
+ * Get index scrutiny source/target column names, depending on whether the source is the
+ * data table or index table
+ */
+public interface SourceTargetColumnNames {
+
+    List<String> getSourceColNames();
+
+    List<String> getUnqualifiedSourceColNames();
+
+    List<String> getTargetColNames();
+
+    /**
+     * @return The target column name with a CAST to the source's data type
+     */
+    List<String> getCastedTargetColNames();
+
+    List<String> getUnqualifiedTargetColNames();
+
+    List<String> getSourceDynamicCols();
+
+    List<String> getTargetDynamicCols();
+
+    List<String> getTargetPkColNames();
+
+    List<String> getSourcePkColNames();
+
+    String getQualifiedSourceTableName();
+
+    String getQualifiedTargetTableName();
+
+    /**
+     * Used when the data table is the source table of a scrutiny
+     */
+    public static class DataSourceColNames extends IndexColumnNames
+            implements SourceTargetColumnNames {
+        /**
+         * @param pdataTable the data table
+         * @param pindexTable the index table for the data table
+         */
+        public DataSourceColNames(PTable pdataTable, PTable pindexTable) {
+            super(pdataTable, pindexTable);
+        }
+
+        @Override
+        public List<String> getSourceColNames() {
+            return getDataColNames();
+        }
+
+        @Override
+        public List<String> getUnqualifiedSourceColNames() {
+            return getUnqualifiedDataColNames();
+        }
+
+        @Override
+        public List<String> getUnqualifiedTargetColNames() {
+            return getUnqualifiedIndexColNames();
+        }
+
+        @Override
+        public List<String> getTargetColNames() {
+            return getIndexColNames();
+        }
+
+        @Override
+        public List<String> getSourceDynamicCols() {
+            return getDynamicDataCols();
+        }
+
+        @Override
+        public List<String> getTargetDynamicCols() {
+            return getDynamicIndexCols();
+        }
+
+        @Override
+        public List<String> getTargetPkColNames() {
+            return getIndexPkColNames();
+        }
+
+        @Override
+        public List<String> getSourcePkColNames() {
+            return getDataPkColNames();
+        }
+
+        @Override
+        public String getQualifiedSourceTableName() {
+            return getQualifiedDataTableName();
+        }
+
+        @Override
+        public String getQualifiedTargetTableName() {
+            return getQualifiedIndexTableName();
+        }
+
+        @Override
+        public List<String> getCastedTargetColNames() {
+            return getCastedColumnNames(getIndexColNames(), dataColSqlTypeNames);
+        }
+
+    }
+
+    /**
+     * Used when the index table is the source table of a scrutiny
+     */
+    public static class IndexSourceColNames extends IndexColumnNames
+            implements SourceTargetColumnNames {
+        /**
+         * @param pdataTable the data table
+         * @param pindexTable the index table for the data table
+         */
+        public IndexSourceColNames(PTable pdataTable, PTable pindexTable) {
+            super(pdataTable, pindexTable);
+        }
+
+        @Override
+        public List<String> getSourceColNames() {
+            return getIndexColNames();
+        }
+
+        @Override
+        public List<String> getUnqualifiedSourceColNames() {
+            return getUnqualifiedIndexColNames();
+        }
+
+        @Override
+        public List<String> getUnqualifiedTargetColNames() {
+            return getUnqualifiedDataColNames();
+        }
+
+        @Override
+        public List<String> getTargetColNames() {
+            return getDataColNames();
+        }
+
+        @Override
+        public List<String> getSourceDynamicCols() {
+            return getDynamicIndexCols();
+        }
+
+        @Override
+        public List<String> getTargetDynamicCols() {
+            return getDynamicDataCols();
+        }
+
+        @Override
+        public List<String> getTargetPkColNames() {
+            return getDataPkColNames();
+        }
+
+        @Override
+        public List<String> getSourcePkColNames() {
+            return getIndexPkColNames();
+        }
+
+        @Override
+        public String getQualifiedSourceTableName() {
+            return getQualifiedIndexTableName();
+        }
+
+        @Override
+        public String getQualifiedTargetTableName() {
+            return getQualifiedDataTableName();
+        }
+
+        @Override
+        public List<String> getCastedTargetColNames() {
+            return getCastedColumnNames(getDataColNames(), indexColSqlTypeNames);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/08e2a29f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
new file mode 100644
index 0000000..5daa1ed
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import java.sql.Types;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+/**
+ * Gets index column names and their data table equivalents.
+ * <p>
+ * The index-side and data-side lists are built in lock step, so element {@code i} of
+ * {@link #getIndexColNames()} corresponds to element {@code i} of {@link #getDataColNames()},
+ * and likewise for the per-column SQL type name lists. Both lead with the columns that map to the
+ * data table pk, followed by the remaining index pk columns, followed by the covered columns.
+ */
+public class IndexColumnNames {
+    // data-side names, written as FAMILY<separator>NAME when the data column has a family
+    private final List<String> dataNonPkColNames = Lists.newArrayList();
+    private final List<String> dataPkColNames = Lists.newArrayList();
+    private final List<String> dataColNames;
+    // SQL type name of each data column, parallel to dataColNames (protected for subclasses)
+    protected List<String> dataColSqlTypeNames = Lists.newArrayList();
+    private final List<String> indexPkColNames = Lists.newArrayList();
+    private final List<String> indexNonPkColNames = Lists.newArrayList();
+    private final List<String> indexColNames;
+    // SQL type name of each index column, parallel to indexColNames (protected for subclasses)
+    protected List<String> indexColSqlTypeNames = Lists.newArrayList();
+    private final PTable pdataTable;
+    private final PTable pindexTable;
+
+    /**
+     * @param pdataTable the data table
+     * @param pindexTable an index table of {@code pdataTable}
+     */
+    public IndexColumnNames(final PTable pdataTable, final PTable pindexTable) {
+        this.pdataTable = pdataTable;
+        this.pindexTable = pindexTable;
+        // index column names already placed, so later phases don't add them a second time
+        Set<String> placedIndexCols = new HashSet<String>();
+        addDataPkColumns(placedIndexCols);
+        addRemainingIndexPkColumns(placedIndexCols);
+        addCoveredColumns(placedIndexCols);
+        indexColNames = Lists.newArrayList(Iterables.concat(indexPkColNames, indexNonPkColNames));
+        dataColNames = Lists.newArrayList(Iterables.concat(dataPkColNames, dataNonPkColNames));
+    }
+
+    /** Phase 1: index columns that correspond to data table pk columns, in index column order. */
+    private void addDataPkColumns(Set<String> placedIndexCols) {
+        for (PColumn indexCol : pindexTable.getColumns()) {
+            if (!IndexUtil.isDataPKColumn(indexCol)) {
+                continue;
+            }
+            String indexColName = indexCol.getName().getString();
+            PColumn dataCol = IndexUtil.getDataColumn(pdataTable, indexColName);
+            dataPkColNames.add(getDataColFullName(dataCol));
+            dataColSqlTypeNames.add(getDataTypeString(dataCol));
+            indexPkColNames.add(indexColName);
+            indexColSqlTypeNames.add(getDataTypeString(indexCol));
+            placedIndexCols.add(indexColName);
+        }
+    }
+
+    /** Phase 2: the index pk columns not placed in phase 1; they map to data non-pk columns. */
+    private void addRemainingIndexPkColumns(Set<String> placedIndexCols) {
+        for (PColumn indexPkCol : pindexTable.getPKColumns()) {
+            String indexColName = indexPkCol.getName().getString();
+            if (placedIndexCols.contains(indexColName)) {
+                continue;
+            }
+            indexPkColNames.add(indexColName);
+            indexColSqlTypeNames.add(getDataTypeString(indexPkCol));
+            PColumn dataCol = IndexUtil.getDataColumn(pdataTable, indexColName);
+            dataNonPkColNames.add(getDataColFullName(dataCol));
+            dataColSqlTypeNames.add(getDataTypeString(dataCol));
+            placedIndexCols.add(indexColName);
+        }
+    }
+
+    /** Phase 3: every index column not placed yet, i.e. the covered columns. */
+    private void addCoveredColumns(Set<String> placedIndexCols) {
+        for (PColumn indexCol : pindexTable.getColumns()) {
+            String indexColName = indexCol.getName().getString();
+            if (placedIndexCols.contains(indexColName)) {
+                continue;
+            }
+            indexNonPkColNames.add(indexColName);
+            indexColSqlTypeNames.add(getDataTypeString(indexCol));
+            PColumn dataCol = IndexUtil.getDataColumn(pdataTable, indexColName);
+            dataNonPkColNames.add(getDataColFullName(dataCol));
+            dataColSqlTypeNames.add(getDataTypeString(dataCol));
+        }
+    }
+
+    /**
+     * Renders the column's SQL type name, appending the max length (and, for DECIMAL, the scale)
+     * in parentheses when they are set.
+     */
+    private String getDataTypeString(PColumn col) {
+        PDataType<?> dataType = col.getDataType();
+        Integer maxLength = col.getMaxLength();
+        if (maxLength == null) {
+            return dataType.toString();
+        }
+        if (dataType.getSqlType() == Types.DECIMAL) {
+            StringBuilder decimal = new StringBuilder(dataType.toString());
+            decimal.append('(').append(maxLength);
+            Integer scale = col.getScale();
+            if (scale != null) {
+                decimal.append(',').append(scale);
+            }
+            return decimal.append(')').toString();
+        }
+        return String.format("%s(%s)", dataType.toString(), maxLength);
+    }
+
+    /** Returns the data column name, prefixed with its family and separator when it has one. */
+    private String getDataColFullName(PColumn dCol) {
+        StringBuilder fullName = new StringBuilder();
+        if (dCol.getFamilyName() != null) {
+            fullName.append(dCol.getFamilyName().getString()).append(QueryConstants.NAME_SEPARATOR);
+        }
+        return fullName.append(dCol.getName().getString()).toString();
+    }
+
+    /** Pairs each escaped column name with the type at the same position: "NAME TYPE". */
+    private List<String> getDynamicCols(List<String> colNames, List<String> colTypes) {
+        List<String> dynamicCols = Lists.newArrayListWithCapacity(colNames.size());
+        int pos = 0;
+        for (String colName : colNames) {
+            dynamicCols.add(SchemaUtil.getEscapedFullColumnName(colName) + " " + colTypes.get(pos++));
+        }
+        return dynamicCols;
+    }
+
+    /** Returns a lazy view of the names with the column family qualifier stripped off. */
+    private List<String> getUnqualifiedColNames(List<String> qualifiedCols) {
+        Function<String, String> stripQualifier = new Function<String, String>() {
+            @Override
+            public String apply(String qualifiedCol) {
+                return SchemaUtil.getTableNameFromFullName(qualifiedCol, QueryConstants.NAME_SEPARATOR);
+            }
+        };
+        return Lists.transform(qualifiedCols, stripQualifier);
+    }
+
+    /**
+     * Wraps each escaped column name in {@code CAST(<col> AS <type>)}, taking the type from the
+     * same position of {@code castTypes}.
+     */
+    protected List<String> getCastedColumnNames(List<String> colNames, List<String> castTypes) {
+        List<String> casted = Lists.newArrayListWithCapacity(colNames.size());
+        List<String> escaped = SchemaUtil.getEscapedFullColumnNames(colNames);
+        for (int pos = 0; pos < escaped.size(); pos++) {
+            casted.add("CAST(" + escaped.get(pos) + " AS " + castTypes.get(pos) + ")");
+        }
+        return casted;
+    }
+
+    /** @return the schema-qualified data table name */
+    public String getQualifiedDataTableName() {
+        String schemaName = pdataTable.getSchemaName().getString();
+        String tableName = pdataTable.getTableName().getString();
+        return SchemaUtil.getQualifiedTableName(schemaName, tableName);
+    }
+
+    /** @return the schema-qualified index table name */
+    public String getQualifiedIndexTableName() {
+        String schemaName = pindexTable.getSchemaName().getString();
+        String tableName = pindexTable.getTableName().getString();
+        return SchemaUtil.getQualifiedTableName(schemaName, tableName);
+    }
+
+    /**
+     * @return the escaped data column names (equivalents for the index columns) along with their
+     *         sql type, for use in dynamic column queries/upserts
+     */
+    public List<String> getDynamicDataCols() {
+        // dynamic columns are declared without the column family
+        return getDynamicCols(getUnqualifiedDataColNames(), dataColSqlTypeNames);
+    }
+
+    /**
+     * @return the escaped index column names along with their sql type, for use in dynamic column
+     *         queries/upserts
+     */
+    public List<String> getDynamicIndexCols() {
+        // dynamic columns are declared without the column family
+        return getDynamicCols(getUnqualifiedIndexColNames(), indexColSqlTypeNames);
+    }
+
+    /**
+     * @return the corresponding data table column names for the index columns, leading with the
+     *         data table pk columns
+     */
+    public List<String> getDataColNames() {
+        return dataColNames;
+    }
+
+    /**
+     * @return same as getDataColNames, without the column family qualifier
+     */
+    public List<String> getUnqualifiedDataColNames() {
+        return getUnqualifiedColNames(dataColNames);
+    }
+
+    /**
+     * @return the corresponding data table column names for the index columns, which are not part
+     *         of the data table pk
+     */
+    public List<String> getDataNonPkColNames() {
+        return dataNonPkColNames;
+    }
+
+    /**
+     * @return the corresponding data table column names for the index columns, which are part of
+     *         the data table pk
+     */
+    public List<String> getDataPkColNames() {
+        return dataPkColNames;
+    }
+
+    /**
+     * @return the index column names, leading with the data table pk columns
+     */
+    public List<String> getIndexColNames() {
+        return indexColNames;
+    }
+
+    /**
+     * @return same as getIndexColNames, without the column family qualifier
+     */
+    public List<String> getUnqualifiedIndexColNames() {
+        return getUnqualifiedColNames(indexColNames);
+    }
+
+    /**
+     * @return the index pk column names
+     */
+    public List<String> getIndexPkColNames() {
+        return indexPkColNames;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/08e2a29f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 1302f85..db11f7d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -41,6 +41,8 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.FormatToBytesWritableMapper;
 import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
 import org.apache.phoenix.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
@@ -103,7 +105,29 @@ public final class PhoenixConfigurationUtil {
     public static final String INDEX_DISABLED_TIMESTAMP_VALUE = "phoenix.mr.index.disableTimestamp";
 
     public static final String INDEX_MAINTAINERS = "phoenix.mr.index.maintainers";
-    
+
+    /** Key for the qualified name of the data table being scrutinized. */
+    public static final String SCRUTINY_DATA_TABLE_NAME = "phoenix.mr.scrutiny.data.table.name";
+
+    /** Key for the qualified name of the index table being scrutinized. */
+    public static final String SCRUTINY_INDEX_TABLE_NAME = "phoenix.mr.scrutiny.index.table.name";
+
+    /** Key for which table (a {@code SourceTable} name) is the scrutiny source. */
+    public static final String SCRUTINY_SOURCE_TABLE = "phoenix.mr.scrutiny.source.table";
+
+    /** Key for the scrutiny batch size, and its default. */
+    public static final String SCRUTINY_BATCH_SIZE = "phoenix.mr.scrutiny.batch.size";
+
+    public static final long DEFAULT_SCRUTINY_BATCH_SIZE = 1000;
+
+    /** Key for whether invalid rows are written to the scrutiny output, and its default. */
+    public static final String SCRUTINY_OUTPUT_INVALID_ROWS =
+            "phoenix.mr.scrutiny.output.invalid.rows";
+
+    public static final boolean DEFAULT_SCRUTINY_OUTPUT_INVALID_ROWS = false;
+
+    /** Key for the {@code OutputFormat} name used for scrutiny output. */
+    public static final String SCRUTINY_OUTPUT_FORMAT = "phoenix.mr.scrutiny.output.format";
+
+    // max output rows per mapper
+    public static final String SCRUTINY_OUTPUT_MAX = "phoenix.mr.scrutiny.output.max";
+
+    /** Key for the scrutiny execute timestamp. */
+    public static final String SCRUTINY_EXECUTE_TIMESTAMP = "phoenix.mr.scrutiny.execute.timestamp";
+
     public static final String DISABLED_INDEXES = "phoenix.mr.index.disabledIndexes";
 
     // Generate splits based on scans from stats, or just from region splits
@@ -295,6 +319,12 @@ public final class PhoenixConfigurationUtil {
         return upsertStmt;
         
     }
+
+     /**
+      * Stores the upsert statement under {@code UPSERT_STATEMENT}.
+      *
+      * @param configuration job configuration; must not be null
+      * @param upsertStmt upsert statement to store; must not be null
+      * @throws SQLException never thrown by this method; kept so existing callers still compile
+      */
+     public static void setUpsertStatement(final Configuration configuration, String upsertStmt) throws SQLException {
+         Preconditions.checkNotNull(configuration)
+                 .set(UPSERT_STATEMENT, Preconditions.checkNotNull(upsertStmt));
+     }
     
     public static List<ColumnInfo> getSelectColumnMetadataList(final Configuration configuration) throws SQLException {
         Preconditions.checkNotNull(configuration);
@@ -475,7 +505,101 @@ public final class PhoenixConfigurationUtil {
         Preconditions.checkNotNull(indexName);
         configuration.set(DISABLED_INDEXES, indexName);
     }
-    
+
+    /**
+     * @param configuration job configuration; must not be null
+     * @return the value stored under {@link #SCRUTINY_DATA_TABLE_NAME}, or null if unset
+     */
+    public static String getScrutinyDataTableName(Configuration configuration) {
+        return Preconditions.checkNotNull(configuration).get(SCRUTINY_DATA_TABLE_NAME);
+    }
+
+    /**
+     * @param configuration job configuration; must not be null
+     * @param qDataTableName qualified data table name; must not be null
+     */
+    public static void setScrutinyDataTable(Configuration configuration, String qDataTableName) {
+        Preconditions.checkNotNull(configuration)
+                .set(SCRUTINY_DATA_TABLE_NAME, Preconditions.checkNotNull(qDataTableName));
+    }
+
+    /**
+     * @param configuration job configuration; must not be null
+     * @return the value stored under {@link #SCRUTINY_INDEX_TABLE_NAME}, or null if unset
+     */
+    public static String getScrutinyIndexTableName(Configuration configuration) {
+        return Preconditions.checkNotNull(configuration).get(SCRUTINY_INDEX_TABLE_NAME);
+    }
+
+    /**
+     * @param configuration job configuration; must not be null
+     * @param qIndexTableName qualified index table name; must not be null
+     */
+    public static void setScrutinyIndexTable(Configuration configuration, String qIndexTableName) {
+        Preconditions.checkNotNull(configuration)
+                .set(SCRUTINY_INDEX_TABLE_NAME, Preconditions.checkNotNull(qIndexTableName));
+    }
+
+    public static SourceTable getScrutinySourceTable(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        return SourceTable.valueOf(configuration.get(SCRUTINY_SOURCE_TABLE));
+    }
+
+    /**
+     * @param configuration job configuration; must not be null
+     * @param sourceTable which table the scrutiny reads as its source; must not be null
+     */
+    public static void setScrutinySourceTable(Configuration configuration,
+            SourceTable sourceTable) {
+        Preconditions.checkNotNull(configuration)
+                .set(SCRUTINY_SOURCE_TABLE, Preconditions.checkNotNull(sourceTable).name());
+    }
+
+    /**
+     * @param configuration job configuration; must not be null
+     * @return the stored flag, or {@link #DEFAULT_SCRUTINY_OUTPUT_INVALID_ROWS} when unset
+     */
+    public static boolean getScrutinyOutputInvalidRows(Configuration configuration) {
+        return Preconditions.checkNotNull(configuration)
+                .getBoolean(SCRUTINY_OUTPUT_INVALID_ROWS, DEFAULT_SCRUTINY_OUTPUT_INVALID_ROWS);
+    }
+
+    /**
+     * @param configuration job configuration; must not be null
+     * @param outputInvalidRows whether invalid rows should be output
+     */
+    public static void setScrutinyOutputInvalidRows(Configuration configuration,
+            boolean outputInvalidRows) {
+        Preconditions.checkNotNull(configuration)
+                .setBoolean(SCRUTINY_OUTPUT_INVALID_ROWS, outputInvalidRows);
+    }
+
+    /**
+     * @param configuration job configuration; must not be null
+     * @return the stored batch size, or {@link #DEFAULT_SCRUTINY_BATCH_SIZE} when unset
+     */
+    public static long getScrutinyBatchSize(Configuration configuration) {
+        return Preconditions.checkNotNull(configuration)
+                .getLong(SCRUTINY_BATCH_SIZE, DEFAULT_SCRUTINY_BATCH_SIZE);
+    }
+
+    /**
+     * @param configuration job configuration; must not be null
+     * @param batchSize batch size to store (not validated here)
+     */
+    public static void setScrutinyBatchSize(Configuration configuration, long batchSize) {
+        Preconditions.checkNotNull(configuration).setLong(SCRUTINY_BATCH_SIZE, batchSize);
+    }
+
+    public static OutputFormat getScrutinyOutputFormat(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        return OutputFormat
+                .valueOf(configuration.get(SCRUTINY_OUTPUT_FORMAT, OutputFormat.FILE.name()));
+    }
+
+    /**
+     * @param configuration job configuration; must not be null
+     * @param outputFormat output format to store; must not be null
+     */
+    public static void setScrutinyOutputFormat(Configuration configuration,
+            OutputFormat outputFormat) {
+        Preconditions.checkNotNull(configuration)
+                .set(SCRUTINY_OUTPUT_FORMAT, Preconditions.checkNotNull(outputFormat).name());
+    }
+
+    /**
+     * @param configuration job configuration; must not be null
+     * @return the timestamp stored under {@link #SCRUTINY_EXECUTE_TIMESTAMP}
+     * @throws IllegalArgumentException if the key has not been set
+     */
+    public static long getScrutinyExecuteTimestamp(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        // -1 is the "unset" sentinel
+        long ts = configuration.getLong(SCRUTINY_EXECUTE_TIMESTAMP, -1);
+        Preconditions.checkArgument(ts != -1, "%s is not set in the configuration",
+            SCRUTINY_EXECUTE_TIMESTAMP);
+        return ts;
+    }
+
+    /**
+     * @param configuration job configuration; must not be null
+     * @param outputMaxRows maximum number of output rows per mapper
+     */
+    public static void setScrutinyOutputMax(Configuration configuration,
+            long outputMaxRows) {
+        Preconditions.checkNotNull(configuration).setLong(SCRUTINY_OUTPUT_MAX, outputMaxRows);
+    }
+
+    /**
+     * @param configuration job configuration; must not be null
+     * @return the maximum number of output rows per mapper
+     * @throws IllegalArgumentException if {@link #SCRUTINY_OUTPUT_MAX} has not been set
+     */
+    public static long getScrutinyOutputMax(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        // -1 is the "unset" sentinel, so -1 itself cannot be stored as a usable maximum
+        long maxRows = configuration.getLong(SCRUTINY_OUTPUT_MAX, -1);
+        Preconditions.checkArgument(maxRows != -1, "%s is not set in the configuration",
+            SCRUTINY_OUTPUT_MAX);
+        return maxRows;
+    }
+
+    /**
+     * @param configuration job configuration; must not be null
+     * @param ts scrutiny execute timestamp
+     */
+    public static void setScrutinyExecuteTimestamp(Configuration configuration, long ts) {
+        Preconditions.checkNotNull(configuration).setLong(SCRUTINY_EXECUTE_TIMESTAMP, ts);
+    }
+
     public static String getDisableIndexes(Configuration configuration) {
         Preconditions.checkNotNull(configuration);
         return configuration.get(DISABLED_INDEXES);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/08e2a29f/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index 02cbb6c..24760b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -30,6 +30,7 @@ import java.util.Properties;
 
 import javax.annotation.Nullable;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -204,30 +205,74 @@ public final class QueryUtil {
      * @return Select Query 
      */
     public static String constructSelectStatement(String fullTableName, List<ColumnInfo> columnInfos,final String conditions) {
-        Preconditions.checkNotNull(fullTableName,"Table name cannot be null");
-        if(columnInfos == null || columnInfos.isEmpty()) {
-             throw new IllegalArgumentException("At least one column must be provided");
+        // Validate before transforming: Lists.transform rejects a null list with a bare
+        // NullPointerException, which would hide the IllegalArgumentException callers expect.
+        Preconditions.checkNotNull(fullTableName, "Table name cannot be null");
+        if (columnInfos == null || columnInfos.isEmpty()) {
+            throw new IllegalArgumentException("At least one column must be provided");
+        }
+        List<String> columns = Lists.transform(columnInfos, new Function<ColumnInfo, String>() {
+            @Override
+            public String apply(ColumnInfo input) {
+                // null entries become null names, which the delegate skips
+                return input == null ? null : input.getColumnName();
+            }
+        });
+        return constructSelectStatement(fullTableName, columns, conditions, null, false);
+    }
+
+    /**
+     * Builds a SELECT statement over {@code fullTableName} that projects {@code columns}.
+     *
+     * @param fullTableName name of the table for which the select statement needs to be created.
+     * @param columns list of columns to be projected in the select statement; null entries are
+     *            skipped.
+     * @param conditions The condition clause to be added to the WHERE condition, wrapped in
+     *            parentheses; ignored when null or empty.
+     * @param hint hint to use, placed right after SELECT; may be null for no hint
+     * @param escapeCols whether to escape the projected columns
+     * @return Select Query
+     * @throws NullPointerException if {@code fullTableName} is null
+     * @throws IllegalArgumentException if {@code columns} is null or empty
+     */
+    public static String constructSelectStatement(String fullTableName, List<String> columns,
+            final String conditions, Hint hint, boolean escapeCols) {
+        Preconditions.checkNotNull(fullTableName, "Table name cannot be null");
+        if (columns == null || columns.isEmpty()) {
+            throw new IllegalArgumentException("At least one column must be provided");
         }
         StringBuilder query = new StringBuilder();
         query.append("SELECT ");
-        for (ColumnInfo cinfo : columnInfos) {
-            if (cinfo != null) {
-                String fullColumnName = getEscapedFullColumnName(cinfo.getColumnName());
+
+        // the hint, when present, is rendered by HintNode and sits between SELECT and the columns
+        String hintStr = "";
+        if (hint != null) {
+            final HintNode node = new HintNode(hint.name());
+            hintStr = node.toString();
+        }
+        query.append(hintStr);
+
+        for (String col : columns) {
+            if (col != null) {
+                // columns are used verbatim unless the caller asked for escaping
+                String fullColumnName = col;
+                if (escapeCols) {
+                    fullColumnName = getEscapedFullColumnName(col);
+                }
                 query.append(fullColumnName);
                 query.append(",");
-             }
-         }
+            }
+        }
+        // NOTE(review): if every entry of columns is null no comma was appended, so the next line
+        // strips the last character of "SELECT " (or of the hint) instead of a comma.
         // Remove the trailing comma
         query.setLength(query.length() - 1);
         query.append(" FROM ");
         query.append(fullTableName);
-        if(conditions != null && conditions.length() > 0) {
+        if (conditions != null && conditions.length() > 0) {
             query.append(" WHERE (").append(conditions).append(")");
         }
         return query.toString();
     }
 
     /**
+     * Builds a parameterized value list for a multi-column IN clause: {@code numBatches} groups,
+     * each holding {@code numWhereCols} bind markers. For example numWhereCols=2, numBatches=3
+     * results in ((?,?),(?,?),(?,?))
+     * @param numWhereCols number of WHERE columns in each group; must be positive
+     * @param numBatches number of column batches (groups); must be positive
+     * @return parameterized IN filter
+     * @throws IllegalArgumentException if either argument is not positive
+     */
+    public static String constructParameterizedInClause(int numWhereCols, int numBatches) {
+        if (numWhereCols <= 0) {
+            throw new IllegalArgumentException();
+        }
+        if (numBatches <= 0) {
+            throw new IllegalArgumentException();
+        }
+        // one group of bind markers, e.g. "(?,?)"
+        StringBuilder group = new StringBuilder("(?");
+        for (int col = 1; col < numWhereCols; col++) {
+            group.append(",?");
+        }
+        group.append(')');
+        // the groups, comma separated and wrapped in an outer pair of parentheses
+        StringBuilder clause = new StringBuilder("(").append(group);
+        for (int batch = 1; batch < numBatches; batch++) {
+            clause.append(',').append(group);
+        }
+        return clause.append(')').toString();
+    }
+
+    /**
      * Create the Phoenix JDBC connection URL from the provided cluster connection details.
      */
     public static String getUrl(String zkQuorum) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/08e2a29f/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index d54670c..51f6ff9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -80,7 +80,9 @@ import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -335,6 +337,15 @@ public class SchemaUtil {
         return getName(familyName, columnName, false);
     }
 
+    public static List<String> getColumnNames(List<PColumn> pCols) {
+        return Lists.transform(pCols, new Function<PColumn, String>() {
+            @Override
+            public String apply(PColumn input) {
+                return input.getName().getString();
+            }
+        });
+    }
+
     public static byte[] getTableNameAsBytes(String schemaName, String tableName) {
         if (schemaName == null || schemaName.length() == 0) {
             return StringUtil.toBytes(tableName);
@@ -767,6 +778,16 @@ public class SchemaUtil {
         return getEscapedArgument(columnFamily) + QueryConstants.NAME_SEPARATOR + getEscapedArgument(columnName) ;
     }
     
+    /**
+     * Escapes every entry with {@code getEscapedFullColumnName}.
+     *
+     * @param fullColumnNames column names, possibly qualified with a column family
+     * @return a new list holding the escaped names, in the same order
+     */
+    public static List<String> getEscapedFullColumnNames(List<String> fullColumnNames) {
+        List<String> escapedNames = Lists.newArrayListWithCapacity(fullColumnNames.size());
+        for (String fullColumnName : fullColumnNames) {
+            escapedNames.add(getEscapedFullColumnName(fullColumnName));
+        }
+        return escapedNames;
+    }
+
     public static String getEscapedFullTableName(String fullTableName) {
         final String schemaName = getSchemaNameFromFullName(fullTableName);
         final String tableName = getTableNameFromFullName(fullTableName);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/08e2a29f/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/BaseIndexTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/BaseIndexTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/BaseIndexTest.java
new file mode 100644
index 0000000..4ec4a0c
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/BaseIndexTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Properties;
+
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+/**
+ * Base class for connectionless index tests. Once per class it creates the data table
+ * TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL and the index TEST_ICN_INDEX (on NAME, covering ZIP);
+ * before each test it opens {@link #conn} and resolves {@link #pDataTable} and {@link #pIndexTable}.
+ */
+public class BaseIndexTest extends BaseConnectionlessQueryTest {
+    protected static final String SCHEMA_NAME = "TEST_SCHEMA";
+    protected static final String DATA_TABLE_NAME = "TEST_INDEX_COLUMN_NAMES_UTIL";
+    protected static final String INDEX_TABLE_NAME = "TEST_ICN_INDEX";
+    protected static final String DATA_TABLE_FULL_NAME = SCHEMA_NAME + "." + DATA_TABLE_NAME;
+    protected static final String INDEX_TABLE_FULL_NAME = SCHEMA_NAME + "." + INDEX_TABLE_NAME;
+
+    private static final String DATA_TABLE_DDL = // composite PK (ID, PK_PART2); EMPLOYER is not in the index
+            "CREATE TABLE IF NOT EXISTS " + DATA_TABLE_FULL_NAME + "\n" +
+            "(\n" +
+            "    ID INTEGER NOT NULL,\n" +
+            "    PK_PART2 TINYINT NOT NULL,\n" +
+            "    NAME VARCHAR,\n" +
+            "    ZIP BIGINT,\n" +
+            "    EMPLOYER CHAR(20),\n" +
+            "    CONSTRAINT PK PRIMARY KEY\n" +
+            "    (\n" +
+            "        ID,\n" +
+            "        PK_PART2\n" +
+            "        \n" +
+            "    )\n" +
+            ")";
+
+    private static final String INDEX_TABLE_DDL = // NOTE(review): no IF NOT EXISTS, unlike DATA_TABLE_DDL
+            "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME
+                    + " (NAME) INCLUDE (ZIP)";
+    protected PTable pDataTable;
+    protected PTable pIndexTable;
+    protected Connection conn;
+
+    @BeforeClass
+    public static void setupClass() throws Exception { // runs the table and index DDL once per test class
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            conn.setAutoCommit(true);
+            conn.createStatement().execute(DATA_TABLE_DDL);
+            conn.createStatement().execute(INDEX_TABLE_DDL);
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Before
+    public void setup() throws Exception { // fresh connection per test; closed in tearDown()
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        conn = DriverManager.getConnection(getUrl(), props);
+        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); // getTable() is on PhoenixConnection
+        pDataTable = pconn.getTable(new PTableKey(pconn.getTenantId(), DATA_TABLE_FULL_NAME));
+        pIndexTable = pconn.getTable(new PTableKey(pconn.getTenantId(), INDEX_TABLE_FULL_NAME));
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (conn != null) {
+            conn.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/08e2a29f/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutputTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutputTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutputTest.java
new file mode 100644
index 0000000..c6bb739
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutputTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import static org.junit.Assert.assertEquals;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+
+import org.apache.phoenix.mapreduce.util.IndexColumnNames;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IndexScrutinyTableOutputTest extends BaseIndexTest { // checks SQL text built by IndexScrutinyTableOutput
+
+    private static final long SCRUTINY_TIME_MILLIS = 1502908914193L; // fixed so the expected SQL below is deterministic
+
+    @Before
+    public void setup() throws Exception {
+        super.setup(); // opens conn and resolves pDataTable / pIndexTable
+        conn.createStatement().execute(IndexScrutinyTableOutput.OUTPUT_TABLE_DDL);
+        conn.createStatement().execute(IndexScrutinyTableOutput.OUTPUT_METADATA_DDL);
+    }
+
+    @Test
+    public void testConstructMetadataParamQuery() { // one-tuple parameterized IN over the metadata key columns
+        String metadataParamQuery =
+                IndexScrutinyTableOutput
+                        .constructMetadataParamQuery(Arrays.asList("INVALID_ROWS_QUERY_ALL"));
+        assertEquals(
+            "SELECT \"INVALID_ROWS_QUERY_ALL\" FROM PHOENIX_INDEX_SCRUTINY_METADATA WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\") IN ((?,?,?))",
+            metadataParamQuery);
+    }
+
+    @Test
+    public void testGetSqlQueryAllInvalidRows() throws SQLException { // no HAS_TARGET_ROW filter
+        SourceTargetColumnNames columnNames =
+                new SourceTargetColumnNames.DataSourceColNames(pDataTable, pIndexTable);
+        String sqlStr =
+                IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, columnNames,
+                    SCRUTINY_TIME_MILLIS);
+        assertEquals("SELECT \"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\",\"SOURCE_ROW_PK_HASH\",\"SOURCE_TS\",\"TARGET_TS\",\"HAS_TARGET_ROW\",\"ID\",\"PK_PART2\",\"NAME\",\"ZIP\",\":ID\",\":PK_PART2\",\"0:NAME\",\"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193))",
+            sqlStr);
+    }
+
+    @Test
+    public void testGetSqlQueryMissingTargetRows() throws SQLException { // filters on HAS_TARGET_ROW = false
+        SourceTargetColumnNames columnNames =
+                new SourceTargetColumnNames.DataSourceColNames(pDataTable, pIndexTable);
+        String query =
+                IndexScrutinyTableOutput.getSqlQueryMissingTargetRows(conn, columnNames,
+                    SCRUTINY_TIME_MILLIS);
+        assertEquals("SELECT \"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\",\"SOURCE_ROW_PK_HASH\",\"SOURCE_TS\",\"TARGET_TS\",\"HAS_TARGET_ROW\",\"ID\",\"PK_PART2\",\"NAME\",\"ZIP\",\":ID\",\":PK_PART2\",\"0:NAME\",\"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\", \"HAS_TARGET_ROW\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193,false))",
+            query);
+    }
+
+    @Test
+    public void testGetSqlQueryBadCoveredColVal() throws SQLException { // filters on HAS_TARGET_ROW = true
+        SourceTargetColumnNames columnNames =
+                new SourceTargetColumnNames.DataSourceColNames(pDataTable, pIndexTable);
+        String query =
+                IndexScrutinyTableOutput.getSqlQueryBadCoveredColVal(conn, columnNames,
+                    SCRUTINY_TIME_MILLIS);
+        assertEquals("SELECT \"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\",\"SOURCE_ROW_PK_HASH\",\"SOURCE_TS\",\"TARGET_TS\",\"HAS_TARGET_ROW\",\"ID\",\"PK_PART2\",\"NAME\",\"ZIP\",\":ID\",\":PK_PART2\",\"0:NAME\",\"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\", \"HAS_TARGET_ROW\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193,true))",
+            query);
+    }
+
+    @Test
+    public void testGetOutputTableUpsert() throws Exception { // dynamic data + index columns appended to the fixed output columns
+        IndexColumnNames columnNames = new IndexColumnNames(pDataTable, pIndexTable);
+        String outputTableUpsert =
+                IndexScrutinyTableOutput.constructOutputTableUpsert(
+                    columnNames.getDynamicDataCols(), columnNames.getDynamicIndexCols(), conn);
+        conn.prepareStatement(outputTableUpsert); // shouldn't throw: the generated upsert must be accepted by Phoenix
+        assertEquals("UPSERT  INTO PHOENIX_INDEX_SCRUTINY (\"SOURCE_TABLE\", \"TARGET_TABLE\", \"SCRUTINY_EXECUTE_TIME\", \"SOURCE_ROW_PK_HASH\", \"SOURCE_TS\", \"TARGET_TS\", \"HAS_TARGET_ROW\", \"ID\" INTEGER, \"PK_PART2\" TINYINT, \"NAME\" VARCHAR, \"ZIP\" BIGINT, \":ID\" INTEGER, \":PK_PART2\" TINYINT, \"0:NAME\" VARCHAR, \"0:ZIP\" BIGINT) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+            outputTableUpsert);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/08e2a29f/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/IndexColumnNamesTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/IndexColumnNamesTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/IndexColumnNamesTest.java
new file mode 100644
index 0000000..48c688f
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/IndexColumnNamesTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.sql.SQLException;
+
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.BaseIndexTest;
+import org.apache.phoenix.parse.HintNode.Hint;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.util.QueryUtil;
+import org.junit.Test;
+
+public class IndexColumnNamesTest extends BaseIndexTest {
+
+    private static final String DYNAMIC_COL_DDL = // columns with and without a declared length/precision
+            "CREATE TABLE IF NOT EXISTS PRECISION_NAME_TEST\n" + "(\n"
+                    + "    CHAR_TEST CHAR(15) NOT NULL primary key,\n"
+                    + "    VARCHAR_TEST VARCHAR(1),\n" + "    DECIMAL_TEST DECIMAL(10,2),\n"
+                    + "    BINARY_TEST BINARY(11),\n"
+                    + "    VARCHAR_UNSPEC VARCHAR,\n"
+                    + "    DEC_UNSPEC DECIMAL\n" + ")";
+
+    private static final String DYNAMIC_COL_IDX_DDL = // indexes VARCHAR_TEST and covers every other column
+            "CREATE INDEX PRECISION_NAME_IDX_TEST ON PRECISION_NAME_TEST(VARCHAR_TEST) INCLUDE (CHAR_TEST,DECIMAL_TEST,BINARY_TEST,VARCHAR_UNSPEC,DEC_UNSPEC)";
+
+    @Test
+    public void testGetColumnNames() { // uses the data table and index created by BaseIndexTest
+        IndexColumnNames indexColumnNames = new IndexColumnNames(pDataTable, pIndexTable);
+        assertEquals("[ID, PK_PART2, 0.NAME, 0.ZIP]", indexColumnNames.getDataColNames().toString());
+        assertEquals("[:ID, :PK_PART2, 0:NAME, 0:ZIP]", indexColumnNames.getIndexColNames().toString()); // index column names, leading with the data table PK
+        assertEquals("[:ID, :PK_PART2, 0:NAME]", indexColumnNames.getIndexPkColNames().toString());
+        assertEquals("[ID, PK_PART2]", indexColumnNames.getDataPkColNames().toString());
+        assertEquals("[0.NAME, 0.ZIP]", indexColumnNames.getDataNonPkColNames().toString());
+
+        assertEquals("[\"ID\" INTEGER, \"PK_PART2\" TINYINT, \"NAME\" VARCHAR, \"ZIP\" BIGINT]", indexColumnNames.getDynamicDataCols().toString());
+        assertEquals("[\":ID\" INTEGER, \":PK_PART2\" TINYINT, \"0:NAME\" VARCHAR, \"0:ZIP\" BIGINT]", indexColumnNames.getDynamicIndexCols().toString());
+        assertEquals("UPSERT /*+ NO_INDEX */  INTO TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL (\"ID\" INTEGER, \"PK_PART2\" TINYINT, \"NAME\" VARCHAR, \"ZIP\" BIGINT) VALUES (?, ?, ?, ?)", QueryUtil.constructUpsertStatement(DATA_TABLE_FULL_NAME, indexColumnNames.getDynamicDataCols(), Hint.NO_INDEX));
+    }
+
+    /**
+     * Tests that column types with a declared length/precision are rendered with it in the dynamic column definitions.
+     * @throws SQLException if the DDL or the PTable lookup fails
+     */
+    @Test
+    public void testGetDynamicColPrecision() throws SQLException {
+        conn.createStatement().execute(DYNAMIC_COL_DDL);
+        conn.createStatement().execute(DYNAMIC_COL_IDX_DDL);
+        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+        pDataTable = pconn.getTable(new PTableKey(pconn.getTenantId(), "PRECISION_NAME_TEST")); // replaces the base-class tables for this test only
+        pIndexTable = pconn.getTable(new PTableKey(pconn.getTenantId(), "PRECISION_NAME_IDX_TEST"));
+        IndexColumnNames indexColumnNames = new IndexColumnNames(pDataTable, pIndexTable);
+        assertEquals("[\"CHAR_TEST\" CHAR(15), \"VARCHAR_TEST\" VARCHAR(1), \"DECIMAL_TEST\" DECIMAL(10,2), \"BINARY_TEST\" BINARY(11), \"VARCHAR_UNSPEC\" VARCHAR, \"DEC_UNSPEC\" DECIMAL]", indexColumnNames.getDynamicDataCols().toString());
+        assertEquals("[\":CHAR_TEST\" CHAR(15), \"0:VARCHAR_TEST\" VARCHAR(1), \"0:DECIMAL_TEST\" DECIMAL(10,2), \"0:BINARY_TEST\" BINARY(11), \"0:VARCHAR_UNSPEC\" VARCHAR, \"0:DEC_UNSPEC\" DECIMAL]",
+                indexColumnNames.getDynamicIndexCols().toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/08e2a29f/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
index 45f536d..2d094f6 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
@@ -27,9 +27,11 @@ import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.parse.HintNode.Hint;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 
 public class QueryUtilTest {
 
@@ -94,7 +96,21 @@ public class QueryUtilTest {
                 "SELECT \"ID\",\"NAME\" FROM \"a\".\"mytab\"",
                 QueryUtil.constructSelectStatement(fullTableName, ImmutableList.of(ID_COLUMN,NAME_COLUMN),null));
     }
-    
+
+    @Test
+    public void testConstructSelectWithHint() { // hint is emitted right after SELECT; the WHERE condition is parenthesized
+        assertEquals(
+            "SELECT /*+ NO_INDEX */ \"col1\",\"col2\" FROM MYTAB WHERE (\"col2\"=? and \"col3\" is null)",
+            QueryUtil.constructSelectStatement("MYTAB", Lists.newArrayList("col1", "col2"),
+                "\"col2\"=? and \"col3\" is null", Hint.NO_INDEX, true));
+    }
+
+    @Test
+    public void testConstructParameterizedInClause() {
+        assertEquals("((?,?,?),(?,?,?))", QueryUtil.constructParameterizedInClause(3, 2)); // 3 bind params per tuple, 2 tuples
+        assertEquals("((?))", QueryUtil.constructParameterizedInClause(1, 1)); // smallest case: one tuple of one param
+    }
+
     /**
      * Test that we create connection strings from the HBase Configuration that match the
      * expected syntax. Expected to log exceptions as it uses ZK host names that don't exist


Mime
View raw message