phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject phoenix git commit: PHOENIX-2600 NPE on immutable index creation over transactional table
Date Sat, 16 Jan 2016 02:14:27 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.0 b07b91914 -> c5c6fdcb7


PHOENIX-2600 NPE on immutable index creation over transactional table

Conflicts:

	phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
	phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c5c6fdcb
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c5c6fdcb
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c5c6fdcb

Branch: refs/heads/4.x-HBase-1.0
Commit: c5c6fdcb727b466474a87d3dbcf127921d134e5e
Parents: b07b919
Author: James Taylor <jamestaylor@apache.org>
Authored: Fri Jan 15 18:04:07 2016 -0800
Committer: James Taylor <jamestaylor@apache.org>
Committed: Fri Jan 15 18:14:09 2016 -0800

----------------------------------------------------------------------
 .../apache/phoenix/end2end/UpsertSelectIT.java  |  23 +
 .../org/apache/phoenix/tx/TransactionIT.java    | 442 ++++++++++---------
 .../apache/phoenix/compile/DeleteCompiler.java  |   1 -
 .../apache/phoenix/execute/MutationState.java   |  40 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   1 +
 5 files changed, 281 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/c5c6fdcb/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index b5252e0..364b423 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -1373,6 +1373,29 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT {
         assertEquals("[[128,0,0,54], [128,0,4,0]]", rs.getArray(2).toString());
     }
 
+
+    @Test
+    public void testParallelUpsertSelect() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3));
+        props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3));
+        props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        conn.createStatement().execute("CREATE SEQUENCE S1");
+        conn.createStatement().execute("CREATE TABLE SALTEDT1 (pk INTEGER PRIMARY KEY, val
INTEGER) SALT_BUCKETS=4");
+        conn.createStatement().execute("CREATE TABLE T2 (pk INTEGER PRIMARY KEY, val INTEGER)");
+
+        for (int i = 0; i < 100; i++) {
+            conn.createStatement().execute("UPSERT INTO SALTEDT1 VALUES (NEXT VALUE FOR S1,
" + (i%10) + ")");
+        }
+        conn.commit();
+        conn.setAutoCommit(true);
+        int upsertCount = conn.createStatement().executeUpdate("UPSERT INTO T2 SELECT pk,
val FROM SALTEDT1");
+        assertEquals(100,upsertCount);
+        conn.close();
+    }
+
     private static Connection getConnection(long ts) throws SQLException {
         Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
         props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c5c6fdcb/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index 28ba53d..fe1f16c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -36,6 +36,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import co.cask.tephra.TransactionContext;
+import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.hbase10.TransactionAwareHTable;
+import co.cask.tephra.hbase10.coprocessor.TransactionProcessor;
+
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
@@ -68,16 +74,10 @@ import org.junit.Test;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-import co.cask.tephra.TransactionContext;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.hbase10.TransactionAwareHTable;
-import co.cask.tephra.hbase10.coprocessor.TransactionProcessor;
-
 public class TransactionIT extends BaseHBaseManagedTimeIT {
-	
-	private static final String FULL_TABLE_NAME = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR
+ TRANSACTIONAL_DATA_TABLE;
-	
+    
+    private static final String FULL_TABLE_NAME = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR
+ TRANSACTIONAL_DATA_TABLE;
+    
     @Before
     public void setUp() throws SQLException {
         ensureTableCreated(getUrl(), TRANSACTIONAL_DATA_TABLE);
@@ -90,73 +90,73 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
         props.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.toString(true));
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
-		
-	@Test
-	public void testReadOwnWrites() throws Exception {
-		String selectSql = "SELECT * FROM "+FULL_TABLE_NAME;
-		try (Connection conn = DriverManager.getConnection(getUrl())) {
-			conn.setAutoCommit(false);
-			ResultSet rs = conn.createStatement().executeQuery(selectSql);
-	     	assertFalse(rs.next());
-	     	
-	        String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk,
long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
-	        PreparedStatement stmt = conn.prepareStatement(upsert);
-			// upsert two rows
-			TestUtil.setRowKeyColumns(stmt, 1);
-			stmt.execute();
-			TestUtil.setRowKeyColumns(stmt, 2);
-			stmt.execute();
-	        
-	        // verify rows can be read even though commit has not been called
-			rs = conn.createStatement().executeQuery(selectSql);
-			TestUtil.validateRowKeyColumns(rs, 1);
-			TestUtil.validateRowKeyColumns(rs, 2);
-	        assertFalse(rs.next());
-	        
-	        conn.commit();
-	        
-	        // verify rows can be read after commit
-	        rs = conn.createStatement().executeQuery(selectSql);
-	        TestUtil.validateRowKeyColumns(rs, 1);
-	        TestUtil.validateRowKeyColumns(rs, 2);
-	        assertFalse(rs.next());
-		}
-	}
-	
-	@Test
-	public void testTxnClosedCorrecty() throws Exception {
-		String selectSql = "SELECT * FROM "+FULL_TABLE_NAME;
-		try (Connection conn = DriverManager.getConnection(getUrl())) {
-			conn.setAutoCommit(false);
-			ResultSet rs = conn.createStatement().executeQuery(selectSql);
-	     	assertFalse(rs.next());
-	     	
-	        String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk,
long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
-	        PreparedStatement stmt = conn.prepareStatement(upsert);
-			// upsert two rows
-			TestUtil.setRowKeyColumns(stmt, 1);
-			stmt.execute();
-			TestUtil.setRowKeyColumns(stmt, 2);
-			stmt.execute();
-	        
-	        // verify rows can be read even though commit has not been called
-			rs = conn.createStatement().executeQuery(selectSql);
-			TestUtil.validateRowKeyColumns(rs, 1);
-			TestUtil.validateRowKeyColumns(rs, 2);
-	        assertFalse(rs.next());
-	        
-	        conn.close();
-	        // wait for any open txns to time out
-	        Thread.sleep(DEFAULT_TXN_TIMEOUT_SECONDS*1000+10000);
-	        assertTrue("There should be no invalid transactions", txManager.getInvalidSize()==0);
-		}
-	}
-	
+        
+    @Test
+    public void testReadOwnWrites() throws Exception {
+        String selectSql = "SELECT * FROM "+FULL_TABLE_NAME;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(false);
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            assertFalse(rs.next());
+            
+            String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk,
long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+            PreparedStatement stmt = conn.prepareStatement(upsert);
+            // upsert two rows
+            TestUtil.setRowKeyColumns(stmt, 1);
+            stmt.execute();
+            TestUtil.setRowKeyColumns(stmt, 2);
+            stmt.execute();
+            
+            // verify rows can be read even though commit has not been called
+            rs = conn.createStatement().executeQuery(selectSql);
+            TestUtil.validateRowKeyColumns(rs, 1);
+            TestUtil.validateRowKeyColumns(rs, 2);
+            assertFalse(rs.next());
+            
+            conn.commit();
+            
+            // verify rows can be read after commit
+            rs = conn.createStatement().executeQuery(selectSql);
+            TestUtil.validateRowKeyColumns(rs, 1);
+            TestUtil.validateRowKeyColumns(rs, 2);
+            assertFalse(rs.next());
+        }
+    }
+    
+    @Test
+    public void testTxnClosedCorrecty() throws Exception {
+        String selectSql = "SELECT * FROM "+FULL_TABLE_NAME;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(false);
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            assertFalse(rs.next());
+            
+            String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk,
long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+            PreparedStatement stmt = conn.prepareStatement(upsert);
+            // upsert two rows
+            TestUtil.setRowKeyColumns(stmt, 1);
+            stmt.execute();
+            TestUtil.setRowKeyColumns(stmt, 2);
+            stmt.execute();
+            
+            // verify rows can be read even though commit has not been called
+            rs = conn.createStatement().executeQuery(selectSql);
+            TestUtil.validateRowKeyColumns(rs, 1);
+            TestUtil.validateRowKeyColumns(rs, 2);
+            assertFalse(rs.next());
+            
+            conn.close();
+            // wait for any open txns to time out
+            Thread.sleep(DEFAULT_TXN_TIMEOUT_SECONDS*1000+10000);
+            assertTrue("There should be no invalid transactions", txManager.getInvalidSize()==0);
+        }
+    }
+    
     @Test
     public void testDelete() throws Exception {
         String selectSQL = "SELECT * FROM " + FULL_TABLE_NAME;
         try (Connection conn1 = DriverManager.getConnection(getUrl()); 
-        		Connection conn2 = DriverManager.getConnection(getUrl())) {
+                Connection conn2 = DriverManager.getConnection(getUrl())) {
             conn1.setAutoCommit(false);
             ResultSet rs = conn1.createStatement().executeQuery(selectSQL);
             assertFalse(rs.next());
@@ -188,19 +188,19 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
         }
     }
     
-	@Test
-	public void testAutoCommitQuerySingleTable() throws Exception {
-		try (Connection conn = DriverManager.getConnection(getUrl())) {
-			conn.setAutoCommit(true);
-			// verify no rows returned
-			ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + FULL_TABLE_NAME);
-			assertFalse(rs.next());
-		}
-	}
-	
+    @Test
+    public void testAutoCommitQuerySingleTable() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(true);
+            // verify no rows returned
+            ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + FULL_TABLE_NAME);
+            assertFalse(rs.next());
+        }
+    }
+    
     @Test
     public void testAutoCommitQueryMultiTables() throws Exception {
-    	try (Connection conn = DriverManager.getConnection(getUrl())) {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
             conn.setAutoCommit(true);
             // verify no rows returned
             ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + FULL_TABLE_NAME
+ " a JOIN " + FULL_TABLE_NAME + " b ON (a.long_pk = b.int_pk)");
@@ -208,87 +208,87 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
         } 
     }
     
-	@Test
-	public void testColConflicts() throws Exception {
-		try (Connection conn1 = DriverManager.getConnection(getUrl()); 
-        		Connection conn2 = DriverManager.getConnection(getUrl())) {
-			conn1.setAutoCommit(false);
-			conn2.setAutoCommit(false);
-			String selectSql = "SELECT * FROM "+FULL_TABLE_NAME;
-			conn1.setAutoCommit(false);
-			ResultSet rs = conn1.createStatement().executeQuery(selectSql);
-	     	assertFalse(rs.next());
-			// upsert row using conn1
-			String upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk,
decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)";
-			PreparedStatement stmt = conn1.prepareStatement(upsertSql);
-			TestUtil.setRowKeyColumns(stmt, 1);
-			stmt.setInt(7, 10);
-	        stmt.execute();
-	        // upsert row using conn2
- 			stmt = conn2.prepareStatement(upsertSql);
- 			TestUtil.setRowKeyColumns(stmt, 1);
-			stmt.setInt(7, 11);
-	        stmt.execute();
- 	        
- 	        conn1.commit();
-	        //second commit should fail
- 	        try {
- 	 	        conn2.commit();
- 	 	        fail();
- 	        }	
- 	        catch (SQLException e) {
- 	        	assertEquals(e.getErrorCode(), SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode());
- 	        }
-		}
-	}
-	
-	private void testRowConflicts() throws Exception {
-		try (Connection conn1 = DriverManager.getConnection(getUrl()); 
-        		Connection conn2 = DriverManager.getConnection(getUrl())) {
-			conn1.setAutoCommit(false);
-			conn2.setAutoCommit(false);
-			String selectSql = "SELECT * FROM "+FULL_TABLE_NAME;
-			conn1.setAutoCommit(false);
-			ResultSet rs = conn1.createStatement().executeQuery(selectSql);
-			boolean immutableRows = conn1.unwrap(PhoenixConnection.class).getTable(new PTableKey(null,
FULL_TABLE_NAME)).isImmutableRows();
-	     	assertFalse(rs.next());
-			// upsert row using conn1
-			String upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk,
decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)";
-			PreparedStatement stmt = conn1.prepareStatement(upsertSql);
-			TestUtil.setRowKeyColumns(stmt, 1);
-			stmt.setInt(7, 10);
-	        stmt.execute();
-	        // upsert row using conn2
-	        upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk,
decimal_pk, date_pk, b.int_col2) VALUES(?, ?, ?, ?, ?, ?, ?)";
- 			stmt = conn2.prepareStatement(upsertSql);
- 			TestUtil.setRowKeyColumns(stmt, 1);
-			stmt.setInt(7, 11);
- 	        stmt.execute();
- 	        
- 	        conn1.commit();
-	        //second commit should fail
- 	        try {
- 	 	        conn2.commit();
- 	 	        if (!immutableRows) fail();
- 	        }	
- 	        catch (SQLException e) {
- 	        	if (immutableRows) fail();
- 	        	assertEquals(e.getErrorCode(), SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode());
- 	        }
-		}
-	}
-	
-	@Test
-	public void testRowConflictDetected() throws Exception {
-		testRowConflicts();
-	}
-	
-	@Test
-	public void testNoConflictDetectionForImmutableRows() throws Exception {
-		Connection conn = DriverManager.getConnection(getUrl());
-		conn.createStatement().execute("ALTER TABLE " + FULL_TABLE_NAME + " SET IMMUTABLE_ROWS=true");
-		testRowConflicts();
-	}
+    @Test
+    public void testColConflicts() throws Exception {
+        try (Connection conn1 = DriverManager.getConnection(getUrl()); 
+                Connection conn2 = DriverManager.getConnection(getUrl())) {
+            conn1.setAutoCommit(false);
+            conn2.setAutoCommit(false);
+            String selectSql = "SELECT * FROM "+FULL_TABLE_NAME;
+            conn1.setAutoCommit(false);
+            ResultSet rs = conn1.createStatement().executeQuery(selectSql);
+            assertFalse(rs.next());
+            // upsert row using conn1
+            String upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk,
int_pk, long_pk, decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)";
+            PreparedStatement stmt = conn1.prepareStatement(upsertSql);
+            TestUtil.setRowKeyColumns(stmt, 1);
+            stmt.setInt(7, 10);
+            stmt.execute();
+            // upsert row using conn2
+            stmt = conn2.prepareStatement(upsertSql);
+            TestUtil.setRowKeyColumns(stmt, 1);
+            stmt.setInt(7, 11);
+            stmt.execute();
+            
+            conn1.commit();
+            //second commit should fail
+            try {
+                conn2.commit();
+                fail();
+            }   
+            catch (SQLException e) {
+                assertEquals(e.getErrorCode(), SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode());
+            }
+        }
+    }
+    
+    private void testRowConflicts() throws Exception {
+        try (Connection conn1 = DriverManager.getConnection(getUrl()); 
+                Connection conn2 = DriverManager.getConnection(getUrl())) {
+            conn1.setAutoCommit(false);
+            conn2.setAutoCommit(false);
+            String selectSql = "SELECT * FROM "+FULL_TABLE_NAME;
+            conn1.setAutoCommit(false);
+            ResultSet rs = conn1.createStatement().executeQuery(selectSql);
+            boolean immutableRows = conn1.unwrap(PhoenixConnection.class).getTable(new PTableKey(null,
FULL_TABLE_NAME)).isImmutableRows();
+            assertFalse(rs.next());
+            // upsert row using conn1
+            String upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk,
int_pk, long_pk, decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)";
+            PreparedStatement stmt = conn1.prepareStatement(upsertSql);
+            TestUtil.setRowKeyColumns(stmt, 1);
+            stmt.setInt(7, 10);
+            stmt.execute();
+            // upsert row using conn2
+            upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk,
long_pk, decimal_pk, date_pk, b.int_col2) VALUES(?, ?, ?, ?, ?, ?, ?)";
+            stmt = conn2.prepareStatement(upsertSql);
+            TestUtil.setRowKeyColumns(stmt, 1);
+            stmt.setInt(7, 11);
+            stmt.execute();
+            
+            conn1.commit();
+            //second commit should fail
+            try {
+                conn2.commit();
+                if (!immutableRows) fail();
+            }   
+            catch (SQLException e) {
+                if (immutableRows) fail();
+                assertEquals(e.getErrorCode(), SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode());
+            }
+        }
+    }
+    
+    @Test
+    public void testRowConflictDetected() throws Exception {
+        testRowConflicts();
+    }
+    
+    @Test
+    public void testNoConflictDetectionForImmutableRows() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute("ALTER TABLE " + FULL_TABLE_NAME + " SET IMMUTABLE_ROWS=true");
+        testRowConflicts();
+    }
     
     @Test
     public void testNonTxToTxTable() throws Exception {
@@ -514,33 +514,33 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
     }
 
     public void testCurrentDate() throws Exception {
-		String selectSql = "SELECT current_date() FROM "+FULL_TABLE_NAME;
-		try (Connection conn = DriverManager.getConnection(getUrl())) {
-			conn.setAutoCommit(false);
-			ResultSet rs = conn.createStatement().executeQuery(selectSql);
-	     	assertFalse(rs.next());
-	     	
-	        String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk,
long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
-	        PreparedStatement stmt = conn.prepareStatement(upsert);
-			// upsert two rows
-			TestUtil.setRowKeyColumns(stmt, 1);
-			stmt.execute();
-			conn.commit();
-			
-			rs = conn.createStatement().executeQuery(selectSql);
-			assertTrue(rs.next());
-			Date date1 = rs.getDate(1);
-	     	assertFalse(rs.next());
-	     	
-	     	Thread.sleep(1000);
-	     	
-	     	rs = conn.createStatement().executeQuery(selectSql);
-			assertTrue(rs.next());
-			Date date2 = rs.getDate(1);
-	     	assertFalse(rs.next());
-	     	assertTrue("current_date() should change while executing multiple statements", date2.getTime()
> date1.getTime());
-		}
-	}
+        String selectSql = "SELECT current_date() FROM "+FULL_TABLE_NAME;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(false);
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            assertFalse(rs.next());
+            
+            String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk,
long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+            PreparedStatement stmt = conn.prepareStatement(upsert);
+            // upsert two rows
+            TestUtil.setRowKeyColumns(stmt, 1);
+            stmt.execute();
+            conn.commit();
+            
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            Date date1 = rs.getDate(1);
+            assertFalse(rs.next());
+            
+            Thread.sleep(1000);
+            
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            Date date2 = rs.getDate(1);
+            assertFalse(rs.next());
+            assertTrue("current_date() should change while executing multiple statements",
date2.getTime() > date1.getTime());
+        }
+    }
     
     @Test
     public void testReCreateTxnTableAfterDroppingExistingNonTxnTable() throws SQLException
{
@@ -558,32 +558,32 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
     
     @Test
     public void testRowTimestampDisabled() throws SQLException {
-    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-	        conn.setAutoCommit(false);
-	        Statement stmt = conn.createStatement();
-	        try {
-	        	stmt.execute("CREATE TABLE DEMO(k VARCHAR, v VARCHAR, d DATE NOT NULL, CONSTRAINT
PK PRIMARY KEY(k,d ROW_TIMESTAMP)) TRANSACTIONAL=true");
-	        	fail();
-	        }
-        	catch(SQLException e) {
-        		assertEquals(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP.getErrorCode(),
e.getErrorCode());
-        	}
-	        stmt.execute("CREATE TABLE DEMO(k VARCHAR, v VARCHAR, d DATE NOT NULL, CONSTRAINT
PK PRIMARY KEY(k,d ROW_TIMESTAMP))");
-	        try {
-	        	stmt.execute("ALTER TABLE DEMO SET TRANSACTIONAL=true");
-	        	fail();
-	        }
-        	catch(SQLException e) {
-        		assertEquals(SQLExceptionCode.CANNOT_ALTER_TO_BE_TXN_WITH_ROW_TIMESTAMP.getErrorCode(),
e.getErrorCode());
-        	}
+            conn.setAutoCommit(false);
+            Statement stmt = conn.createStatement();
+            try {
+                stmt.execute("CREATE TABLE DEMO(k VARCHAR, v VARCHAR, d DATE NOT NULL, CONSTRAINT
PK PRIMARY KEY(k,d ROW_TIMESTAMP)) TRANSACTIONAL=true");
+                fail();
+            }
+            catch(SQLException e) {
+                assertEquals(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP.getErrorCode(),
e.getErrorCode());
+            }
+            stmt.execute("CREATE TABLE DEMO(k VARCHAR, v VARCHAR, d DATE NOT NULL, CONSTRAINT
PK PRIMARY KEY(k,d ROW_TIMESTAMP))");
+            try {
+                stmt.execute("ALTER TABLE DEMO SET TRANSACTIONAL=true");
+                fail();
+            }
+            catch(SQLException e) {
+                assertEquals(SQLExceptionCode.CANNOT_ALTER_TO_BE_TXN_WITH_ROW_TIMESTAMP.getErrorCode(),
e.getErrorCode());
+            }
         }
     }
     
     @Test
     public void testReadOnlyView() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
-		String ddl = "CREATE TABLE t (k INTEGER NOT NULL PRIMARY KEY, v1 DATE) TRANSACTIONAL=true";
+        String ddl = "CREATE TABLE t (k INTEGER NOT NULL PRIMARY KEY, v1 DATE) TRANSACTIONAL=true";
         conn.createStatement().execute(ddl);
         ddl = "CREATE VIEW v (v2 VARCHAR) AS SELECT * FROM t where k>4";
         conn.createStatement().execute(ddl);
@@ -870,4 +870,26 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
             assertFalse(rs.next());
         }
     }
+
+    @Test
+    public void testParallelUpsertSelect() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3));
+        props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3));
+        props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        conn.createStatement().execute("CREATE SEQUENCE S1");
+        conn.createStatement().execute("CREATE TABLE SALTEDT1 (pk INTEGER PRIMARY KEY, val
INTEGER) SALT_BUCKETS=4,TRANSACTIONAL=true");
+        conn.createStatement().execute("CREATE TABLE T2 (pk INTEGER PRIMARY KEY, val INTEGER)
TRANSACTIONAL=true");
+
+        for (int i = 0; i < 100; i++) {
+            conn.createStatement().execute("UPSERT INTO SALTEDT1 VALUES (NEXT VALUE FOR S1,
" + (i%10) + ")");
+        }
+        conn.commit();
+        conn.setAutoCommit(true);
+        int upsertCount = conn.createStatement().executeUpdate("UPSERT INTO T2 SELECT pk,
val FROM SALTEDT1");
+        assertEquals(100,upsertCount);
+        conn.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c5c6fdcb/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 924ed43..4c41f82 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -395,7 +395,6 @@ public class DeleteCompiler {
             break;
         }
         final QueryPlan dataPlan = dataPlanToBe;
-        final ColumnResolver resolver = resolverToBe;
         final boolean hasImmutableIndexes = !immutableIndex.isEmpty();
         // tableRefs is parallel with queryPlans
         TableRef[] tableRefs = new TableRef[hasImmutableIndexes ? immutableIndex.size() :
1];

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c5c6fdcb/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 38067c1..27de68a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -37,6 +37,18 @@ import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.Immutable;
 
+import co.cask.tephra.Transaction;
+import co.cask.tephra.Transaction.VisibilityLevel;
+import co.cask.tephra.TransactionAware;
+import co.cask.tephra.TransactionCodec;
+import co.cask.tephra.TransactionConflictException;
+import co.cask.tephra.TransactionContext;
+import co.cask.tephra.TransactionFailureException;
+import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.hbase10.TransactionAwareHTable;
+import co.cask.tephra.visibility.FenceWait;
+import co.cask.tephra.visibility.VisibilityFence;
+
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -98,18 +110,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import co.cask.tephra.Transaction;
-import co.cask.tephra.Transaction.VisibilityLevel;
-import co.cask.tephra.TransactionAware;
-import co.cask.tephra.TransactionCodec;
-import co.cask.tephra.TransactionConflictException;
-import co.cask.tephra.TransactionContext;
-import co.cask.tephra.TransactionFailureException;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.hbase10.TransactionAwareHTable;
-import co.cask.tephra.visibility.FenceWait;
-import co.cask.tephra.visibility.VisibilityFence;
-
 /**
  * 
  * Tracks the uncommitted state
@@ -242,7 +242,18 @@ public class MutationState implements SQLCloseable {
         }
     }
     
-    private void addReadFence(PTable dataTable) throws SQLException {
+    /**
+     * Add an entry to the change set representing the DML operation that is starting.
+     * These entries will not conflict with each other, but they will conflict with a
+     * DDL operation of creating an index. See {@link #addReadFence(PTable)} and TEPHRA-157
+     * for more information.
+     * @param dataTable the table which is doing DML
+     * @throws SQLException
+     */
+    public void addReadFence(PTable dataTable) throws SQLException {
+        if (this.txContext == null) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build().buildException();
+        }
         byte[] logicalKey = SchemaUtil.getTableKey(dataTable);
         this.txContext.addTransactionAware(VisibilityFence.create(logicalKey));
         byte[] physicalKey = dataTable.getPhysicalName().getBytes();
@@ -848,8 +859,7 @@ public class MutationState implements SQLCloseable {
 	            final PTable table = tableRef.getTable();
 	            // Track tables to which we've sent uncommitted data
 	            if (isTransactional = table.isTransactional()) {
-	                addReadFence(table);
-                    txTableRefs.add(tableRef);
+	                txTableRefs.add(tableRef);
 	                uncommittedPhysicalNames.add(table.getPhysicalName().getString());
 	            }
 	            boolean isDataTable = true;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c5c6fdcb/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 7e8969b..6bb5722 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -331,6 +331,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                                 MutationPlan plan = stmt.compilePlan(PhoenixStatement.this,
Sequence.ValueOp.VALIDATE_SEQUENCE);
                                 if (plan.getTargetRef() != null && plan.getTargetRef().getTable()
!= null && plan.getTargetRef().getTable().isTransactional()) {
                                     state.startTransaction();
+                                    state.addReadFence(plan.getTargetRef().getTable());
                                 }
                                 Iterator<TableRef> tableRefs = plan.getSourceRefs().iterator();
                                 state.sendUncommitted(tableRefs);


Mime
View raw message