Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E4E37200CF3 for ; Wed, 30 Aug 2017 06:41:13 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E399B1682DA; Wed, 30 Aug 2017 04:41:13 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B58E41682D1 for ; Wed, 30 Aug 2017 06:41:12 +0200 (CEST) Received: (qmail 42091 invoked by uid 500); 30 Aug 2017 04:41:11 -0000 Mailing-List: contact commits-help@phoenix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@phoenix.apache.org Delivered-To: mailing list commits@phoenix.apache.org Received: (qmail 42082 invoked by uid 99); 30 Aug 2017 04:41:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 30 Aug 2017 04:41:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B1206E0019; Wed, 30 Aug 2017 04:41:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: samarth@apache.org To: commits@phoenix.apache.org Message-Id: <66e13398b6fe45b8afef98a862959904@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: phoenix git commit: PHOENIX-4141 Fix flapping TableSnapshotReadsMapReduceIT Date: Wed, 30 Aug 2017 04:41:11 +0000 (UTC) archived-at: Wed, 30 Aug 2017 04:41:14 -0000 Repository: phoenix Updated Branches: refs/heads/master 74762df7a -> 378b56c4a PHOENIX-4141 Fix flapping TableSnapshotReadsMapReduceIT Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/378b56c4 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/378b56c4 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/378b56c4 Branch: refs/heads/master Commit: 378b56c4ad71f9e10887adec92618300285f6d2d Parents: 74762df Author: Samarth Jain Authored: Tue Aug 29 21:41:07 2017 -0700 Committer: Samarth Jain Committed: Tue Aug 29 21:41:07 2017 -0700 ---------------------------------------------------------------------- .../end2end/TableSnapshotReadsMapReduceIT.java | 402 ++++++++++--------- 1 file changed, 220 insertions(+), 182 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/378b56c4/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java index 4cc2a20..591f028 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java @@ -18,11 +18,25 @@ package org.apache.phoenix.end2end; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; - import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.io.NullWritable; @@ -32,191 +46,215 @@ import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.index.PhoenixIndexDBWritable; import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; -import org.junit.*; +import org.apache.phoenix.util.EnvironmentEdge; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; -import java.io.IOException; -import java.sql.*; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.UUID; +public class TableSnapshotReadsMapReduceIT extends ParallelStatsDisabledIT { + private final static String SNAPSHOT_NAME = "FOO"; + private static final String FIELD1 = "FIELD1"; + private static final String FIELD2 = "FIELD2"; + private static final String FIELD3 = "FIELD3"; + private String CREATE_TABLE = + "CREATE TABLE IF NOT EXISTS %s ( " + + " FIELD1 VARCHAR NOT NULL , FIELD2 VARCHAR , FIELD3 INTEGER CONSTRAINT pk PRIMARY KEY (FIELD1 ))"; + private String UPSERT = "UPSERT into %s values (?, ?, ?)"; + + private static List> result; + private String tableName; + private MyClock clock; + + @Before + public void injectMyClock() { + clock = new MyClock(1000); + // Use our own clock to prevent race between partial rebuilder and compaction + EnvironmentEdgeManager.injectEdge(clock); + } -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; + @After + public void removeMyClock() { + EnvironmentEdgeManager.injectEdge(null); + } -public class TableSnapshotReadsMapReduceIT extends ParallelStatsDisabledIT { - private final static String SNAPSHOT_NAME = "FOO"; - private static final String FIELD1 = "FIELD1"; - private static final String FIELD2 = "FIELD2"; - private static final String FIELD3 = "FIELD3"; - private String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " + - " FIELD1 VARCHAR NOT NULL , FIELD2 VARCHAR , FIELD3 INTEGER CONSTRAINT pk PRIMARY KEY (FIELD1 ))"; - private String UPSERT = "UPSERT into %s values (?, ?, ?)"; - - private static List> result; - private long timestamp; - private String tableName; - - - @Test - public void testMapReduceSnapshots() throws Exception { - // create table - Connection conn = DriverManager.getConnection(getUrl()); - tableName = generateUniqueName(); - conn.createStatement().execute(String.format(CREATE_TABLE, tableName)); - conn.commit(); - - // configure Phoenix M/R job to read snapshot - final Configuration conf = getUtility().getConfiguration(); - Job job = Job.getInstance(conf); - Path tmpDir = getUtility().getRandomDir(); - - PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir, null, FIELD1, FIELD2, FIELD3); - - // configure and test job - configureJob(job, tableName, null, null); - } - - @Test - public void testMapReduceSnapshotsWithCondition() throws Exception { - // create table - Connection conn = DriverManager.getConnection(getUrl()); - tableName = generateUniqueName(); - conn.createStatement().execute(String.format(CREATE_TABLE, tableName)); - conn.commit(); - - // configure Phoenix M/R job to read snapshot - final Configuration conf = getUtility().getConfiguration(); - Job job = Job.getInstance(conf); - Path tmpDir = getUtility().getRandomDir(); - PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir, FIELD3 + " > 0001", FIELD1, FIELD2, FIELD3); - - // configure and test job - configureJob(job, tableName, null, "FIELD3 > 0001"); - - } - - @Test - public void testMapReduceSnapshotWithLimit() throws Exception { - // create table - Connection conn = DriverManager.getConnection(getUrl()); - tableName = generateUniqueName(); - conn.createStatement().execute(String.format(CREATE_TABLE, tableName)); - conn.commit(); - - // configure Phoenix M/R job to read snapshot - final Configuration conf = getUtility().getConfiguration(); - Job job = Job.getInstance(conf); - Path tmpDir = getUtility().getRandomDir(); - // Running limit with order by on non pk column - String inputQuery = "SELECT * FROM " + tableName + " ORDER BY FIELD2 LIMIT 1"; - PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir,inputQuery); - - // configure and test job - configureJob(job, tableName, inputQuery, null); - } - - private void configureJob(Job job, String tableName, String inputQuery, String condition) throws Exception { - try { - upsertAndSnapshot(tableName); - result = new ArrayList<>(); - - job.setMapperClass(TableSnapshotMapper.class); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(NullWritable.class); - job.setOutputFormatClass(NullOutputFormat.class); - - Assert.assertTrue(job.waitForCompletion(true)); - - // verify the result, should match the values at the corresponding timestamp - Properties props = new Properties(); - props.setProperty("CurrentSCN", Long.toString(timestamp)); - - StringBuilder selectQuery = new StringBuilder("SELECT * FROM " + tableName); - if (condition != null) { - selectQuery.append(" WHERE " + condition); - } - if (inputQuery == null) - inputQuery = selectQuery.toString(); - - ResultSet rs = DriverManager.getConnection(getUrl(), props).createStatement().executeQuery(inputQuery); - - for (List r : result) { - assertTrue("No data stored in the table!", rs.next()); - int i = 0; - String field1 = rs.getString(i + 1); - assertEquals("Got the incorrect value for field1", r.get(i++), field1); - String field2 = rs.getString(i + 1); - assertEquals("Got the incorrect value for field2", r.get(i++), field2); - int field3 = rs.getInt(i + 1); - assertEquals("Got the incorrect value for field3", r.get(i++), field3); - } - - assertFalse("Should only have stored" + result.size() + "rows in the table for the timestamp!", rs.next()); - } finally { - deleteSnapshotAndTable(tableName); + @Test + public void testMapReduceSnapshots() throws Exception { + // create table + Connection conn = DriverManager.getConnection(getUrl()); + tableName = generateUniqueName(); + conn.createStatement().execute(String.format(CREATE_TABLE, tableName)); + conn.commit(); + + // configure Phoenix M/R job to read snapshot + final Configuration conf = getUtility().getConfiguration(); + Job job = Job.getInstance(conf); + Path tmpDir = getUtility().getDataTestDirOnTestFS(SNAPSHOT_NAME); + + PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, + tmpDir, null, FIELD1, FIELD2, FIELD3); + + // configure and test job + configureJob(job, tableName, null, null); } - } - - private void upsertData(String tableName) throws SQLException { - Connection conn = DriverManager.getConnection(getUrl()); - PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName)); - upsertData(stmt, "CCCC", "SSDD", 0001); - upsertData(stmt, "CCCC", "HDHG", 0005); - upsertData(stmt, "BBBB", "JSHJ", 0002); - upsertData(stmt, "AAAA", "JHHD", 0003); - conn.commit(); - timestamp = System.currentTimeMillis(); - } - - private void upsertData(PreparedStatement stmt, String field1, String field2, int field3) throws SQLException { - stmt.setString(1, field1); - stmt.setString(2, field2); - stmt.setInt(3, field3); - stmt.execute(); - } - - public void upsertAndSnapshot(String tableName) throws Exception { - upsertData(tableName); - - Connection conn = DriverManager.getConnection(getUrl()); - HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); - admin.snapshot(SNAPSHOT_NAME, TableName.valueOf(tableName)); - // call flush to create new files in the region - admin.flush(tableName); - - List snapshots = admin.listSnapshots(); - Assert.assertEquals(tableName, snapshots.get(0).getTable()); - - // upsert data after snapshot - PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName)); - upsertData(stmt, "DDDD", "SNFB", 0004); - conn.commit(); - } - - public void deleteSnapshotAndTable(String tableName) throws Exception { - Connection conn = DriverManager.getConnection(getUrl()); - HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); - admin.deleteSnapshot(SNAPSHOT_NAME); - - conn.createStatement().execute("DROP TABLE " + tableName); - conn.close(); - - } - - public static class TableSnapshotMapper extends Mapper { - - @Override - protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context) - throws IOException, InterruptedException { - final List values = record.getValues(); - result.add(values); - - // write dummy data - context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()), - NullWritable.get()); + + @Test + public void testMapReduceSnapshotsWithCondition() throws Exception { + // create table + Connection conn = DriverManager.getConnection(getUrl()); + tableName = generateUniqueName(); + conn.createStatement().execute(String.format(CREATE_TABLE, tableName)); + conn.commit(); + + // configure Phoenix M/R job to read snapshot + final Configuration conf = getUtility().getConfiguration(); + Job job = Job.getInstance(conf); + Path tmpDir = getUtility().getDataTestDirOnTestFS(SNAPSHOT_NAME); + PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, + tmpDir, FIELD3 + " > 0001", FIELD1, FIELD2, FIELD3); + + // configure and test job + configureJob(job, tableName, null, "FIELD3 > 0001"); + + } + + @Test + public void testMapReduceSnapshotWithLimit() throws Exception { + // create table + Connection conn = DriverManager.getConnection(getUrl()); + tableName = generateUniqueName(); + conn.createStatement().execute(String.format(CREATE_TABLE, tableName)); + conn.commit(); + + // configure Phoenix M/R job to read snapshot + final Configuration conf = getUtility().getConfiguration(); + Job job = Job.getInstance(conf); + Path tmpDir = getUtility().getDataTestDirOnTestFS(SNAPSHOT_NAME); + // Running limit with order by on non pk column + String inputQuery = "SELECT * FROM " + tableName + " ORDER BY FIELD2 LIMIT 1"; + PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, + tmpDir, inputQuery); + + // configure and test job + configureJob(job, tableName, inputQuery, null); + } + + private void configureJob(Job job, String tableName, String inputQuery, String condition) + throws Exception { + try { + upsertAndSnapshot(tableName); + result = new ArrayList<>(); + + job.setMapperClass(TableSnapshotMapper.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(NullWritable.class); + job.setOutputFormatClass(NullOutputFormat.class); + + Assert.assertTrue(job.waitForCompletion(true)); + + // verify the result, should match the values at the corresponding timestamp + Properties props = new Properties(); + props.setProperty("CurrentSCN", Long.toString(clock.time)); + StringBuilder selectQuery = new StringBuilder("SELECT * FROM " + tableName); + if (condition != null) { + selectQuery.append(" WHERE " + condition); + } + if (inputQuery == null) inputQuery = selectQuery.toString(); + + ResultSet rs = + DriverManager.getConnection(getUrl(), props).createStatement() + .executeQuery(inputQuery); + + for (List r : result) { + assertTrue("No data stored in the table!", rs.next()); + int i = 0; + String field1 = rs.getString(i + 1); + assertEquals("Got the incorrect value for field1", r.get(i++), field1); + String field2 = rs.getString(i + 1); + assertEquals("Got the incorrect value for field2", r.get(i++), field2); + int field3 = rs.getInt(i + 1); + assertEquals("Got the incorrect value for field3", r.get(i++), field3); + } + + assertFalse( + "Should only have stored " + result.size() + "rows in the table for the timestamp!", + rs.next()); + } finally { + deleteSnapshotAndTable(tableName); + } + } + + private static class MyClock extends EnvironmentEdge { + public volatile long time; + + public MyClock(long time) { + this.time = time; + } + + @Override + public long currentTime() { + return time; + } + } + + private void upsertData(String tableName) throws SQLException { + Connection conn = DriverManager.getConnection(getUrl()); + PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName)); + upsertData(stmt, "CCCC", "SSDD", 0001); + upsertData(stmt, "CCCC", "HDHG", 0005); + upsertData(stmt, "BBBB", "JSHJ", 0002); + upsertData(stmt, "AAAA", "JHHD", 0003); + conn.commit(); + } + + private void upsertData(PreparedStatement stmt, String field1, String field2, int field3) + throws SQLException { + stmt.setString(1, field1); + stmt.setString(2, field2); + stmt.setInt(3, field3); + stmt.execute(); + } + + public void upsertAndSnapshot(String tableName) throws Exception { + clock.time += 1000; + upsertData(tableName); + + Connection conn = DriverManager.getConnection(getUrl()); + HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); + admin.snapshot(SNAPSHOT_NAME, TableName.valueOf(tableName)); + // call flush to create new files in the region + admin.flush(tableName); + + List snapshots = admin.listSnapshots(); + Assert.assertEquals(tableName, snapshots.get(0).getTable()); + + clock.time += 1000; + // upsert data after snapshot + PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName)); + upsertData(stmt, "DDDD", "SNFB", 0004); + conn.commit(); + } + + public void deleteSnapshotAndTable(String tableName) throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); + admin.deleteSnapshot(SNAPSHOT_NAME); + } + + public static class TableSnapshotMapper extends + Mapper { + + @Override + protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context) + throws IOException, InterruptedException { + final List values = record.getValues(); + result.add(values); + + // write dummy data + context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()), + NullWritable.get()); + } } - } }