Return-Path: X-Original-To: apmail-phoenix-commits-archive@minotaur.apache.org Delivered-To: apmail-phoenix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 41A831888A for ; Sat, 16 Jan 2016 02:14:27 +0000 (UTC) Received: (qmail 42581 invoked by uid 500); 16 Jan 2016 02:14:27 -0000 Delivered-To: apmail-phoenix-commits-archive@phoenix.apache.org Received: (qmail 42542 invoked by uid 500); 16 Jan 2016 02:14:27 -0000 Mailing-List: contact commits-help@phoenix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@phoenix.apache.org Delivered-To: mailing list commits@phoenix.apache.org Received: (qmail 42533 invoked by uid 99); 16 Jan 2016 02:14:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 16 Jan 2016 02:14:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 019CBE08EF; Sat, 16 Jan 2016 02:14:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jamestaylor@apache.org To: commits@phoenix.apache.org Message-Id: <50d0c56d89e6466eb68c9a7a85280e89@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: phoenix git commit: PHOENIX-2600 NPE on immutable index creation over transactional table Date: Sat, 16 Jan 2016 02:14:27 +0000 (UTC) Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.0 b07b91914 -> c5c6fdcb7 PHOENIX-2600 NPE on immutable index creation over transactional table Conflicts: phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c5c6fdcb Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c5c6fdcb Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c5c6fdcb Branch: refs/heads/4.x-HBase-1.0 Commit: c5c6fdcb727b466474a87d3dbcf127921d134e5e Parents: b07b919 Author: James Taylor Authored: Fri Jan 15 18:04:07 2016 -0800 Committer: James Taylor Committed: Fri Jan 15 18:14:09 2016 -0800 ---------------------------------------------------------------------- .../apache/phoenix/end2end/UpsertSelectIT.java | 23 + .../org/apache/phoenix/tx/TransactionIT.java | 442 ++++++++++--------- .../apache/phoenix/compile/DeleteCompiler.java | 1 - .../apache/phoenix/execute/MutationState.java | 40 +- .../apache/phoenix/jdbc/PhoenixStatement.java | 1 + 5 files changed, 281 insertions(+), 226 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c5c6fdcb/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java index b5252e0..364b423 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java @@ -1373,6 +1373,29 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT { assertEquals("[[128,0,0,54], [128,0,4,0]]", rs.getArray(2).toString()); } + + @Test + public void testParallelUpsertSelect() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3)); + props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3)); + props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3)); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(false); + conn.createStatement().execute("CREATE SEQUENCE S1"); + conn.createStatement().execute("CREATE TABLE SALTEDT1 (pk INTEGER PRIMARY KEY, val INTEGER) SALT_BUCKETS=4"); + conn.createStatement().execute("CREATE TABLE T2 (pk INTEGER PRIMARY KEY, val INTEGER)"); + + for (int i = 0; i < 100; i++) { + conn.createStatement().execute("UPSERT INTO SALTEDT1 VALUES (NEXT VALUE FOR S1, " + (i%10) + ")"); + } + conn.commit(); + conn.setAutoCommit(true); + int upsertCount = conn.createStatement().executeUpdate("UPSERT INTO T2 SELECT pk, val FROM SALTEDT1"); + assertEquals(100,upsertCount); + conn.close(); + } + private static Connection getConnection(long ts) throws SQLException { Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c5c6fdcb/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java index 28ba53d..fe1f16c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java @@ -36,6 +36,12 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import co.cask.tephra.TransactionContext; +import co.cask.tephra.TransactionSystemClient; +import co.cask.tephra.TxConstants; +import co.cask.tephra.hbase10.TransactionAwareHTable; +import co.cask.tephra.hbase10.coprocessor.TransactionProcessor; + import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; @@ -68,16 +74,10 @@ import org.junit.Test; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import co.cask.tephra.TransactionContext; -import co.cask.tephra.TransactionSystemClient; -import co.cask.tephra.TxConstants; -import co.cask.tephra.hbase10.TransactionAwareHTable; -import co.cask.tephra.hbase10.coprocessor.TransactionProcessor; - public class TransactionIT extends BaseHBaseManagedTimeIT { - - private static final String FULL_TABLE_NAME = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + TRANSACTIONAL_DATA_TABLE; - + + private static final String FULL_TABLE_NAME = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + TRANSACTIONAL_DATA_TABLE; + @Before public void setUp() throws SQLException { ensureTableCreated(getUrl(), TRANSACTIONAL_DATA_TABLE); @@ -90,73 +90,73 @@ public class TransactionIT extends BaseHBaseManagedTimeIT { props.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.toString(true)); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } - - @Test - public void testReadOwnWrites() throws Exception { - String selectSql = "SELECT * FROM "+FULL_TABLE_NAME; - try (Connection conn = DriverManager.getConnection(getUrl())) { - conn.setAutoCommit(false); - ResultSet rs = conn.createStatement().executeQuery(selectSql); - assertFalse(rs.next()); - - String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)"; - PreparedStatement stmt = conn.prepareStatement(upsert); - // upsert two rows - TestUtil.setRowKeyColumns(stmt, 1); - stmt.execute(); - TestUtil.setRowKeyColumns(stmt, 2); - stmt.execute(); - - // verify rows can be read even though commit has not been called - rs = conn.createStatement().executeQuery(selectSql); - TestUtil.validateRowKeyColumns(rs, 1); - TestUtil.validateRowKeyColumns(rs, 2); - assertFalse(rs.next()); - - conn.commit(); - - // verify rows can be read after commit - rs = conn.createStatement().executeQuery(selectSql); - TestUtil.validateRowKeyColumns(rs, 1); - TestUtil.validateRowKeyColumns(rs, 2); - assertFalse(rs.next()); - } - } - - @Test - public void testTxnClosedCorrecty() throws Exception { - String selectSql = "SELECT * FROM "+FULL_TABLE_NAME; - try (Connection conn = DriverManager.getConnection(getUrl())) { - conn.setAutoCommit(false); - ResultSet rs = conn.createStatement().executeQuery(selectSql); - assertFalse(rs.next()); - - String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)"; - PreparedStatement stmt = conn.prepareStatement(upsert); - // upsert two rows - TestUtil.setRowKeyColumns(stmt, 1); - stmt.execute(); - TestUtil.setRowKeyColumns(stmt, 2); - stmt.execute(); - - // verify rows can be read even though commit has not been called - rs = conn.createStatement().executeQuery(selectSql); - TestUtil.validateRowKeyColumns(rs, 1); - TestUtil.validateRowKeyColumns(rs, 2); - assertFalse(rs.next()); - - conn.close(); - // wait for any open txns to time out - Thread.sleep(DEFAULT_TXN_TIMEOUT_SECONDS*1000+10000); - assertTrue("There should be no invalid transactions", txManager.getInvalidSize()==0); - } - } - + + @Test + public void testReadOwnWrites() throws Exception { + String selectSql = "SELECT * FROM "+FULL_TABLE_NAME; + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.setAutoCommit(false); + ResultSet rs = conn.createStatement().executeQuery(selectSql); + assertFalse(rs.next()); + + String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)"; + PreparedStatement stmt = conn.prepareStatement(upsert); + // upsert two rows + TestUtil.setRowKeyColumns(stmt, 1); + stmt.execute(); + TestUtil.setRowKeyColumns(stmt, 2); + stmt.execute(); + + // verify rows can be read even though commit has not been called + rs = conn.createStatement().executeQuery(selectSql); + TestUtil.validateRowKeyColumns(rs, 1); + TestUtil.validateRowKeyColumns(rs, 2); + assertFalse(rs.next()); + + conn.commit(); + + // verify rows can be read after commit + rs = conn.createStatement().executeQuery(selectSql); + TestUtil.validateRowKeyColumns(rs, 1); + TestUtil.validateRowKeyColumns(rs, 2); + assertFalse(rs.next()); + } + } + + @Test + public void testTxnClosedCorrecty() throws Exception { + String selectSql = "SELECT * FROM "+FULL_TABLE_NAME; + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.setAutoCommit(false); + ResultSet rs = conn.createStatement().executeQuery(selectSql); + assertFalse(rs.next()); + + String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)"; + PreparedStatement stmt = conn.prepareStatement(upsert); + // upsert two rows + TestUtil.setRowKeyColumns(stmt, 1); + stmt.execute(); + TestUtil.setRowKeyColumns(stmt, 2); + stmt.execute(); + + // verify rows can be read even though commit has not been called + rs = conn.createStatement().executeQuery(selectSql); + TestUtil.validateRowKeyColumns(rs, 1); + TestUtil.validateRowKeyColumns(rs, 2); + assertFalse(rs.next()); + + conn.close(); + // wait for any open txns to time out + Thread.sleep(DEFAULT_TXN_TIMEOUT_SECONDS*1000+10000); + assertTrue("There should be no invalid transactions", txManager.getInvalidSize()==0); + } + } + @Test public void testDelete() throws Exception { String selectSQL = "SELECT * FROM " + FULL_TABLE_NAME; try (Connection conn1 = DriverManager.getConnection(getUrl()); - Connection conn2 = DriverManager.getConnection(getUrl())) { + Connection conn2 = DriverManager.getConnection(getUrl())) { conn1.setAutoCommit(false); ResultSet rs = conn1.createStatement().executeQuery(selectSQL); assertFalse(rs.next()); @@ -188,19 +188,19 @@ public class TransactionIT extends BaseHBaseManagedTimeIT { } } - @Test - public void testAutoCommitQuerySingleTable() throws Exception { - try (Connection conn = DriverManager.getConnection(getUrl())) { - conn.setAutoCommit(true); - // verify no rows returned - ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + FULL_TABLE_NAME); - assertFalse(rs.next()); - } - } - + @Test + public void testAutoCommitQuerySingleTable() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.setAutoCommit(true); + // verify no rows returned + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + FULL_TABLE_NAME); + assertFalse(rs.next()); + } + } + @Test public void testAutoCommitQueryMultiTables() throws Exception { - try (Connection conn = DriverManager.getConnection(getUrl())) { + try (Connection conn = DriverManager.getConnection(getUrl())) { conn.setAutoCommit(true); // verify no rows returned ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + FULL_TABLE_NAME + " a JOIN " + FULL_TABLE_NAME + " b ON (a.long_pk = b.int_pk)"); @@ -208,87 +208,87 @@ public class TransactionIT extends BaseHBaseManagedTimeIT { } } - @Test - public void testColConflicts() throws Exception { - try (Connection conn1 = DriverManager.getConnection(getUrl()); - Connection conn2 = DriverManager.getConnection(getUrl())) { - conn1.setAutoCommit(false); - conn2.setAutoCommit(false); - String selectSql = "SELECT * FROM "+FULL_TABLE_NAME; - conn1.setAutoCommit(false); - ResultSet rs = conn1.createStatement().executeQuery(selectSql); - assertFalse(rs.next()); - // upsert row using conn1 - String upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)"; - PreparedStatement stmt = conn1.prepareStatement(upsertSql); - TestUtil.setRowKeyColumns(stmt, 1); - stmt.setInt(7, 10); - stmt.execute(); - // upsert row using conn2 - stmt = conn2.prepareStatement(upsertSql); - TestUtil.setRowKeyColumns(stmt, 1); - stmt.setInt(7, 11); - stmt.execute(); - - conn1.commit(); - //second commit should fail - try { - conn2.commit(); - fail(); - } - catch (SQLException e) { - assertEquals(e.getErrorCode(), SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode()); - } - } - } - - private void testRowConflicts() throws Exception { - try (Connection conn1 = DriverManager.getConnection(getUrl()); - Connection conn2 = DriverManager.getConnection(getUrl())) { - conn1.setAutoCommit(false); - conn2.setAutoCommit(false); - String selectSql = "SELECT * FROM "+FULL_TABLE_NAME; - conn1.setAutoCommit(false); - ResultSet rs = conn1.createStatement().executeQuery(selectSql); - boolean immutableRows = conn1.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, FULL_TABLE_NAME)).isImmutableRows(); - assertFalse(rs.next()); - // upsert row using conn1 - String upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)"; - PreparedStatement stmt = conn1.prepareStatement(upsertSql); - TestUtil.setRowKeyColumns(stmt, 1); - stmt.setInt(7, 10); - stmt.execute(); - // upsert row using conn2 - upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, b.int_col2) VALUES(?, ?, ?, ?, ?, ?, ?)"; - stmt = conn2.prepareStatement(upsertSql); - TestUtil.setRowKeyColumns(stmt, 1); - stmt.setInt(7, 11); - stmt.execute(); - - conn1.commit(); - //second commit should fail - try { - conn2.commit(); - if (!immutableRows) fail(); - } - catch (SQLException e) { - if (immutableRows) fail(); - assertEquals(e.getErrorCode(), SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode()); - } - } - } - - @Test - public void testRowConflictDetected() throws Exception { - testRowConflicts(); - } - - @Test - public void testNoConflictDetectionForImmutableRows() throws Exception { - Connection conn = DriverManager.getConnection(getUrl()); - conn.createStatement().execute("ALTER TABLE " + FULL_TABLE_NAME + " SET IMMUTABLE_ROWS=true"); - testRowConflicts(); - } + @Test + public void testColConflicts() throws Exception { + try (Connection conn1 = DriverManager.getConnection(getUrl()); + Connection conn2 = DriverManager.getConnection(getUrl())) { + conn1.setAutoCommit(false); + conn2.setAutoCommit(false); + String selectSql = "SELECT * FROM "+FULL_TABLE_NAME; + conn1.setAutoCommit(false); + ResultSet rs = conn1.createStatement().executeQuery(selectSql); + assertFalse(rs.next()); + // upsert row using conn1 + String upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)"; + PreparedStatement stmt = conn1.prepareStatement(upsertSql); + TestUtil.setRowKeyColumns(stmt, 1); + stmt.setInt(7, 10); + stmt.execute(); + // upsert row using conn2 + stmt = conn2.prepareStatement(upsertSql); + TestUtil.setRowKeyColumns(stmt, 1); + stmt.setInt(7, 11); + stmt.execute(); + + conn1.commit(); + //second commit should fail + try { + conn2.commit(); + fail(); + } + catch (SQLException e) { + assertEquals(e.getErrorCode(), SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode()); + } + } + } + + private void testRowConflicts() throws Exception { + try (Connection conn1 = DriverManager.getConnection(getUrl()); + Connection conn2 = DriverManager.getConnection(getUrl())) { + conn1.setAutoCommit(false); + conn2.setAutoCommit(false); + String selectSql = "SELECT * FROM "+FULL_TABLE_NAME; + conn1.setAutoCommit(false); + ResultSet rs = conn1.createStatement().executeQuery(selectSql); + boolean immutableRows = conn1.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, FULL_TABLE_NAME)).isImmutableRows(); + assertFalse(rs.next()); + // upsert row using conn1 + String upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)"; + PreparedStatement stmt = conn1.prepareStatement(upsertSql); + TestUtil.setRowKeyColumns(stmt, 1); + stmt.setInt(7, 10); + stmt.execute(); + // upsert row using conn2 + upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, b.int_col2) VALUES(?, ?, ?, ?, ?, ?, ?)"; + stmt = conn2.prepareStatement(upsertSql); + TestUtil.setRowKeyColumns(stmt, 1); + stmt.setInt(7, 11); + stmt.execute(); + + conn1.commit(); + //second commit should fail + try { + conn2.commit(); + if (!immutableRows) fail(); + } + catch (SQLException e) { + if (immutableRows) fail(); + assertEquals(e.getErrorCode(), SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode()); + } + } + } + + @Test + public void testRowConflictDetected() throws Exception { + testRowConflicts(); + } + + @Test + public void testNoConflictDetectionForImmutableRows() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("ALTER TABLE " + FULL_TABLE_NAME + " SET IMMUTABLE_ROWS=true"); + testRowConflicts(); + } @Test public void testNonTxToTxTable() throws Exception { @@ -514,33 +514,33 @@ public class TransactionIT extends BaseHBaseManagedTimeIT { } public void testCurrentDate() throws Exception { - String selectSql = "SELECT current_date() FROM "+FULL_TABLE_NAME; - try (Connection conn = DriverManager.getConnection(getUrl())) { - conn.setAutoCommit(false); - ResultSet rs = conn.createStatement().executeQuery(selectSql); - assertFalse(rs.next()); - - String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)"; - PreparedStatement stmt = conn.prepareStatement(upsert); - // upsert two rows - TestUtil.setRowKeyColumns(stmt, 1); - stmt.execute(); - conn.commit(); - - rs = conn.createStatement().executeQuery(selectSql); - assertTrue(rs.next()); - Date date1 = rs.getDate(1); - assertFalse(rs.next()); - - Thread.sleep(1000); - - rs = conn.createStatement().executeQuery(selectSql); - assertTrue(rs.next()); - Date date2 = rs.getDate(1); - assertFalse(rs.next()); - assertTrue("current_date() should change while executing multiple statements", date2.getTime() > date1.getTime()); - } - } + String selectSql = "SELECT current_date() FROM "+FULL_TABLE_NAME; + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.setAutoCommit(false); + ResultSet rs = conn.createStatement().executeQuery(selectSql); + assertFalse(rs.next()); + + String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)"; + PreparedStatement stmt = conn.prepareStatement(upsert); + // upsert two rows + TestUtil.setRowKeyColumns(stmt, 1); + stmt.execute(); + conn.commit(); + + rs = conn.createStatement().executeQuery(selectSql); + assertTrue(rs.next()); + Date date1 = rs.getDate(1); + assertFalse(rs.next()); + + Thread.sleep(1000); + + rs = conn.createStatement().executeQuery(selectSql); + assertTrue(rs.next()); + Date date2 = rs.getDate(1); + assertFalse(rs.next()); + assertTrue("current_date() should change while executing multiple statements", date2.getTime() > date1.getTime()); + } + } @Test public void testReCreateTxnTableAfterDroppingExistingNonTxnTable() throws SQLException { @@ -558,32 +558,32 @@ public class TransactionIT extends BaseHBaseManagedTimeIT { @Test public void testRowTimestampDisabled() throws SQLException { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - conn.setAutoCommit(false); - Statement stmt = conn.createStatement(); - try { - stmt.execute("CREATE TABLE DEMO(k VARCHAR, v VARCHAR, d DATE NOT NULL, CONSTRAINT PK PRIMARY KEY(k,d ROW_TIMESTAMP)) TRANSACTIONAL=true"); - fail(); - } - catch(SQLException e) { - assertEquals(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP.getErrorCode(), e.getErrorCode()); - } - stmt.execute("CREATE TABLE DEMO(k VARCHAR, v VARCHAR, d DATE NOT NULL, CONSTRAINT PK PRIMARY KEY(k,d ROW_TIMESTAMP))"); - try { - stmt.execute("ALTER TABLE DEMO SET TRANSACTIONAL=true"); - fail(); - } - catch(SQLException e) { - assertEquals(SQLExceptionCode.CANNOT_ALTER_TO_BE_TXN_WITH_ROW_TIMESTAMP.getErrorCode(), e.getErrorCode()); - } + conn.setAutoCommit(false); + Statement stmt = conn.createStatement(); + try { + stmt.execute("CREATE TABLE DEMO(k VARCHAR, v VARCHAR, d DATE NOT NULL, CONSTRAINT PK PRIMARY KEY(k,d ROW_TIMESTAMP)) TRANSACTIONAL=true"); + fail(); + } + catch(SQLException e) { + assertEquals(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP.getErrorCode(), e.getErrorCode()); + } + stmt.execute("CREATE TABLE DEMO(k VARCHAR, v VARCHAR, d DATE NOT NULL, CONSTRAINT PK PRIMARY KEY(k,d ROW_TIMESTAMP))"); + try { + stmt.execute("ALTER TABLE DEMO SET TRANSACTIONAL=true"); + fail(); + } + catch(SQLException e) { + assertEquals(SQLExceptionCode.CANNOT_ALTER_TO_BE_TXN_WITH_ROW_TIMESTAMP.getErrorCode(), e.getErrorCode()); + } } } @Test public void testReadOnlyView() throws Exception { Connection conn = DriverManager.getConnection(getUrl()); - String ddl = "CREATE TABLE t (k INTEGER NOT NULL PRIMARY KEY, v1 DATE) TRANSACTIONAL=true"; + String ddl = "CREATE TABLE t (k INTEGER NOT NULL PRIMARY KEY, v1 DATE) TRANSACTIONAL=true"; conn.createStatement().execute(ddl); ddl = "CREATE VIEW v (v2 VARCHAR) AS SELECT * FROM t where k>4"; conn.createStatement().execute(ddl); @@ -870,4 +870,26 @@ public class TransactionIT extends BaseHBaseManagedTimeIT { assertFalse(rs.next()); } } + + @Test + public void testParallelUpsertSelect() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3)); + props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3)); + props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3)); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(false); + conn.createStatement().execute("CREATE SEQUENCE S1"); + conn.createStatement().execute("CREATE TABLE SALTEDT1 (pk INTEGER PRIMARY KEY, val INTEGER) SALT_BUCKETS=4,TRANSACTIONAL=true"); + conn.createStatement().execute("CREATE TABLE T2 (pk INTEGER PRIMARY KEY, val INTEGER) TRANSACTIONAL=true"); + + for (int i = 0; i < 100; i++) { + conn.createStatement().execute("UPSERT INTO SALTEDT1 VALUES (NEXT VALUE FOR S1, " + (i%10) + ")"); + } + conn.commit(); + conn.setAutoCommit(true); + int upsertCount = conn.createStatement().executeUpdate("UPSERT INTO T2 SELECT pk, val FROM SALTEDT1"); + assertEquals(100,upsertCount); + conn.close(); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c5c6fdcb/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index 924ed43..4c41f82 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -395,7 +395,6 @@ public class DeleteCompiler { break; } final QueryPlan dataPlan = dataPlanToBe; - final ColumnResolver resolver = resolverToBe; final boolean hasImmutableIndexes = !immutableIndex.isEmpty(); // tableRefs is parallel with queryPlans TableRef[] tableRefs = new TableRef[hasImmutableIndexes ? immutableIndex.size() : 1]; http://git-wip-us.apache.org/repos/asf/phoenix/blob/c5c6fdcb/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 38067c1..27de68a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -37,6 +37,18 @@ import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import javax.annotation.concurrent.Immutable; +import co.cask.tephra.Transaction; +import co.cask.tephra.Transaction.VisibilityLevel; +import co.cask.tephra.TransactionAware; +import co.cask.tephra.TransactionCodec; +import co.cask.tephra.TransactionConflictException; +import co.cask.tephra.TransactionContext; +import co.cask.tephra.TransactionFailureException; +import co.cask.tephra.TransactionSystemClient; +import co.cask.tephra.hbase10.TransactionAwareHTable; +import co.cask.tephra.visibility.FenceWait; +import co.cask.tephra.visibility.VisibilityFence; + import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTableInterface; @@ -98,18 +110,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import co.cask.tephra.Transaction; -import co.cask.tephra.Transaction.VisibilityLevel; -import co.cask.tephra.TransactionAware; -import co.cask.tephra.TransactionCodec; -import co.cask.tephra.TransactionConflictException; -import co.cask.tephra.TransactionContext; -import co.cask.tephra.TransactionFailureException; -import co.cask.tephra.TransactionSystemClient; -import co.cask.tephra.hbase10.TransactionAwareHTable; -import co.cask.tephra.visibility.FenceWait; -import co.cask.tephra.visibility.VisibilityFence; - /** * * Tracks the uncommitted state @@ -242,7 +242,18 @@ public class MutationState implements SQLCloseable { } } - private void addReadFence(PTable dataTable) throws SQLException { + /** + * Add an entry to the change set representing the DML operation that is starting. + * These entries will not conflict with each other, but they will conflict with a + * DDL operation of creating an index. See {@link #addReadFence(PTable)} and TEPHRA-157 + * for more information. + * @param dataTable the table which is doing DML + * @throws SQLException + */ + public void addReadFence(PTable dataTable) throws SQLException { + if (this.txContext == null) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build().buildException(); + } byte[] logicalKey = SchemaUtil.getTableKey(dataTable); this.txContext.addTransactionAware(VisibilityFence.create(logicalKey)); byte[] physicalKey = dataTable.getPhysicalName().getBytes(); @@ -848,8 +859,7 @@ public class MutationState implements SQLCloseable { final PTable table = tableRef.getTable(); // Track tables to which we've sent uncommitted data if (isTransactional = table.isTransactional()) { - addReadFence(table); - txTableRefs.add(tableRef); + txTableRefs.add(tableRef); uncommittedPhysicalNames.add(table.getPhysicalName().getString()); } boolean isDataTable = true; http://git-wip-us.apache.org/repos/asf/phoenix/blob/c5c6fdcb/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 7e8969b..6bb5722 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -331,6 +331,7 @@ public class PhoenixStatement implements Statement, SQLCloseable { MutationPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE); if (plan.getTargetRef() != null && plan.getTargetRef().getTable() != null && plan.getTargetRef().getTable().isTransactional()) { state.startTransaction(); + state.addReadFence(plan.getTargetRef().getTable()); } Iterator tableRefs = plan.getSourceRefs().iterator(); state.sendUncommitted(tableRefs);