phoenix-commits mailing list archives

From sama...@apache.org
Subject phoenix git commit: PHOENIX-4141 Fix flapping TableSnapshotReadsMapReduceIT
Date Wed, 30 Aug 2017 04:40:42 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.2 02acd5c2c -> b33131d91


PHOENIX-4141 Fix flapping TableSnapshotReadsMapReduceIT


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b33131d9
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b33131d9
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b33131d9

Branch: refs/heads/4.x-HBase-1.2
Commit: b33131d9157f90301dd791c88ca89d2041d37c61
Parents: 02acd5c
Author: Samarth Jain <samarth@apache.org>
Authored: Tue Aug 29 21:40:36 2017 -0700
Committer: Samarth Jain <samarth@apache.org>
Committed: Tue Aug 29 21:40:36 2017 -0700

----------------------------------------------------------------------
 .../end2end/TableSnapshotReadsMapReduceIT.java  | 402 ++++++++++---------
 1 file changed, 220 insertions(+), 182 deletions(-)
----------------------------------------------------------------------
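
The fix replaces wall-clock time with an injectable clock: the test registers its
own org.apache.phoenix.util.EnvironmentEdge, so the times Phoenix observes around
the snapshot, and the CurrentSCN used to verify the job output, are deterministic
instead of racing the system clock. A minimal sketch of that pattern follows;
ManualClock, advance() and the harness class are illustrative names, not part of
this commit.

    import org.apache.phoenix.util.EnvironmentEdge;
    import org.apache.phoenix.util.EnvironmentEdgeManager;

    // A clock the test advances by hand. While injected, Phoenix code that
    // calls EnvironmentEdgeManager.currentTimeMillis() sees this value.
    class ManualClock extends EnvironmentEdge {
        private volatile long time;

        ManualClock(long startTime) {
            this.time = startTime;
        }

        @Override
        public long currentTime() {
            return time;
        }

        void advance(long millis) {
            time += millis;
        }
    }

    public class ClockInjectionSketch {
        public static void main(String[] args) {
            ManualClock clock = new ManualClock(1000L);
            EnvironmentEdgeManager.injectEdge(clock);
            try {
                // Phoenix code that asks EnvironmentEdgeManager for the time sees 1000 here.
                clock.advance(1000L);
                // ...and 2000 here. The test bumps the clock this way around its snapshot,
                // so a read pinned at CurrentSCN can cleanly separate pre- and
                // post-snapshot rows.
            } finally {
                // Mirrors the test's @After: un-inject so later tests use the real clock.
                EnvironmentEdgeManager.injectEdge(null);
            }
        }
    }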


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b33131d9/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
index 4cc2a20..591f028 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableSnapshotReadsMapReduceIT.java
@@ -18,11 +18,25 @@
 
 package org.apache.phoenix.end2end;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.io.NullWritable;
@@ -32,191 +46,215 @@ import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.PhoenixIndexDBWritable;
 import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
-import org.junit.*;
+import org.apache.phoenix.util.EnvironmentEdge;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 
-import java.io.IOException;
-import java.sql.*;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
+public class TableSnapshotReadsMapReduceIT extends ParallelStatsDisabledIT {
+    private final static String SNAPSHOT_NAME = "FOO";
+    private static final String FIELD1 = "FIELD1";
+    private static final String FIELD2 = "FIELD2";
+    private static final String FIELD3 = "FIELD3";
+    private String CREATE_TABLE =
+            "CREATE TABLE IF NOT EXISTS %s ( "
+                    + " FIELD1 VARCHAR NOT NULL , FIELD2 VARCHAR , FIELD3 INTEGER CONSTRAINT
pk PRIMARY KEY (FIELD1 ))";
+    private String UPSERT = "UPSERT into %s values (?, ?, ?)";
+
+    private static List<List<Object>> result;
+    private String tableName;
+    private MyClock clock;
+
+    @Before
+    public void injectMyClock() {
+        clock = new MyClock(1000);
+        // Use our own clock so the snapshot point and the CurrentSCN check are deterministic
+        EnvironmentEdgeManager.injectEdge(clock);
+    }
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+    @After
+    public void removeMyClock() {
+        EnvironmentEdgeManager.injectEdge(null);
+    }
 
-public class TableSnapshotReadsMapReduceIT extends ParallelStatsDisabledIT {
-  private final static String SNAPSHOT_NAME = "FOO";
-  private static final String FIELD1 = "FIELD1";
-  private static final String FIELD2 = "FIELD2";
-  private static final String FIELD3 = "FIELD3";
-  private String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
-      " FIELD1 VARCHAR NOT NULL , FIELD2 VARCHAR , FIELD3 INTEGER CONSTRAINT pk PRIMARY KEY
(FIELD1 ))";
-  private String UPSERT = "UPSERT into %s values (?, ?, ?)";
-
-  private static List<List<Object>> result;
-  private long timestamp;
-  private String tableName;
-
-
-  @Test
-  public void testMapReduceSnapshots() throws Exception {
-    // create table
-    Connection conn = DriverManager.getConnection(getUrl());
-    tableName = generateUniqueName();
-    conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
-    conn.commit();
-
-    // configure Phoenix M/R job to read snapshot
-    final Configuration conf = getUtility().getConfiguration();
-    Job job = Job.getInstance(conf);
-    Path tmpDir = getUtility().getRandomDir();
-
-    PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir, null, FIELD1, FIELD2, FIELD3);
-
-    // configure and test job
-    configureJob(job, tableName, null, null);
-  }
-
-  @Test
-  public void testMapReduceSnapshotsWithCondition() throws Exception {
-    // create table
-    Connection conn = DriverManager.getConnection(getUrl());
-    tableName = generateUniqueName();
-    conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
-    conn.commit();
-
-    // configure Phoenix M/R job to read snapshot
-    final Configuration conf = getUtility().getConfiguration();
-    Job job = Job.getInstance(conf);
-    Path tmpDir = getUtility().getRandomDir();
-    PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir, FIELD3 + " > 0001", FIELD1, FIELD2, FIELD3);
-
-    // configure and test job
-    configureJob(job, tableName, null, "FIELD3 > 0001");
-
-  }
-
-  @Test
-  public void testMapReduceSnapshotWithLimit() throws Exception {
-    // create table
-    Connection conn = DriverManager.getConnection(getUrl());
-    tableName = generateUniqueName();
-    conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
-    conn.commit();
-
-    // configure Phoenix M/R job to read snapshot
-    final Configuration conf = getUtility().getConfiguration();
-    Job job = Job.getInstance(conf);
-    Path tmpDir = getUtility().getRandomDir();
-    // Running limit with order by on non pk column
-    String inputQuery = "SELECT * FROM " + tableName + " ORDER BY FIELD2 LIMIT 1";
-    PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,SNAPSHOT_NAME,tableName,tmpDir,inputQuery);
-
-    // configure and test job
-    configureJob(job, tableName, inputQuery, null);
-  }
-
-  private void configureJob(Job job, String tableName, String inputQuery, String condition) throws Exception {
-    try {
-      upsertAndSnapshot(tableName);
-      result = new ArrayList<>();
-
-      job.setMapperClass(TableSnapshotMapper.class);
-      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-      job.setMapOutputValueClass(NullWritable.class);
-      job.setOutputFormatClass(NullOutputFormat.class);
-
-      Assert.assertTrue(job.waitForCompletion(true));
-
-      // verify the result, should match the values at the corresponding timestamp
-      Properties props = new Properties();
-      props.setProperty("CurrentSCN", Long.toString(timestamp));
-
-      StringBuilder selectQuery = new StringBuilder("SELECT * FROM " + tableName);
-      if (condition != null) {
-        selectQuery.append(" WHERE " + condition);
-      }
-      if (inputQuery == null)
-        inputQuery = selectQuery.toString();
-
-      ResultSet rs = DriverManager.getConnection(getUrl(), props).createStatement().executeQuery(inputQuery);
-
-      for (List<Object> r : result) {
-        assertTrue("No data stored in the table!", rs.next());
-        int i = 0;
-        String field1 = rs.getString(i + 1);
-        assertEquals("Got the incorrect value for field1", r.get(i++), field1);
-        String field2 = rs.getString(i + 1);
-        assertEquals("Got the incorrect value for field2", r.get(i++), field2);
-        int field3 = rs.getInt(i + 1);
-        assertEquals("Got the incorrect value for field3", r.get(i++), field3);
-      }
-
-      assertFalse("Should only have stored" + result.size() + "rows in the table for the timestamp!", rs.next());
-    } finally {
-      deleteSnapshotAndTable(tableName);
+    @Test
+    public void testMapReduceSnapshots() throws Exception {
+        // create table
+        Connection conn = DriverManager.getConnection(getUrl());
+        tableName = generateUniqueName();
+        conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
+        conn.commit();
+
+        // configure Phoenix M/R job to read snapshot
+        final Configuration conf = getUtility().getConfiguration();
+        Job job = Job.getInstance(conf);
+        Path tmpDir = getUtility().getDataTestDirOnTestFS(SNAPSHOT_NAME);
+
+        PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName,
+            tmpDir, null, FIELD1, FIELD2, FIELD3);
+
+        // configure and test job
+        configureJob(job, tableName, null, null);
     }
-  }
-
-  private void upsertData(String tableName) throws SQLException {
-    Connection conn = DriverManager.getConnection(getUrl());
-    PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
-    upsertData(stmt, "CCCC", "SSDD", 0001);
-    upsertData(stmt, "CCCC", "HDHG", 0005);
-    upsertData(stmt, "BBBB", "JSHJ", 0002);
-    upsertData(stmt, "AAAA", "JHHD", 0003);
-    conn.commit();
-    timestamp = System.currentTimeMillis();
-  }
-
-  private void upsertData(PreparedStatement stmt, String field1, String field2, int field3) throws SQLException {
-    stmt.setString(1, field1);
-    stmt.setString(2, field2);
-    stmt.setInt(3, field3);
-    stmt.execute();
-  }
-
-  public void upsertAndSnapshot(String tableName) throws Exception {
-    upsertData(tableName);
-
-    Connection conn = DriverManager.getConnection(getUrl());
-    HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
-    admin.snapshot(SNAPSHOT_NAME, TableName.valueOf(tableName));
-    // call flush to create new files in the region
-    admin.flush(tableName);
-
-    List<HBaseProtos.SnapshotDescription> snapshots = admin.listSnapshots();
-    Assert.assertEquals(tableName, snapshots.get(0).getTable());
-
-    // upsert data after snapshot
-    PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
-    upsertData(stmt, "DDDD", "SNFB", 0004);
-    conn.commit();
-  }
-
-  public void deleteSnapshotAndTable(String tableName) throws Exception {
-    Connection conn = DriverManager.getConnection(getUrl());
-    HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
-    admin.deleteSnapshot(SNAPSHOT_NAME);
-
-    conn.createStatement().execute("DROP TABLE " + tableName);
-    conn.close();
-
-  }
-
-  public static class TableSnapshotMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, NullWritable> {
-
-    @Override
-    protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
-        throws IOException, InterruptedException {
-      final List<Object> values = record.getValues();
-      result.add(values);
-
-      // write dummy data
-      context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()),
-          NullWritable.get());
+
+    @Test
+    public void testMapReduceSnapshotsWithCondition() throws Exception {
+        // create table
+        Connection conn = DriverManager.getConnection(getUrl());
+        tableName = generateUniqueName();
+        conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
+        conn.commit();
+
+        // configure Phoenix M/R job to read snapshot
+        final Configuration conf = getUtility().getConfiguration();
+        Job job = Job.getInstance(conf);
+        Path tmpDir = getUtility().getDataTestDirOnTestFS(SNAPSHOT_NAME);
+        PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName,
+            tmpDir, FIELD3 + " > 0001", FIELD1, FIELD2, FIELD3);
+
+        // configure and test job
+        configureJob(job, tableName, null, "FIELD3 > 0001");
+
+    }
+
+    @Test
+    public void testMapReduceSnapshotWithLimit() throws Exception {
+        // create table
+        Connection conn = DriverManager.getConnection(getUrl());
+        tableName = generateUniqueName();
+        conn.createStatement().execute(String.format(CREATE_TABLE, tableName));
+        conn.commit();
+
+        // configure Phoenix M/R job to read snapshot
+        final Configuration conf = getUtility().getConfiguration();
+        Job job = Job.getInstance(conf);
+        Path tmpDir = getUtility().getDataTestDirOnTestFS(SNAPSHOT_NAME);
+        // Running limit with order by on non pk column
+        String inputQuery = "SELECT * FROM " + tableName + " ORDER BY FIELD2 LIMIT 1";
+        PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName,
+            tmpDir, inputQuery);
+
+        // configure and test job
+        configureJob(job, tableName, inputQuery, null);
+    }
+
+    private void configureJob(Job job, String tableName, String inputQuery, String condition)
+            throws Exception {
+        try {
+            upsertAndSnapshot(tableName);
+            result = new ArrayList<>();
+
+            job.setMapperClass(TableSnapshotMapper.class);
+            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+            job.setMapOutputValueClass(NullWritable.class);
+            job.setOutputFormatClass(NullOutputFormat.class);
+
+            Assert.assertTrue(job.waitForCompletion(true));
+
+            // verify the result, should match the values at the corresponding timestamp
+            Properties props = new Properties();
+            props.setProperty("CurrentSCN", Long.toString(clock.time));
+            StringBuilder selectQuery = new StringBuilder("SELECT * FROM " + tableName);
+            if (condition != null) {
+                selectQuery.append(" WHERE " + condition);
+            }
+            if (inputQuery == null) inputQuery = selectQuery.toString();
+
+            ResultSet rs =
+                    DriverManager.getConnection(getUrl(), props).createStatement()
+                            .executeQuery(inputQuery);
+
+            for (List<Object> r : result) {
+                assertTrue("No data stored in the table!", rs.next());
+                int i = 0;
+                String field1 = rs.getString(i + 1);
+                assertEquals("Got the incorrect value for field1", r.get(i++), field1);
+                String field2 = rs.getString(i + 1);
+                assertEquals("Got the incorrect value for field2", r.get(i++), field2);
+                int field3 = rs.getInt(i + 1);
+                assertEquals("Got the incorrect value for field3", r.get(i++), field3);
+            }
+
+            assertFalse(
+                "Should only have stored " + result.size() + "rows in the table for the timestamp!",
+                rs.next());
+        } finally {
+            deleteSnapshotAndTable(tableName);
+        }
+    }
+
+    private static class MyClock extends EnvironmentEdge {
+        public volatile long time;
+
+        public MyClock(long time) {
+            this.time = time;
+        }
+
+        @Override
+        public long currentTime() {
+            return time;
+        }
+    }
+
+    private void upsertData(String tableName) throws SQLException {
+        Connection conn = DriverManager.getConnection(getUrl());
+        PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
+        upsertData(stmt, "CCCC", "SSDD", 0001);
+        upsertData(stmt, "CCCC", "HDHG", 0005);
+        upsertData(stmt, "BBBB", "JSHJ", 0002);
+        upsertData(stmt, "AAAA", "JHHD", 0003);
+        conn.commit();
+    }
+
+    private void upsertData(PreparedStatement stmt, String field1, String field2, int field3)
+            throws SQLException {
+        stmt.setString(1, field1);
+        stmt.setString(2, field2);
+        stmt.setInt(3, field3);
+        stmt.execute();
+    }
+
+    public void upsertAndSnapshot(String tableName) throws Exception {
+        clock.time += 1000;
+        upsertData(tableName);
+
+        Connection conn = DriverManager.getConnection(getUrl());
+        HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+        admin.snapshot(SNAPSHOT_NAME, TableName.valueOf(tableName));
+        // call flush to create new files in the region
+        admin.flush(tableName);
+
+        List<HBaseProtos.SnapshotDescription> snapshots = admin.listSnapshots();
+        Assert.assertEquals(tableName, snapshots.get(0).getTable());
+
+        clock.time += 1000;
+        // upsert data after snapshot
+        PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName));
+        upsertData(stmt, "DDDD", "SNFB", 0004);
+        conn.commit();
+    }
+
+    public void deleteSnapshotAndTable(String tableName) throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+        admin.deleteSnapshot(SNAPSHOT_NAME);
+    }
+
+    public static class TableSnapshotMapper extends
+            Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, NullWritable> {
+
+        @Override
+        protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
+                throws IOException, InterruptedException {
+            final List<Object> values = record.getValues();
+            result.add(values);
+
+            // write dummy data
+            context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()),
+                NullWritable.get());
+        }
     }
-  }
 
 }
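
The other half of the determinism is the verification query, which the test pins to
the injected clock through the standard Phoenix CurrentSCN connection property
(props.setProperty("CurrentSCN", Long.toString(clock.time)) above). Roughly, a
point-in-time read looks like the sketch below; the JDBC URL and table name are
placeholders, not values from this commit.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.util.Properties;

    public class CurrentScnReadSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Pin the connection at timestamp 2000: queries run as of that
            // point in time and do not see cells stamped at or after it.
            props.setProperty("CurrentSCN", Long.toString(2000L));
            // "jdbc:phoenix:localhost" and MY_TABLE are placeholders.
            try (Connection conn =
                         DriverManager.getConnection("jdbc:phoenix:localhost", props);
                 ResultSet rs = conn.createStatement()
                         .executeQuery("SELECT * FROM MY_TABLE")) {
                while (rs.next()) {
                    System.out.println(rs.getString(1));
                }
            }
        }
    }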

