phoenix-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] gjacoby126 commented on a change in pull request #441: PHOENIX-5089 Add tenantId parameter to IndexScrutinyTool
Date Sat, 23 Feb 2019 00:05:49 GMT
gjacoby126 commented on a change in pull request #441: PHOENIX-5089 Add tenantId parameter to IndexScrutinyTool
URL: https://github.com/apache/phoenix/pull/441#discussion_r259547070
 
 

 ##########
 File path: phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
 ##########
 @@ -81,681 +86,813 @@
  * Tests for the {@link IndexScrutinyTool}
  */
 @Category(NeedsOwnMiniClusterTest.class)
-@RunWith(Parameterized.class)
-public class IndexScrutinyToolIT extends BaseTest {
+@RunWith(Enclosed.class)
+public class IndexScrutinyToolIT {
 
-    private String dataTableDdl;
-    private String indexTableDdl;
+    abstract public static class SharedIndexToolIT extends BaseTest {
+        protected String outputDir;
 
-    private static final String UPSERT_SQL = "UPSERT INTO %s VALUES(?,?,?,?)";
 
-    private static final String INDEX_UPSERT_SQL =
-        "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") values (?,?,?,?)";
+        @BeforeClass
+        public static void doSetup() throws Exception {
+            Map<String, String> serverProps = Maps.newHashMap();
+            //disable major compactions
+            serverProps.put(HConstants.MAJOR_COMPACTION_PERIOD, "0");
+            Map<String, String> clientProps = Maps.newHashMap();
+            setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+        }
 
-    private static final String DELETE_SQL = "DELETE FROM %s ";
+        protected List<Job> runScrutiny(String[] cmdArgs) throws Exception {
+            IndexScrutinyTool scrutiny = new IndexScrutinyTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            scrutiny.setConf(conf);
+            int status = scrutiny.run(cmdArgs);
+            assertEquals(0, status);
+            for (Job job : scrutiny.getJobs()) {
+                assertTrue(job.waitForCompletion(true));
+            }
+            return scrutiny.getJobs();
+        }
 
-    private String schemaName;
-    private String dataTableName;
-    private String dataTableFullName;
-    private String indexTableName;
-    private String indexTableFullName;
-    private String outputDir;
+        protected String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
+                SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat,
+                Long maxOutputRows, String tenantId, Long scrutinyTs) {
+            final List<String> args = Lists.newArrayList();
+            if (schemaName != null) {
+                args.add("-s");
+                args.add(schemaName);
+            }
+            args.add("-dt");
+            args.add(dataTable);
+            args.add("-it");
+            args.add(indxTable);
+
+            // TODO test snapshot reads
+            // if(useSnapshot) {
+            // args.add("-snap");
+            // }
+
+            if (OutputFormat.FILE.equals(outputFormat)) {
+                args.add("-op");
+                outputDir = "/tmp/" + UUID.randomUUID().toString();
+                args.add(outputDir);
+            }
 
-    private Connection conn;
+            args.add("-t");
+            args.add(String.valueOf(scrutinyTs));
+            args.add("-run-foreground");
+            if (batchSize != null) {
+                args.add("-b");
+                args.add(String.valueOf(batchSize));
+            }
 
-    private PreparedStatement dataTableUpsertStmt;
+            // default to using data table as the source table
+            args.add("-src");
+            if (sourceTable == null) {
+                args.add(SourceTable.DATA_TABLE_SOURCE.name());
+            } else {
+                args.add(sourceTable.name());
+            }
+            if (outputInvalidRows) {
+                args.add("-o");
+            }
+            if (outputFormat != null) {
+                args.add("-of");
+                args.add(outputFormat.name());
+            }
+            if (maxOutputRows != null) {
+                args.add("-om");
+                args.add(maxOutputRows.toString());
+            }
+            if (tenantId != null) {
+                args.add("-tenant");
+                args.add(tenantId);
+            }
+            return args.toArray(new String[0]);
+        }
 
-    private PreparedStatement indexTableUpsertStmt;
+        protected long getCounterValue(Counters counters, Enum<PhoenixScrutinyJobCounters> counter) {
+            return counters.findCounter(counter).getValue();
+        }
+    }
 
-    private long testTime;
-    private Properties props;
+    @RunWith(Parameterized.class)
+    public static class IndexScrutinyToolNonTenantIT extends SharedIndexToolIT {
 
-    @Parameterized.Parameters
-    public static Collection<Object[]> data() {
-        return Arrays.asList(new Object[][] {
-            { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)", "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" },
-            { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2", "CREATE INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" },
-            { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2", "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }
-            });
-    }
+        private String dataTableDdl;
+        private String indexTableDdl;
 
-    public IndexScrutinyToolIT(String dataTableDdl, String indexTableDdl) {
-        this.dataTableDdl = dataTableDdl;
-        this.indexTableDdl = indexTableDdl;
-    }
+        private static final String UPSERT_SQL = "UPSERT INTO %s VALUES(?,?,?,?)";
 
-    @BeforeClass
-    public static void doSetup() throws Exception {
-        Map<String, String> serverProps = Maps.newHashMap();
-        //disable major compactions
-        serverProps.put(HConstants.MAJOR_COMPACTION_PERIOD, "0");
-        Map<String, String> clientProps = Maps.newHashMap();
-        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
-    }
+        private static final String INDEX_UPSERT_SQL =
+            "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") values (?,?,?,?)";
 
-    /**
-     * Create the test data and index tables
-     */
-    @Before
-    public void setup() throws SQLException {
-        generateUniqueTableNames();
-        createTestTable(getUrl(), String.format(dataTableDdl, dataTableFullName));
-        createTestTable(getUrl(),
-            String.format(indexTableDdl, indexTableName, dataTableFullName));
-        props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        conn = DriverManager.getConnection(getUrl(), props);
-        String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName);
-        dataTableUpsertStmt = conn.prepareStatement(dataTableUpsert);
-        String indexTableUpsert = String.format(INDEX_UPSERT_SQL, indexTableFullName);
-        indexTableUpsertStmt = conn.prepareStatement(indexTableUpsert);
-        conn.setAutoCommit(false);
-        testTime = EnvironmentEdgeManager.currentTimeMillis() - 1000;
+        private static final String DELETE_SQL = "DELETE FROM %s ";
 
-    }
+        private String schemaName;
+        private String dataTableName;
+        private String dataTableFullName;
+        private String indexTableName;
+        private String indexTableFullName;
 
-    @After
-    public void teardown() throws SQLException {
-        if (conn != null) {
-            conn.close();
+        private Connection conn;
+
+        private PreparedStatement dataTableUpsertStmt;
+
+        private PreparedStatement indexTableUpsertStmt;
+
+        private long testTime;
+        private Properties props;
+
+        @Parameterized.Parameters
+        public static Collection<Object[]> data() {
+            return Arrays.asList(new Object[][] {
+                { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)", "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" },
+                { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2", "CREATE INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" },
+                { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2", "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }
+                });
         }
-    }
 
-    /**
-     * Tests a data table that is correctly indexed. Scrutiny should report all rows as valid.
-     */
-    @Test
-    public void testValidIndex() throws Exception {
-        // insert two rows
-        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
-        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
-        conn.commit();
-
-        int numDataRows = countRows(dataTableFullName);
-        int numIndexRows = countRows(indexTableFullName);
-
-        // scrutiny should report everything as ok
-        List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
-        Job job = completedJobs.get(0);
-        assertTrue(job.isSuccessful());
-        Counters counters = job.getCounters();
-        assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
-        assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
-
-        // make sure row counts weren't modified by scrutiny
-        assertEquals(numDataRows, countRows(dataTableFullName));
-        assertEquals(numIndexRows, countRows(indexTableFullName));
-    }
 
-    /**
-     * Tests running a scrutiny while updates and deletes are happening.
-     * Since CURRENT_SCN is set, the scrutiny shouldn't report any issue.
-     */
-    @Test
-    @Ignore("PHOENIX-4378 Unable to set KEEP_DELETED_CELLS to true on RS scanner")
-    public void testScrutinyWhileTakingWrites() throws Exception {
-        int id = 0;
-        while (id < 1000) {
-            int index = 1;
-            dataTableUpsertStmt.setInt(index++, id);
-            dataTableUpsertStmt.setString(index++, "name-" + id);
-            dataTableUpsertStmt.setInt(index++, id);
-            dataTableUpsertStmt.setTimestamp(index++, new Timestamp(testTime));
-            dataTableUpsertStmt.executeUpdate();
-            id++;
-        }
-        conn.commit();
-
-        //CURRENT_SCN for scrutiny
-        long scrutinyTS = EnvironmentEdgeManager.currentTimeMillis();
-
-        // launch background upserts and deletes
-        final Random random = new Random(0);
-        Runnable backgroundUpserts = new Runnable() {
-            @Override
-            public void run() {
-                int idToUpsert = random.nextInt(1000);
-                try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-                    PreparedStatement dataPS =
-                            conn.prepareStatement(String.format(UPSERT_SQL, dataTableFullName));
-                    upsertRow(dataPS, idToUpsert, "modified-" + idToUpsert, idToUpsert + 1000);
-                    conn.commit();
-                } catch (SQLException e) {
-                    e.printStackTrace();
-                }
+        public IndexScrutinyToolNonTenantIT(String dataTableDdl, String indexTableDdl) {
+            this.dataTableDdl = dataTableDdl;
+            this.indexTableDdl = indexTableDdl;
+        }
+
+        /**
+         * Create the test data and index tables
+         */
+        @Before
+        public void setup() throws SQLException {
+            generateUniqueTableNames();
+            createTestTable(getUrl(), String.format(dataTableDdl, dataTableFullName));
+            createTestTable(getUrl(),
+                String.format(indexTableDdl, indexTableName, dataTableFullName));
+            props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+            conn = DriverManager.getConnection(getUrl(), props);
+            String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName);
+            dataTableUpsertStmt = conn.prepareStatement(dataTableUpsert);
+            String indexTableUpsert = String.format(INDEX_UPSERT_SQL, indexTableFullName);
+            indexTableUpsertStmt = conn.prepareStatement(indexTableUpsert);
+            conn.setAutoCommit(false);
+            testTime = EnvironmentEdgeManager.currentTimeMillis() - 1000;
+
+        }
+
+        @After
+        public void teardown() throws SQLException {
+            if (conn != null) {
+                conn.close();
             }
-        };
-        Runnable backgroundDeletes = new Runnable() {
-            @Override
-            public void run() {
-                int idToDelete = random.nextInt(1000);
-                try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-                    String deleteSql =
-                            String.format(DELETE_SQL, indexTableFullName) + "WHERE \":ID\"="
-                                    + idToDelete;
-                    conn.createStatement().executeUpdate(deleteSql);
-                    conn.commit();
-                } catch (SQLException e) {
-                    e.printStackTrace();
-                }
+        }
+
+        /**
+         * Tests a data table that is correctly indexed. Scrutiny should report all rows as valid.
+         */
+        @Test
+        public void testValidIndex() throws Exception {
+            // insert two rows
+            upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+            upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+            conn.commit();
+
+            int numDataRows = countRows(dataTableFullName);
+            int numIndexRows = countRows(indexTableFullName);
+
+            // scrutiny should report everything as ok
+            List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+
+            // make sure row counts weren't modified by scrutiny
+            assertEquals(numDataRows, countRows(dataTableFullName));
+            assertEquals(numIndexRows, countRows(indexTableFullName));
+        }
+
+        /**
+         * Tests running a scrutiny while updates and deletes are happening.
+         * Since CURRENT_SCN is set, the scrutiny shouldn't report any issue.
+         */
+        @Test
+        @Ignore("PHOENIX-4378 Unable to set KEEP_DELETED_CELLS to true on RS scanner")
+        public void testScrutinyWhileTakingWrites() throws Exception {
+            int id = 0;
+            while (id < 1000) {
+                int index = 1;
+                dataTableUpsertStmt.setInt(index++, id);
+                dataTableUpsertStmt.setString(index++, "name-" + id);
+                dataTableUpsertStmt.setInt(index++, id);
+                dataTableUpsertStmt.setTimestamp(index++, new Timestamp(testTime));
+                dataTableUpsertStmt.executeUpdate();
+                id++;
             }
-        };
-        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
-        scheduledThreadPool.scheduleWithFixedDelay(backgroundUpserts, 200, 200,
-            TimeUnit.MILLISECONDS);
-        scheduledThreadPool.scheduleWithFixedDelay(backgroundDeletes, 200, 200,
-            TimeUnit.MILLISECONDS);
-
-        // scrutiny should report everything as ok
-        List<Job> completedJobs =
-                runScrutinyCurrentSCN(schemaName, dataTableName, indexTableName,
-                    scrutinyTS);
-        Job job = completedJobs.get(0);
-        assertTrue(job.isSuccessful());
-        Counters counters = job.getCounters();
-        assertEquals(1000, getCounterValue(counters, VALID_ROW_COUNT));
-        assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
-        scheduledThreadPool.shutdown();
-        scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS);
-    }
+            conn.commit();
+
+            //CURRENT_SCN for scrutiny
+            long scrutinyTS = EnvironmentEdgeManager.currentTimeMillis();
+
+            // launch background upserts and deletes
+            final Random random = new Random(0);
+            Runnable backgroundUpserts = new Runnable() {
+                @Override
+                public void run() {
+                    int idToUpsert = random.nextInt(1000);
+                    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+                        PreparedStatement dataPS =
+                                conn.prepareStatement(String.format(UPSERT_SQL, dataTableFullName));
+                        upsertRow(dataPS, idToUpsert, "modified-" + idToUpsert, idToUpsert + 1000);
+                        conn.commit();
+                    } catch (SQLException e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+            Runnable backgroundDeletes = new Runnable() {
+                @Override
+                public void run() {
+                    int idToDelete = random.nextInt(1000);
+                    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+                        String deleteSql =
+                                String.format(DELETE_SQL, indexTableFullName) + "WHERE \":ID\"="
+                                        + idToDelete;
+                        conn.createStatement().executeUpdate(deleteSql);
+                        conn.commit();
+                    } catch (SQLException e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+            ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
+            scheduledThreadPool.scheduleWithFixedDelay(backgroundUpserts, 200, 200,
+                TimeUnit.MILLISECONDS);
+            scheduledThreadPool.scheduleWithFixedDelay(backgroundDeletes, 200, 200,
+                TimeUnit.MILLISECONDS);
+
+            // scrutiny should report everything as ok
+            List<Job> completedJobs =
+                    runScrutinyCurrentSCN(schemaName, dataTableName, indexTableName,
+                        scrutinyTS);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(1000, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+            scheduledThreadPool.shutdown();
+            scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS);
+        }
 
-    /**
-     * Tests an index with the same # of rows as the data table, but one of the index rows is
-     * incorrect Scrutiny should report the invalid rows.
-     */
-    @Test
-    public void testEqualRowCountIndexIncorrect() throws Exception {
-        // insert one valid row
-        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
-        conn.commit();
-
-        // disable the index and insert another row which is not indexed
-        disableIndex();
-        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
-        conn.commit();
-
-        // insert a bad row into the index
-        upsertIndexRow("badName", 2, 9999);
-        conn.commit();
-
-        // scrutiny should report the bad row
-        List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
-        Job job = completedJobs.get(0);
-        assertTrue(job.isSuccessful());
-        Counters counters = job.getCounters();
-        assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
-        assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
-    }
+        /**
+         * Tests an index with the same # of rows as the data table, but one of the index rows is
+         * incorrect Scrutiny should report the invalid rows.
+         */
+        @Test
+        public void testEqualRowCountIndexIncorrect() throws Exception {
+            // insert one valid row
+            upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+            conn.commit();
+
+            // disable the index and insert another row which is not indexed
+            disableIndex();
+            upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+            conn.commit();
+
+            // insert a bad row into the index
+            upsertIndexRow("badName", 2, 9999);
+            conn.commit();
+
+            // scrutiny should report the bad row
+            List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+        }
 
-    /**
-     * Tests an index where the index pk is correct (indexed col values are indexed correctly), but
-     * a covered index value is incorrect. Scrutiny should report the invalid row
-     */
-    @Test
-    public void testCoveredValueIncorrect() throws Exception {
-        // insert one valid row
-        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
-        conn.commit();
-
-        // disable index and insert another data row
-        disableIndex();
-        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
-        conn.commit();
-
-        // insert a bad index row for the above data row
-        upsertIndexRow("name-2", 2, 9999);
-        conn.commit();
-
-        // scrutiny should report the bad row
-        List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
-        Job job = completedJobs.get(0);
-        assertTrue(job.isSuccessful());
-        Counters counters = job.getCounters();
-        assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
-        assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
-        assertEquals(1, getCounterValue(counters, BAD_COVERED_COL_VAL_COUNT));
-    }
+        /**
+         * Tests an index where the index pk is correct (indexed col values are indexed correctly), but
+         * a covered index value is incorrect. Scrutiny should report the invalid row
+         */
+        @Test
+        public void testCoveredValueIncorrect() throws Exception {
+            // insert one valid row
+            upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+            conn.commit();
+
+            // disable index and insert another data row
+            disableIndex();
+            upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+            conn.commit();
+
+            // insert a bad index row for the above data row
+            upsertIndexRow("name-2", 2, 9999);
+            conn.commit();
+
+            // scrutiny should report the bad row
+            List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+            assertEquals(1, getCounterValue(counters, BAD_COVERED_COL_VAL_COUNT));
+        }
 
-    /**
-     * Test batching of row comparisons Inserts 1001 rows, with some random bad rows, and runs
-     * scrutiny with batchsize of 10,
-     */
-    @Test
-    public void testBatching() throws Exception {
-        // insert 1001 data and index rows
-        int numTestRows = 1001;
-        for (int i = 0; i < numTestRows; i++) {
-            upsertRow(dataTableUpsertStmt, i, "name-" + i, i + 1000);
-        }
-        conn.commit();
-
-        disableIndex();
-
-        // randomly delete some rows from the index
-        Random random = new Random();
-        for (int i = 0; i < 100; i++) {
-            int idToDelete = random.nextInt(numTestRows);
-            deleteRow(indexTableFullName, "WHERE \":ID\"=" + idToDelete);
-        }
-        conn.commit();
-        int numRows = countRows(indexTableFullName);
-        int numDeleted = numTestRows - numRows;
-
-        // run scrutiny with batch size of 10
-        List<Job> completedJobs =
-                runScrutiny(schemaName, dataTableName, indexTableName, 10L);
-        Job job = completedJobs.get(0);
-        assertTrue(job.isSuccessful());
-        Counters counters = job.getCounters();
-        assertEquals(numTestRows - numDeleted, getCounterValue(counters, VALID_ROW_COUNT));
-        assertEquals(numDeleted, getCounterValue(counters, INVALID_ROW_COUNT));
-        assertEquals(numTestRows / 10 + numTestRows % 10,
-            getCounterValue(counters, BATCHES_PROCESSED_COUNT));
-    }
+        /**
+         * Test batching of row comparisons Inserts 1001 rows, with some random bad rows, and runs
+         * scrutiny with batchsize of 10,
+         */
+        @Test
+        public void testBatching() throws Exception {
+            // insert 1001 data and index rows
+            int numTestRows = 1001;
+            for (int i = 0; i < numTestRows; i++) {
+                upsertRow(dataTableUpsertStmt, i, "name-" + i, i + 1000);
+            }
+            conn.commit();
 
-    /**
-     * Tests when there are more data table rows than index table rows Scrutiny should report the
-     * number of incorrect rows
-     */
-    @Test
-    public void testMoreDataRows() throws Exception {
-        upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
-        conn.commit();
-        disableIndex();
-        // these rows won't have a corresponding index row
-        upsertRow(dataTableUpsertStmt, 2, "name-2", 95124);
-        upsertRow(dataTableUpsertStmt, 3, "name-3", 95125);
-        conn.commit();
-
-        List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
-        Job job = completedJobs.get(0);
-        assertTrue(job.isSuccessful());
-        Counters counters = job.getCounters();
-        assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
-        assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
-    }
+            disableIndex();
 
-    /**
-     * Tests when there are more index table rows than data table rows Scrutiny should report the
-     * number of incorrect rows when run with the index as the source table
-     */
-    @Test
-    public void testMoreIndexRows() throws Exception {
-        upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
-        conn.commit();
-        disableIndex();
-        // these index rows won't have a corresponding data row
-        upsertIndexRow("name-2", 2, 95124);
-        upsertIndexRow("name-3", 3, 95125);
-        conn.commit();
-
-        List<Job> completedJobs =
-                runScrutiny(schemaName, dataTableName, indexTableName, 10L,
-                    SourceTable.INDEX_TABLE_SOURCE);
-        Job job = completedJobs.get(0);
-        assertTrue(job.isSuccessful());
-        Counters counters = job.getCounters();
-        assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
-        assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
-    }
+            // randomly delete some rows from the index
+            Random random = new Random();
+            for (int i = 0; i < 100; i++) {
+                int idToDelete = random.nextInt(numTestRows);
+                deleteRow(indexTableFullName, "WHERE \":ID\"=" + idToDelete);
+            }
+            conn.commit();
+            int numRows = countRows(indexTableFullName);
+            int numDeleted = numTestRows - numRows;
+
+            // run scrutiny with batch size of 10
+            List<Job> completedJobs =
+                    runScrutiny(schemaName, dataTableName, indexTableName, 10L);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(numTestRows - numDeleted, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(numDeleted, getCounterValue(counters, INVALID_ROW_COUNT));
+            assertEquals(numTestRows / 10 + numTestRows % 10,
+                getCounterValue(counters, BATCHES_PROCESSED_COUNT));
+        }
 
-    /**
-     * Tests running with both the index and data tables as the source table If we have an
-     * incorrectly indexed row, it should be reported in each direction
-     */
-    @Test
-    public void testBothDataAndIndexAsSource() throws Exception {
-        // insert one valid row
-        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
-        conn.commit();
-
-        // disable the index and insert another row which is not indexed
-        disableIndex();
-        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
-        conn.commit();
-
-        // insert a bad row into the index
-        upsertIndexRow("badName", 2, 9999);
-        conn.commit();
-
-        List<Job> completedJobs =
-                runScrutiny(schemaName, dataTableName, indexTableName, 10L,
-                    SourceTable.BOTH);
-        assertEquals(2, completedJobs.size());
-        for (Job job : completedJobs) {
+        /**
+         * Tests when there are more data table rows than index table rows Scrutiny should report the
+         * number of incorrect rows
+         */
+        @Test
+        public void testMoreDataRows() throws Exception {
+            upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
+            conn.commit();
+            disableIndex();
+            // these rows won't have a corresponding index row
+            upsertRow(dataTableUpsertStmt, 2, "name-2", 95124);
+            upsertRow(dataTableUpsertStmt, 3, "name-3", 95125);
+            conn.commit();
+
+            List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
+            Job job = completedJobs.get(0);
             assertTrue(job.isSuccessful());
             Counters counters = job.getCounters();
             assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
-            assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+            assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
         }
-    }
 
-    /**
-     * Tests that with the output to file option set, the scrutiny tool outputs invalid rows to file
-     */
-    @Test
-    public void testOutputInvalidRowsToFile() throws Exception {
-        insertOneValid_OneBadVal_OneMissingTarget();
-
-        String[] argValues =
-                getArgValues(schemaName, dataTableName, indexTableName, 10L,
-                    SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.FILE, null);
-        runScrutiny(argValues);
-
-        // check the output files
-        Path outputPath = CsvBulkImportUtil.getOutputPath(new Path(outputDir), dataTableFullName);
-        DistributedFileSystem fs = getUtility().getDFSCluster().getFileSystem();
-        List<Path> paths = Lists.newArrayList();
-        Path firstPart = null;
-        for (FileStatus outputFile : fs.listStatus(outputPath)) {
-            if (outputFile.getPath().getName().startsWith("part")) {
-                if (firstPart == null) {
-                    firstPart = outputFile.getPath();
-                } else {
-                    paths.add(outputFile.getPath());
+        /**
+         * Tests when there are more index table rows than data table rows Scrutiny should report the
+         * number of incorrect rows when run with the index as the source table
+         */
+        @Test
+        public void testMoreIndexRows() throws Exception {
+            upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
+            conn.commit();
+            disableIndex();
+            // these index rows won't have a corresponding data row
+            upsertIndexRow("name-2", 2, 95124);
+            upsertIndexRow("name-3", 3, 95125);
+            conn.commit();
+
+            List<Job> completedJobs =
+                    runScrutiny(schemaName, dataTableName, indexTableName, 10L,
+                        SourceTable.INDEX_TABLE_SOURCE);
+            Job job = completedJobs.get(0);
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
+        }
+
+        /**
+         * Tests running with both the index and data tables as the source table If we have an
+         * incorrectly indexed row, it should be reported in each direction
+         */
+        @Test
+        public void testBothDataAndIndexAsSource() throws Exception {
+            // insert one valid row
+            upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+            conn.commit();
+
+            // disable the index and insert another row which is not indexed
+            disableIndex();
+            upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+            conn.commit();
+
+            // insert a bad row into the index
+            upsertIndexRow("badName", 2, 9999);
+            conn.commit();
+
+            List<Job> completedJobs =
+                    runScrutiny(schemaName, dataTableName, indexTableName, 10L,
+                        SourceTable.BOTH);
+            assertEquals(2, completedJobs.size());
+            for (Job job : completedJobs) {
+                assertTrue(job.isSuccessful());
+                Counters counters = job.getCounters();
+                assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+                assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+            }
+        }
+
+        /**
+         * Tests that with the output to file option set, the scrutiny tool outputs invalid rows to file
+         */
+        @Test
+        public void testOutputInvalidRowsToFile() throws Exception {
+            insertOneValid_OneBadVal_OneMissingTarget();
+
+            String[] argValues =
+                    getArgValues(schemaName, dataTableName, indexTableName, 10L,
+                        SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.FILE, null);
+            runScrutiny(argValues);
+
+            // check the output files
+            Path outputPath = CsvBulkImportUtil.getOutputPath(new Path(outputDir), dataTableFullName);
+            DistributedFileSystem fs = getUtility().getDFSCluster().getFileSystem();
+            List<Path> paths = Lists.newArrayList();
+            Path firstPart = null;
+            for (FileStatus outputFile : fs.listStatus(outputPath)) {
+                if (outputFile.getPath().getName().startsWith("part")) {
+                    if (firstPart == null) {
+                        firstPart = outputFile.getPath();
+                    } else {
+                        paths.add(outputFile.getPath());
+                    }
                 }
             }
+            if (dataTableDdl.contains("SALT_BUCKETS")) {
+                fs.concat(firstPart, paths.toArray(new Path[0]));
+            }
+            Path outputFilePath = firstPart;
+            assertTrue(fs.exists(outputFilePath));
+            FSDataInputStream fsDataInputStream = fs.open(outputFilePath);
+            BufferedReader reader = new BufferedReader(new InputStreamReader(fsDataInputStream));
+            TreeSet<String> lines = Sets.newTreeSet();
+            try {
+                String line = null;
+                while ((line = reader.readLine()) != null) {
+                    lines.add(line);
+                }
+            } finally {
+                IOUtils.closeQuietly(reader);
+                IOUtils.closeQuietly(fsDataInputStream);
+            }
+            Iterator<String> lineIterator = lines.iterator();
+            assertEquals(
+                "[2, name-2, " + new Timestamp(testTime).toString() + ", 95123]\t[2, name-2, " + new Timestamp(testTime)
+                    .toString() + ", 9999]", lineIterator.next());
+            assertEquals("[3, name-3, " + new Timestamp(testTime).toString() + ", 95123]\tTarget row not found",
+                lineIterator.next());
+
+        }
+
+        /**
+         * Tests writing of results to the output table
+         */
+        @Test
+        public void testOutputInvalidRowsToTable() throws Exception {
+            insertOneValid_OneBadVal_OneMissingTarget();
+            String[] argValues =
+                    getArgValues(schemaName, dataTableName, indexTableName, 10L,
+                        SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, null);
+            List<Job> completedJobs = runScrutiny(argValues);
+
+            // check that the output table contains the invalid rows
+            long scrutinyTimeMillis =
+                    PhoenixConfigurationUtil
+                            .getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
+            String invalidRowsQuery =
+                    IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, getColNames(),
+                        scrutinyTimeMillis);
+            ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery + " ORDER BY ID asc");
+            assertTrue(rs.next());
+            assertEquals(dataTableFullName,
+                rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
+            assertEquals(indexTableFullName,
+                rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
+            assertTrue(rs.getBoolean("HAS_TARGET_ROW"));
+            assertEquals(2, rs.getInt("ID"));
+            assertEquals(2, rs.getInt(":ID"));
+            assertEquals(95123, rs.getInt("ZIP"));
+            assertEquals(9999, rs.getInt("0:ZIP")); // covered col zip incorrect
+            assertTrue(rs.next());
+            assertEquals(dataTableFullName,
+                rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
+            assertEquals(indexTableFullName,
+                rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
+            assertFalse(rs.getBoolean("HAS_TARGET_ROW"));
+            assertEquals(3, rs.getInt("ID"));
+            assertEquals(null, rs.getObject(":ID")); // null for missing target row
+            assertFalse(rs.next());
+
+            // check that the job results were written correctly to the metadata table
+            assertMetadataTableValues(argValues, scrutinyTimeMillis, invalidRowsQuery);
         }
-        if (dataTableDdl.contains("SALT_BUCKETS")) {
-            fs.concat(firstPart, paths.toArray(new Path[0]));
-        }
-        Path outputFilePath = firstPart;
-        assertTrue(fs.exists(outputFilePath));
-        FSDataInputStream fsDataInputStream = fs.open(outputFilePath);
-        BufferedReader reader = new BufferedReader(new InputStreamReader(fsDataInputStream));
-        TreeSet<String> lines = Sets.newTreeSet();
-        try {
-            String line = null;
-            while ((line = reader.readLine()) != null) {
-                lines.add(line);
+
+        /**
+         * Tests that the config for max number of output rows is observed
+         */
+        @Test
+        public void testMaxOutputRows() throws Exception {
+            insertOneValid_OneBadVal_OneMissingTarget();
+            // set max to 1.  There are two bad rows, but only 1 should get written to output table
+            String[] argValues =
+                    getArgValues(schemaName, dataTableName, indexTableName, 10L,
+                        SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, new Long(1));
+            List<Job> completedJobs = runScrutiny(argValues);
+            long scrutinyTimeMillis =
+                    PhoenixConfigurationUtil
+                            .getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
+            String invalidRowsQuery =
+                    IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, getColNames(),
+                        scrutinyTimeMillis);
+            ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery);
+            assertTrue(rs.next());
+            if (dataTableDdl.contains("SALT_BUCKETS")) {
+                assertTrue(rs.next());
+                assertFalse(rs.next());
+            } else {
+                assertFalse(rs.next());
             }
-        } finally {
-            IOUtils.closeQuietly(reader);
-            IOUtils.closeQuietly(fsDataInputStream);
         }
-        Iterator<String> lineIterator = lines.iterator();
-        assertEquals(
-            "[2, name-2, " + new Timestamp(testTime).toString() + ", 95123]\t[2, name-2, " + new Timestamp(testTime)
-                .toString() + ", 9999]", lineIterator.next());
-        assertEquals("[3, name-3, " + new Timestamp(testTime).toString() + ", 95123]\tTarget row not found",
-            lineIterator.next());
 
-    }
+        private SourceTargetColumnNames getColNames() throws SQLException {
+            PTable pdataTable = PhoenixRuntime.getTable(conn, dataTableFullName);
+            PTable pindexTable = PhoenixRuntime.getTable(conn, indexTableFullName);
+            SourceTargetColumnNames columnNames =
+                    new SourceTargetColumnNames.DataSourceColNames(pdataTable, pindexTable, false);
+            return columnNames;
+        }
 
-    /**
-     * Tests writing of results to the output table
-     */
-    @Test
-    public void testOutputInvalidRowsToTable() throws Exception {
-        insertOneValid_OneBadVal_OneMissingTarget();
-        String[] argValues =
-                getArgValues(schemaName, dataTableName, indexTableName, 10L,
-                    SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, null);
-        List<Job> completedJobs = runScrutiny(argValues);
-
-        // check that the output table contains the invalid rows
-        long scrutinyTimeMillis =
-                PhoenixConfigurationUtil
-                        .getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
-        String invalidRowsQuery =
-                IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, getColNames(),
-                    scrutinyTimeMillis);
-        ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery + " ORDER BY ID asc");
-        assertTrue(rs.next());
-        assertEquals(dataTableFullName,
-            rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
-        assertEquals(indexTableFullName,
-            rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
-        assertTrue(rs.getBoolean("HAS_TARGET_ROW"));
-        assertEquals(2, rs.getInt("ID"));
-        assertEquals(2, rs.getInt(":ID"));
-        assertEquals(95123, rs.getInt("ZIP"));
-        assertEquals(9999, rs.getInt("0:ZIP")); // covered col zip incorrect
-        assertTrue(rs.next());
-        assertEquals(dataTableFullName,
-            rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
-        assertEquals(indexTableFullName,
-            rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
-        assertFalse(rs.getBoolean("HAS_TARGET_ROW"));
-        assertEquals(3, rs.getInt("ID"));
-        assertEquals(null, rs.getObject(":ID")); // null for missing target row
-        assertFalse(rs.next());
-
-        // check that the job results were written correctly to the metadata table
-        assertMetadataTableValues(argValues, scrutinyTimeMillis, invalidRowsQuery);
-    }
+        // inserts one valid data/index row, one data row with a missing index row,
+        // and one data row with an index row that has a bad covered col val
+        private void insertOneValid_OneBadVal_OneMissingTarget() throws SQLException {
+            // insert one valid row
+            upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+            conn.commit();
+
+            // disable the index and insert another row which is not indexed
+            disableIndex();
+            upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+            upsertRow(dataTableUpsertStmt, 3, "name-3", 95123);
+            conn.commit();
+
+            // insert a bad index row for one of the above data rows
+            upsertIndexRow("name-2", 2, 9999);
+            conn.commit();
+        }
 
-    /**
-     * Tests that the config for max number of output rows is observed
-     */
-    @Test
-    public void testMaxOutputRows() throws Exception {
-        insertOneValid_OneBadVal_OneMissingTarget();
-        // set max to 1.  There are two bad rows, but only 1 should get written to output table
-        String[] argValues =
-                getArgValues(schemaName, dataTableName, indexTableName, 10L,
-                    SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, new Long(1));
-        List<Job> completedJobs = runScrutiny(argValues);
-        long scrutinyTimeMillis =
-                PhoenixConfigurationUtil
-                        .getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
-        String invalidRowsQuery =
-                IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, getColNames(),
-                    scrutinyTimeMillis);
-        ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery);
-        assertTrue(rs.next());
-        if (dataTableDdl.contains("SALT_BUCKETS")) {
+        private void assertMetadataTableValues(String[] argValues, long scrutinyTimeMillis,
+                String invalidRowsQuery) throws SQLException {
+            ResultSet rs;
+            ResultSet metadataRs =
+                    IndexScrutinyTableOutput.queryAllMetadata(conn, dataTableFullName,
+                        indexTableFullName, scrutinyTimeMillis);
+            assertTrue(metadataRs.next());
+            List<? extends Object> expected =
+                Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
+                    SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
+                    2L, 1L, 1L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
+                    "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery);
+            if (dataTableDdl.contains("SALT_BUCKETS")) {
+                expected = Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
+                    SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
+                    2L, 1L, 2L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
+                    "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery);
+            }
+
+            assertRsValues(metadataRs, expected);
+            String missingTargetQuery = metadataRs.getString("INVALID_ROWS_QUERY_MISSING_TARGET");
+            rs = conn.createStatement().executeQuery(missingTargetQuery);
             assertTrue(rs.next());
+            assertEquals(3, rs.getInt("ID"));
             assertFalse(rs.next());
-        } else {
+            String badCoveredColQuery = metadataRs.getString("INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL");
+            rs = conn.createStatement().executeQuery(badCoveredColQuery);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt("ID"));
             assertFalse(rs.next());
         }
-    }
 
-    private SourceTargetColumnNames getColNames() throws SQLException {
-        PTable pdataTable = PhoenixRuntime.getTable(conn, dataTableFullName);
-        PTable pindexTable = PhoenixRuntime.getTable(conn, indexTableFullName);
-        SourceTargetColumnNames columnNames =
-                new SourceTargetColumnNames.DataSourceColNames(pdataTable, pindexTable);
-        return columnNames;
-    }
+        // assert the result set contains the expected values in the given order
+        private void assertRsValues(ResultSet rs, List<? extends Object> expected) throws SQLException {
+            for (int i = 0; i < expected.size(); i++) {
+                assertEquals(expected.get(i), rs.getObject(i + 1));
+            }
+        }
 
-    // inserts one valid data/index row, one data row with a missing index row,
-    // and one data row with an index row that has a bad covered col val
-    private void insertOneValid_OneBadVal_OneMissingTarget() throws SQLException {
-        // insert one valid row
-        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
-        conn.commit();
-
-        // disable the index and insert another row which is not indexed
-        disableIndex();
-        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
-        upsertRow(dataTableUpsertStmt, 3, "name-3", 95123);
-        conn.commit();
-
-        // insert a bad index row for one of the above data rows
-        upsertIndexRow("name-2", 2, 9999);
-        conn.commit();
-    }
+        private void generateUniqueTableNames() {
+            schemaName = generateUniqueName();
+            dataTableName = generateUniqueName();
+            dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+            indexTableName = generateUniqueName();
+            indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+        }
 
-    private void assertMetadataTableValues(String[] argValues, long scrutinyTimeMillis,
-            String invalidRowsQuery) throws SQLException {
-        ResultSet rs;
-        ResultSet metadataRs =
-                IndexScrutinyTableOutput.queryAllMetadata(conn, dataTableFullName,
-                    indexTableFullName, scrutinyTimeMillis);
-        assertTrue(metadataRs.next());
-        List<? extends Object> expected =
-            Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
-                SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
-                2L, 1L, 1L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
-                "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery);
-        if (dataTableDdl.contains("SALT_BUCKETS")) {
-            expected = Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis,
-                SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L,
-                2L, 1L, 2L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
-                "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery);
-        }
-
-        assertRsValues(metadataRs, expected);
-        String missingTargetQuery = metadataRs.getString("INVALID_ROWS_QUERY_MISSING_TARGET");
-        rs = conn.createStatement().executeQuery(missingTargetQuery);
-        assertTrue(rs.next());
-        assertEquals(3, rs.getInt("ID"));
-        assertFalse(rs.next());
-        String badCoveredColQuery = metadataRs.getString("INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL");
-        rs = conn.createStatement().executeQuery(badCoveredColQuery);
-        assertTrue(rs.next());
-        assertEquals(2, rs.getInt("ID"));
-        assertFalse(rs.next());
-    }
+        private int countRows(String tableFullName) throws SQLException {
+            ResultSet count = conn.createStatement().executeQuery("select count(*) from " + tableFullName);
+            count.next();
+            int numRows = count.getInt(1);
+            return numRows;
+        }
 
-    // assert the result set contains the expected values in the given order
-    private void assertRsValues(ResultSet rs, List<? extends Object> expected) throws SQLException {
-        for (int i = 0; i < expected.size(); i++) {
-            assertEquals(expected.get(i), rs.getObject(i + 1));
+        private void upsertIndexRow(String name, int id, int zip) throws SQLException {
+            indexTableUpsertStmt.setString(1, name);
+            indexTableUpsertStmt.setInt(2, id); // id
+            indexTableUpsertStmt.setInt(3, zip); // bad zip
+            indexTableUpsertStmt.setTimestamp(4, new Timestamp(testTime));
+            indexTableUpsertStmt.executeUpdate();
         }
-    }
 
-    private void generateUniqueTableNames() {
-        schemaName = generateUniqueName();
-        dataTableName = generateUniqueName();
-        dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
-        indexTableName = generateUniqueName();
-        indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
-    }
+        private void disableIndex() throws SQLException {
+            conn.createStatement().execute(
+                String.format("ALTER INDEX %s ON %S disable", indexTableName, dataTableFullName));
+            conn.commit();
+        }
 
-    private int countRows(String tableFullName) throws SQLException {
-        ResultSet count = conn.createStatement().executeQuery("select count(*) from " + tableFullName);
-        count.next();
-        int numRows = count.getInt(1);
-        return numRows;
-    }
+        private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
+                SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat,
+                Long maxOutputRows) {
+            return getArgValues(schemaName, dataTable, indxTable, batchSize, sourceTable,
+                outputInvalidRows, outputFormat, maxOutputRows, null, Long.MAX_VALUE);
+        }
 
-    private void upsertIndexRow(String name, int id, int zip) throws SQLException {
-        indexTableUpsertStmt.setString(1, name);
-        indexTableUpsertStmt.setInt(2, id); // id
-        indexTableUpsertStmt.setInt(3, zip); // bad zip
-        indexTableUpsertStmt.setTimestamp(4, new Timestamp(testTime));
-        indexTableUpsertStmt.executeUpdate();
-    }
+        private List<Job> runScrutinyCurrentSCN(String schemaName, String dataTableName, String indexTableName, Long scrutinyTS) throws Exception {
+            return runScrutiny(getArgValues(schemaName, dataTableName, indexTableName, null,
+                    SourceTable.BOTH, false, null, null, null, scrutinyTS));
+        }
 
-    private void disableIndex() throws SQLException {
-        conn.createStatement().execute(
-            String.format("ALTER INDEX %s ON %S disable", indexTableName, dataTableFullName));
-        conn.commit();
-    }
+        private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName) throws Exception {
+            return runScrutiny(schemaName, dataTableName, indexTableName, null, null);
+        }
 
-    private long getCounterValue(Counters counters, Enum<PhoenixScrutinyJobCounters> counter) {
-        return counters.findCounter(counter).getValue();
-    }
+        private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
+                Long batchSize) throws Exception {
+            return runScrutiny(schemaName, dataTableName, indexTableName, batchSize, null);
+        }
 
-    private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
-            SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat,
-            Long maxOutputRows) {
-        return getArgValues(schemaName, dataTable, indxTable, batchSize, sourceTable,
-            outputInvalidRows, outputFormat, maxOutputRows, Long.MAX_VALUE);
-    }
+        private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
+                Long batchSize, SourceTable sourceTable) throws Exception {
+            final String[] cmdArgs =
+                    getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable, false,
+                        null, null, null, Long.MAX_VALUE);
+            return runScrutiny(cmdArgs);
+        }
 
-    private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize,
-            SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat,
-            Long maxOutputRows, Long scrutinyTs) {
-        final List<String> args = Lists.newArrayList();
-        if (schemaName != null) {
-            args.add("-s");
-            args.add(schemaName);
-        }
-        args.add("-dt");
-        args.add(dataTable);
-        args.add("-it");
-        args.add(indxTable);
-
-        // TODO test snapshot reads
-        // if(useSnapshot) {
-        // args.add("-snap");
-        // }
-
-        if (OutputFormat.FILE.equals(outputFormat)) {
-            args.add("-op");
-            outputDir = "/tmp/" + UUID.randomUUID().toString();
-            args.add(outputDir);
-        }
-        args.add("-t");
-        args.add(String.valueOf(scrutinyTs));
-        args.add("-run-foreground");
-        if (batchSize != null) {
-            args.add("-b");
-            args.add(String.valueOf(batchSize));
-        }
-
-        // default to using data table as the source table
-        args.add("-src");
-        if (sourceTable == null) {
-            args.add(SourceTable.DATA_TABLE_SOURCE.name());
-        } else {
-            args.add(sourceTable.name());
-        }
-        if (outputInvalidRows) {
-            args.add("-o");
-        }
-        if (outputFormat != null) {
-            args.add("-of");
-            args.add(outputFormat.name());
-        }
-        if (maxOutputRows != null) {
-            args.add("-om");
-            args.add(maxOutputRows.toString());
-        }
-        return args.toArray(new String[0]);
-    }
+        private void upsertRow(PreparedStatement stmt, int id, String name, int zip)
+                throws SQLException {
+            int index = 1;
+            // insert row
+            stmt.setInt(index++, id);
+            stmt.setString(index++, name);
+            stmt.setInt(index++, zip);
+            stmt.setTimestamp(index++, new Timestamp(testTime));
+            stmt.executeUpdate();
+        }
 
-    private List<Job> runScrutinyCurrentSCN(String schemaName, String dataTableName, String indexTableName, Long scrutinyTS) throws Exception {
-        return runScrutiny(getArgValues(schemaName, dataTableName, indexTableName, null, SourceTable.BOTH, false, null, null, scrutinyTS));
-    }
+        private int deleteRow(String fullTableName, String whereCondition) throws SQLException {
+            String deleteSql = String.format(DELETE_SQL, indexTableFullName) + whereCondition;
+            PreparedStatement deleteStmt = conn.prepareStatement(deleteSql);
+            return deleteStmt.executeUpdate();
+        }
 
-    private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName) throws Exception {
-        return runScrutiny(schemaName, dataTableName, indexTableName, null, null);
     }
 
-    private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
-            Long batchSize) throws Exception {
-        return runScrutiny(schemaName, dataTableName, indexTableName, batchSize, null);
-    }
+    public static class IndexScrutinyToolTenantIT extends SharedIndexToolIT {
+        private static final String MULTI_TENANT_DATA_TABLE_NAME = "MULTI_TENANT_TABLE";
 
-    private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName,
-            Long batchSize, SourceTable sourceTable) throws Exception {
-        final String[] cmdArgs =
-                getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable, false,
-                    null, null, Long.MAX_VALUE);
-        return runScrutiny(cmdArgs);
-    }
+        /**
+         * Tests that the config for max number of output rows is observed
+         */
+        @Test
+        public void testTenantId() throws Exception {
+            Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+            Connection connGlobal = DriverManager.getConnection(getUrl(), props);
 
-    private List<Job> runScrutiny(String[] cmdArgs) throws Exception {
-        IndexScrutinyTool scrutiny = new IndexScrutinyTool();
-        Configuration conf = new Configuration(getUtility().getConfiguration());
-        scrutiny.setConf(conf);
-        int status = scrutiny.run(cmdArgs);
-        assertEquals(0, status);
-        for (Job job : scrutiny.getJobs()) {
-            assertTrue(job.waitForCompletion(true));
-        }
-        return scrutiny.getJobs();
-    }
+            String tenantId = generateUniqueName();
+            String tenantViewName = generateUniqueName();
+            String indexNameTenant = generateUniqueName();
+            String viewIndexTableName = "_IDX_" + MULTI_TENANT_DATA_TABLE_NAME;
 
-    private void upsertRow(PreparedStatement stmt, int id, String name, int zip)
-            throws SQLException {
-        int index = 1;
-        // insert row
-        stmt.setInt(index++, id);
-        stmt.setString(index++, name);
-        stmt.setInt(index++, zip);
-        stmt.setTimestamp(index++, new Timestamp(testTime));
-        stmt.executeUpdate();
-    }
 
-    private int deleteRow(String fullTableName, String whereCondition) throws SQLException {
-        String deleteSql = String.format(DELETE_SQL, indexTableFullName) + whereCondition;
-        PreparedStatement deleteStmt = conn.prepareStatement(deleteSql);
-        return deleteStmt.executeUpdate();
-    }
+            props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+            Connection connTenant = DriverManager.getConnection(getUrl(), props);
 
 Review comment:
   Connections should either be opened in a try-with-resources statement or be closed explicitly in a finally block.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message