phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [3/4] phoenix git commit: PHOENIX-4731 Make running transactional unit tests for a given provider optional
Date Fri, 05 Oct 2018 16:19:32 GMT
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java
index 523e0d0..83f1af5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java
@@ -35,7 +35,6 @@ import java.util.Properties;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
-import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -49,14 +48,19 @@ import org.junit.runners.Parameterized.Parameters;
 public class MutableRollbackIT extends ParallelStatsDisabledIT {
 	
 	private final boolean localIndex;
+    private final String tableDDLOptions;
 
-	public MutableRollbackIT(boolean localIndex) {
+	public MutableRollbackIT(boolean localIndex, String transactionProvider) {
 		this.localIndex = localIndex;
+        this.tableDDLOptions = " TRANSACTION_PROVIDER='" + transactionProvider + "'";
 	}
 	
-	@Parameters(name="MutableRollbackIT_localIndex={0}") // name is used by failsafe as file name in reports
-    public static Collection<Boolean> data() {
-        return Arrays.asList(new Boolean[] { false, true});
+	@Parameters(name="MutableRollbackIT_localIndex={0},transactionProvider={1}") // name is used by failsafe as file name in reports
+    public static Collection<Object[]> data() {
+        return TestUtil.filterTxParamData(Arrays.asList(new Object[][] {     
+            { false, "TEPHRA"}, { true, "TEPHRA"},
+            { false, "OMID"}, 
+            }),1);
     }
 	
 	private static Connection getConnection() throws SQLException {
@@ -77,8 +81,8 @@ public class MutableRollbackIT extends ParallelStatsDisabledIT {
         conn.setAutoCommit(false);
         try {
             Statement stmt = conn.createStatement();
-            stmt.execute("CREATE TABLE " + fullTableName1 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-            stmt.execute("CREATE TABLE " + fullTableName2 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) IMMUTABLE_ROWS=true");
+            stmt.execute("CREATE TABLE " + fullTableName1 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions);
+            stmt.execute("CREATE TABLE " + fullTableName2 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) IMMUTABLE_ROWS=true,"+tableDDLOptions);
             stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX " + indexName1 + " ON " + fullTableName1 + " (v1) INCLUDE(v2)");
             stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX " + indexName2 + " ON " + fullTableName2 + " (v1) INCLUDE(v2)");
             
@@ -210,8 +214,8 @@ public class MutableRollbackIT extends ParallelStatsDisabledIT {
         conn.setAutoCommit(false);
         try {
             Statement stmt = conn.createStatement();
-            stmt.execute("CREATE TABLE " + fullTableName1 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-            stmt.execute("CREATE TABLE " + fullTableName2 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) IMMUTABLE_ROWS=true");
+            stmt.execute("CREATE TABLE " + fullTableName1 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+tableDDLOptions);
+            stmt.execute("CREATE TABLE " + fullTableName2 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) IMMUTABLE_ROWS=true,"+tableDDLOptions);
             stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX " + indexName1 + " ON " + fullTableName1 + " (v1, k)");
             stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX " + indexName2 + " ON " + fullTableName2 + " (v1, k)");
             
@@ -294,11 +298,6 @@ public class MutableRollbackIT extends ParallelStatsDisabledIT {
             
             conn.rollback();
             assertDataAndIndexRows(stmt, fullTableName1, fullTableName2, indexName1);
-            PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
-            if(localIndex) {
-                dropTable(phoenixConn.getQueryServices().getAdmin(), conn, fullTableName1);
-                dropTable(phoenixConn.getQueryServices().getAdmin(), conn, fullTableName2);
-            }
         } finally {
             conn.close();
         }
@@ -347,7 +346,7 @@ public class MutableRollbackIT extends ParallelStatsDisabledIT {
         conn.setAutoCommit(false);
         try {
             Statement stmt = conn.createStatement();
-            stmt.execute("CREATE TABLE " + fullTableName1 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+            stmt.execute("CREATE TABLE " + fullTableName1 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+tableDDLOptions);
             stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX " + indexName1 + " ON " + fullTableName1 + " (v1, k)");
             
             stmt.executeUpdate("upsert into " + fullTableName1 + " values('x', 'yyyy', 'a')");
@@ -435,8 +434,6 @@ public class MutableRollbackIT extends ParallelStatsDisabledIT {
             assertEquals("x", rs.getString(1));
             assertEquals("yyyy", rs.getString(2));
             assertFalse(rs.next());
-            PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
-            if(localIndex) dropTable(phoenixConn.getQueryServices().getAdmin(), conn, fullTableName1);
         } finally {
             conn.close();
         }
@@ -451,7 +448,7 @@ public class MutableRollbackIT extends ParallelStatsDisabledIT {
         conn.setAutoCommit(false);
         try {
             Statement stmt = conn.createStatement();
-            stmt.execute("CREATE TABLE " + fullTableName1 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+            stmt.execute("CREATE TABLE " + fullTableName1 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+tableDDLOptions);
             stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX " + indexName1 + " ON " + fullTableName1 + " (v1)");
             stmt.executeUpdate("upsert into " + fullTableName1 + " values('x', 'a', 'a')");
             conn.commit();
@@ -501,8 +498,6 @@ public class MutableRollbackIT extends ParallelStatsDisabledIT {
             assertEquals("x", rs.getString(1));
             assertEquals("a", rs.getString(2));
             assertFalse(rs.next());
-            PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
-            if(localIndex) dropTable(phoenixConn.getQueryServices().getAdmin(), conn, fullTableName1);
         } finally {
             conn.close();
         }
@@ -515,5 +510,4 @@ public class MutableRollbackIT extends ParallelStatsDisabledIT {
             admin.deleteTable(TableName.valueOf(tableName));
         } 
     }
-
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java
index 8cafcfc..ff9f14b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java
@@ -45,11 +45,16 @@ import org.junit.runners.Parameterized.Parameters;
 public class RollbackIT extends ParallelStatsDisabledIT {
 	
 	private final boolean localIndex;
-	private final boolean mutable;
+    private final String tableDDLOptions;
 
-	public RollbackIT(boolean localIndex, boolean mutable) {
+	public RollbackIT(boolean localIndex, boolean mutable, String transactionProvider) {
 		this.localIndex = localIndex;
-		this.mutable = mutable;
+        StringBuilder optionBuilder = new StringBuilder();
+        optionBuilder.append(" TRANSACTION_PROVIDER='" + transactionProvider + "'");
+        if (!mutable) {
+            optionBuilder.append(",IMMUTABLE_ROWS=true");
+        }
+        this.tableDDLOptions = optionBuilder.toString();
 	}
 	
     private static Connection getConnection() throws SQLException {
@@ -59,12 +64,13 @@ public class RollbackIT extends ParallelStatsDisabledIT {
         return conn;
     }
     
-	@Parameters(name="RollbackIT_localIndex={0},mutable={1}") // name is used by failsafe as file name in reports
-    public static Collection<Boolean[]> data() {
-        return Arrays.asList(new Boolean[][] {     
-                 { false, false }, { false, true },
-                 { true, false }, { true, true } 
-           });
+	@Parameters(name="RollbackIT_localIndex={0},mutable={1},transactionProvider={2}") // name is used by failsafe as file name in reports
+    public static Collection<Object[]> data() {
+        return TestUtil.filterTxParamData(Arrays.asList(new Object[][] {     
+                 { false, false, "TEPHRA" }, { false, true, "TEPHRA"  },
+                 { true, false, "TEPHRA"  }, { true, true, "TEPHRA"  },
+                 { false, false, "OMID" }, { false, true, "OMID"  },
+           }),2);
     }
     
     @Test
@@ -76,7 +82,7 @@ public class RollbackIT extends ParallelStatsDisabledIT {
         conn.setAutoCommit(false);
         try {
             Statement stmt = conn.createStatement();
-            stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+(!mutable? " IMMUTABLE_ROWS=true" : ""));
+            stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+tableDDLOptions);
             stmt.execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE(v2)");
             
             stmt.executeUpdate("upsert into " + fullTableName + " values('x', 'y', 'a')");
@@ -120,7 +126,7 @@ public class RollbackIT extends ParallelStatsDisabledIT {
         String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
         try {
             Statement stmt = conn.createStatement();
-            stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR, v1 VARCHAR, v2 VARCHAR, CONSTRAINT pk PRIMARY KEY (v1, v2))"+(!mutable? " IMMUTABLE_ROWS=true" : ""));
+            stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR, v1 VARCHAR, v2 VARCHAR, CONSTRAINT pk PRIMARY KEY (v1, v2))"+tableDDLOptions);
             stmt.execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + indexName + " ON " + fullTableName + "(v1, k)");
             
             stmt.executeUpdate("upsert into " + fullTableName + " values('x', 'y', 'a')");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
index d6b68d4..f94ec81 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
@@ -46,6 +46,7 @@ import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -66,11 +67,16 @@ public class TxWriteFailureIT extends BaseUniqueNamesOwnClusterIT {
     private static final String ROW_TO_FAIL = "fail";
     
     private final boolean localIndex;
-	private final boolean mutable;
+    private final String tableDDLOptions;
 
-	public TxWriteFailureIT(boolean localIndex, boolean mutable) {
+	public TxWriteFailureIT(boolean localIndex, boolean mutable, String transactionProvider) {
 		this.localIndex = localIndex;
-		this.mutable = mutable;
+        StringBuilder optionBuilder = new StringBuilder();
+        optionBuilder.append(" TRANSACTION_PROVIDER='" + transactionProvider + "'");
+        if (!mutable) {
+            optionBuilder.append(",IMMUTABLE_ROWS=true");
+        }
+        this.tableDDLOptions = optionBuilder.toString();
 	}
 	
 	@BeforeClass
@@ -85,11 +91,12 @@ public class TxWriteFailureIT extends BaseUniqueNamesOwnClusterIT {
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
     }
 	
-	@Parameters(name="TxWriteFailureIT_localIndex={0},mutable={1}") // name is used by failsafe as file name in reports
-    public static Collection<Boolean[]> data() {
-        return Arrays.asList(new Boolean[][] {
-                 { false, false }, { false, true }, { true, false }, { true, true }
-           });
+	@Parameters(name="TxWriteFailureIT_localIndex={0},mutable={1},transactionProvider={2}") // name is used by failsafe as file name in reports
+    public static Collection<Object[]> data() {
+        return TestUtil.filterTxParamData(Arrays.asList(new Object[][] {
+                 { false, false, "TEPHRA" }, { false, true, "TEPHRA" }, { true, false, "TEPHRA" }, { true, true, "TEPHRA" },
+                 { false, false, "OMID" }, { false, true, "OMID" }, 
+           }), 2);
     }
     
     @Before
@@ -120,7 +127,7 @@ public class TxWriteFailureIT extends BaseUniqueNamesOwnClusterIT {
         Connection conn = driver.connect(url, props);
         conn.setAutoCommit(false);
         conn.createStatement().execute(
-                "CREATE TABLE " + dataTableFullName + " (k VARCHAR PRIMARY KEY, v1 VARCHAR)"+(!mutable? " IMMUTABLE_ROWS=true" : ""));
+                "CREATE TABLE " + dataTableFullName + " (k VARCHAR PRIMARY KEY, v1 VARCHAR)"+tableDDLOptions);
         conn.createStatement().execute(
                 "CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + indexName + " ON " + dataTableFullName + " (v1)");
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index 235f20f..2b0c8b9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -18,7 +18,6 @@
 package org.apache.phoenix.execute;
 
 import static com.google.common.collect.Lists.newArrayList;
-import static com.google.common.collect.Sets.newHashSet;
 import static java.util.Collections.singletonList;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_FAILED_COUNT;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_FAILED_SIZE;
@@ -37,11 +36,9 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -51,7 +48,8 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.phoenix.end2end.BaseOwnClusterIT;
+
+import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -62,9 +60,7 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.SchemaUtil;
-import org.junit.After;
-import org.junit.AfterClass;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -73,27 +69,23 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 @RunWith(Parameterized.class)
-// Needs to extend BaseOwnClusterIT due to installation of FailingRegionObserver coprocessor
-public class PartialCommitIT extends BaseOwnClusterIT {
+// Needs to extend BaseUniqueNamesOwnClusterIT due to installation of FailingRegionObserver coprocessor
+public class PartialCommitIT extends BaseUniqueNamesOwnClusterIT {
     
-	private final String A_SUCESS_TABLE;
-	private final String B_FAILURE_TABLE;
-	private final String C_SUCESS_TABLE;
-    private final String UPSERT_TO_FAIL;
-    private final String UPSERT_SELECT_TO_FAIL;
-    private final String DELETE_TO_FAIL;
+	private final String aSuccessTable;
+	private final String bFailureTable;
+	private final String cSuccessTable;
+    private final String upsertToFail;
+    private final String upsertSelectToFail;
+    private final String deleteToFail;
     private static final String TABLE_NAME_TO_FAIL = "B_FAILURE_TABLE";
     private static final byte[] ROW_TO_FAIL_UPSERT_BYTES = Bytes.toBytes("fail me upsert");
     private static final byte[] ROW_TO_FAIL_DELETE_BYTES = Bytes.toBytes("fail me delete");
-    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
     
-    @Override
-    @After
-    public void cleanUpAfterTest() throws Exception {}
-
     @BeforeClass
     public static void doSetup() throws Exception {
         Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(3);
@@ -104,144 +96,135 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         clientProps.put(QueryServices.TRANSACTIONS_ENABLED, "true");
         clientProps.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
-        createTablesWithABitOfData();
-    }
-    
-    @Parameters(name="PartialCommitIT_transactional={0}") // name is used by failsafe as file name in reports
-    public static Collection<Boolean> data() {
-        return Arrays.asList(false, true);
     }
     
     private final boolean transactional;
+    private final String transactionProvider;
     
-    public PartialCommitIT(boolean transactional) {
-        this.transactional = transactional;
-		if (transactional) {
-			A_SUCESS_TABLE = "A_SUCCESS_TABLE_TXN";
-			B_FAILURE_TABLE = TABLE_NAME_TO_FAIL+"_TXN";
-			C_SUCESS_TABLE = "C_SUCCESS_TABLE_TXN";
-		}
-		else {
-			A_SUCESS_TABLE = "A_SUCCESS_TABLE";
-			B_FAILURE_TABLE = TABLE_NAME_TO_FAIL;
-			C_SUCESS_TABLE = "C_SUCCESS_TABLE";
-		}
-	    UPSERT_TO_FAIL = "upsert into " + B_FAILURE_TABLE + " values ('" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "', 'boom!')";
-	    UPSERT_SELECT_TO_FAIL = "upsert into " + B_FAILURE_TABLE + " select k, c from a_success_table";
-	    DELETE_TO_FAIL = "delete from " + B_FAILURE_TABLE + "  where k='" + Bytes.toString(ROW_TO_FAIL_DELETE_BYTES) + "'";
+    @Parameters(name="PartialCommitIT_transactionProvider={0}")
+    public static Collection<Object[]> data() {
+        return TestUtil.filterTxParamData(Arrays.asList(new Object[][] { 
+                 {"TEPHRA"},{"OMID"}}),0);
+    }
+    
+    public PartialCommitIT(String transactionProvider) {
+        this.transactionProvider = transactionProvider;
+        this.transactional = transactionProvider != null;
+		aSuccessTable = generateUniqueName();
+		bFailureTable = TABLE_NAME_TO_FAIL + generateUniqueName();
+		cSuccessTable = generateUniqueName();
+	    upsertToFail = "upsert into " + bFailureTable + " values ('" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "', 'boom!')";
+	    upsertSelectToFail = "upsert into " + bFailureTable + " select k, c from " + aSuccessTable;
+	    deleteToFail = "delete from " + bFailureTable + "  where k='" + Bytes.toString(ROW_TO_FAIL_DELETE_BYTES) + "'";
 	}
     
-    private static void createTablesWithABitOfData() throws Exception {
-        try (Connection con = driver.connect(url, new Properties())) {
+    private void createTables() throws Exception {
+        try (Connection con = DriverManager.getConnection(getUrl())) {
             Statement sta = con.createStatement();
-            sta.execute("create table a_success_table (k varchar primary key, c varchar)");
-            sta.execute("create table b_failure_table (k varchar primary key, c varchar)");
-            sta.execute("create table c_success_table (k varchar primary key, c varchar)");
-            sta.execute("create table a_success_table_txn (k varchar primary key, c varchar) TRANSACTIONAL=true");
-            sta.execute("create table b_failure_table_txn (k varchar primary key, c varchar) TRANSACTIONAL=true");
-            sta.execute("create table c_success_table_txn (k varchar primary key, c varchar) TRANSACTIONAL=true");
-            con.commit();
+            sta.execute("create table " + aSuccessTable + " (k varchar primary key, c varchar)" + (transactional ? (" TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + transactionProvider + "'") : ""));
+            sta.execute("create table " + bFailureTable + " (k varchar primary key, c varchar)" + (transactional ? (" TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + transactionProvider + "'") : ""));
+            sta.execute("create table " + cSuccessTable + " (k varchar primary key, c varchar)" + (transactional ? (" TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + transactionProvider + "'") : ""));
         }
-
-        try (Connection con = driver.connect(url, new Properties())) {
+    }
+    
+    private void populateTables() throws Exception {
+        try (Connection con = DriverManager.getConnection(getUrl())) {
             con.setAutoCommit(false);
             Statement sta = con.createStatement();
-            for (String table : newHashSet("a_success_table", "b_failure_table", "c_success_table", 
-            		"a_success_table_txn", "b_failure_table_txn", "c_success_table_txn")) {
-                sta.execute("upsert into " + table + " values ('z', 'z')");
-                sta.execute("upsert into " + table + " values ('zz', 'zz')");
-                sta.execute("upsert into " + table + " values ('zzz', 'zzz')");
+            List<String> tableNames = Lists.newArrayList(aSuccessTable, bFailureTable, cSuccessTable);
+            for (String tableName : tableNames) {
+                sta.execute("upsert into " + tableName + " values ('z', 'z')");
+                sta.execute("upsert into " + tableName + " values ('zz', 'zz')");
+                sta.execute("upsert into " + tableName + " values ('zzz', 'zzz')");
             }
             con.commit();
         }
     }
     
-    @AfterClass
-    public static void teardownCluster() throws Exception {
-      TEST_UTIL.shutdownMiniCluster();
-    }
-    
     @Before
-    public void resetGlobalMetrics() {
+    public void resetGlobalMetrics() throws Exception {
+        createTables();
+        populateTables();
         for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) {
             m.reset();
         }
     }
     
     @Test
-    public void testNoFailure() {
-        testPartialCommit(singletonList("upsert into " + A_SUCESS_TABLE + " values ('testNoFailure', 'a')"), new int[0], false, singletonList("select count(*) from " + A_SUCESS_TABLE + " where k='testNoFailure'"),
+    public void testNoFailure() throws SQLException {
+        testPartialCommit(singletonList("upsert into " + aSuccessTable + " values ('testNoFailure', 'a')"), new int[0], false, singletonList("select count(*) from " + aSuccessTable + " where k='testNoFailure'"),
                                         singletonList(new Integer(1)));
     }
     
     @Test
-    public void testUpsertFailure() {
-        testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testUpsertFailure1', 'a')", 
-                                       UPSERT_TO_FAIL, 
-                                       "upsert into " + A_SUCESS_TABLE + " values ('testUpsertFailure2', 'b')"), 
+    public void testUpsertFailure() throws SQLException {
+        testPartialCommit(newArrayList("upsert into " + aSuccessTable + " values ('testUpsertFailure1', 'a')", 
+                                       upsertToFail, 
+                                       "upsert into " + aSuccessTable + " values ('testUpsertFailure2', 'b')"), 
                                        transactional ? new int[] {0,1,2} : new int[]{1}, true,
-                                       newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k like 'testUpsertFailure_'",
-                                                    "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"),
+                                       newArrayList("select count(*) from " + aSuccessTable + " where k like 'testUpsertFailure_'",
+                                                    "select count(*) from " + bFailureTable + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"),
                                        transactional ? newArrayList(new Integer(0), new Integer(0)) : newArrayList(new Integer(2), new Integer(0)));
     }
     
     @Test
     public void testUpsertSelectFailure() throws SQLException {
-        try (Connection con = driver.connect(url, new Properties())) {
-            con.createStatement().execute("upsert into " + A_SUCESS_TABLE + " values ('" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "', 'boom!')");
+        try (Connection con = DriverManager.getConnection(getUrl())) {
+            con.createStatement().execute("upsert into " + aSuccessTable + " values ('" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "', 'boom!')");
             con.commit();
         }
         
-        testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testUpsertSelectFailure', 'a')", 
-                                       UPSERT_SELECT_TO_FAIL), 
+        testPartialCommit(newArrayList("upsert into " + aSuccessTable + " values ('testUpsertSelectFailure', 'a')", 
+                                       upsertSelectToFail), 
                                        transactional ? new int[] {0,1} : new int[]{1}, true, 
-                                       newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k in ('testUpsertSelectFailure', '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "')",
-                                                    "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"),
+                                       newArrayList("select count(*) from " + aSuccessTable + " where k in ('testUpsertSelectFailure', '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "')",
+                                                    "select count(*) from " + bFailureTable + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"),
                                        transactional ? newArrayList(new Integer(1) /* from commit above */, new Integer(0)) : newArrayList(new Integer(2), new Integer(0)));
     }
     
     @Test
-    public void testDeleteFailure() {
-        testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testDeleteFailure1', 'a')", 
-                                       DELETE_TO_FAIL,
-                                       "upsert into " + A_SUCESS_TABLE + " values ('testDeleteFailure2', 'b')"), 
+    public void testDeleteFailure() throws SQLException {
+        testPartialCommit(newArrayList("upsert into " + aSuccessTable + " values ('testDeleteFailure1', 'a')", 
+                                       deleteToFail,
+                                       "upsert into " + aSuccessTable + " values ('testDeleteFailure2', 'b')"), 
                                        transactional ? new int[] {0,1,2} : new int[]{1}, true, 
-                                       newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k like 'testDeleteFailure_'",
-                                                    "select count(*) from " + B_FAILURE_TABLE + " where k = 'z'"),
+                                       newArrayList("select count(*) from " + aSuccessTable + " where k like 'testDeleteFailure_'",
+                                                    "select count(*) from " + bFailureTable + " where k = 'z'"),
                                        transactional ? newArrayList(new Integer(0), new Integer(1) /* original row */) : newArrayList(new Integer(2), new Integer(1)));
     }
     
     /**
      * {@link MutationState} keeps mutations ordered lexicographically by table name.
+     * @throws SQLException 
      */
     @Test
-    public void testOrderOfMutationsIsPredicatable() {
-        testPartialCommit(newArrayList("upsert into " + C_SUCESS_TABLE + " values ('testOrderOfMutationsIsPredicatable', 'c')", // will fail because c_success_table is after b_failure_table by table sort order
-                                       UPSERT_TO_FAIL, 
-                                       "upsert into " + A_SUCESS_TABLE + " values ('testOrderOfMutationsIsPredicatable', 'a')"), // will succeed because a_success_table is before b_failure_table by table sort order
+    public void testOrderOfMutationsIsPredicatable() throws SQLException {
+        testPartialCommit(newArrayList("upsert into " + cSuccessTable + " values ('testOrderOfMutationsIsPredicatable', 'c')", // will fail because c_success_table is after b_failure_table by table sort order
+                                       upsertToFail, 
+                                       "upsert into " + aSuccessTable + " values ('testOrderOfMutationsIsPredicatable', 'a')"), // will succeed because a_success_table is before b_failure_table by table sort order
                                        transactional ? new int[] {0,1,2} : new int[]{0,1}, true, 
-                                       newArrayList("select count(*) from " + C_SUCESS_TABLE + " where k='testOrderOfMutationsIsPredicatable'",
-                                                    "select count(*) from " + A_SUCESS_TABLE + " where k='testOrderOfMutationsIsPredicatable'",
-                                                    "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"),
+                                       newArrayList("select count(*) from " + cSuccessTable + " where k='testOrderOfMutationsIsPredicatable'",
+                                                    "select count(*) from " + aSuccessTable + " where k='testOrderOfMutationsIsPredicatable'",
+                                                    "select count(*) from " + bFailureTable + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"),
                                        transactional ? newArrayList(new Integer(0), new Integer(0), new Integer(0)) : newArrayList(new Integer(0), new Integer(1), new Integer(0)));
     }
     
     @Test
-    public void testStatementOrderMaintainedInConnection() {
-        testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testStatementOrderMaintainedInConnection', 'a')", 
-                                       "upsert into " + A_SUCESS_TABLE + " select k, c from " + C_SUCESS_TABLE,
-                                       DELETE_TO_FAIL,
-                                       "select * from " + A_SUCESS_TABLE + "", 
-                                       UPSERT_TO_FAIL), 
+    public void testStatementOrderMaintainedInConnection() throws SQLException {
+        testPartialCommit(newArrayList("upsert into " + aSuccessTable + " values ('testStatementOrderMaintainedInConnection', 'a')", 
+                                       "upsert into " + aSuccessTable + " select k, c from " + cSuccessTable,
+                                       deleteToFail,
+                                       "select * from " + aSuccessTable + "", 
+                                       upsertToFail), 
                                        transactional ? new int[] {0,1,2,4} : new int[]{2,4}, true, 
-                                       newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k='testStatementOrderMaintainedInConnection' or k like 'z%'", // rows left: zz, zzz, checkThatAllStatementTypesMaintainOrderInConnection
-                                                    "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'",
-                                                    "select count(*) from " + B_FAILURE_TABLE + " where k = 'z'"),
+                                       newArrayList("select count(*) from " + aSuccessTable + " where k='testStatementOrderMaintainedInConnection' or k like 'z%'", // rows left: zz, zzz, checkThatAllStatementTypesMaintainOrderInConnection
+                                                    "select count(*) from " + bFailureTable + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'",
+                                                    "select count(*) from " + bFailureTable + " where k = 'z'"),
                                        transactional ? newArrayList(new Integer(3) /* original rows */, new Integer(0), new Integer(1) /* original row */) : newArrayList(new Integer(4), new Integer(0), new Integer(1)));
     }
     
     private void testPartialCommit(List<String> statements, int[] expectedUncommittedStatementIndexes, boolean willFail, List<String> countStatementsForVerification,
-                                   List<Integer> expectedCountsForVerification) {
+                                   List<Integer> expectedCountsForVerification) throws SQLException {
         Preconditions.checkArgument(countStatementsForVerification.size() == expectedCountsForVerification.size());
         
         try (Connection con = getConnectionWithTableOrderPreservingMutationState()) {
@@ -265,7 +248,7 @@ public class PartialCommitIT extends BaseOwnClusterIT {
                 int[] uncommittedStatementIndexes = ((CommitException)sqle).getUncommittedStatementIndexes();
                 assertArrayEquals(expectedUncommittedStatementIndexes, uncommittedStatementIndexes);
                 Map<String, Map<MetricType, Long>> mutationWriteMetrics = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(con);
-                assertEquals(expectedUncommittedStatementIndexes.length, mutationWriteMetrics.get(B_FAILURE_TABLE).get(MUTATION_BATCH_FAILED_SIZE).intValue());
+                assertEquals(expectedUncommittedStatementIndexes.length, mutationWriteMetrics.get(bFailureTable).get(MUTATION_BATCH_FAILED_SIZE).intValue());
                 assertEquals(expectedUncommittedStatementIndexes.length, GLOBAL_MUTATION_BATCH_FAILED_COUNT.getMetric().getTotalSum());
             }
             
@@ -279,22 +262,20 @@ public class PartialCommitIT extends BaseOwnClusterIT {
                 }
                 assertEquals(expectedCountsForVerification.get(i).intValue(), rs.getInt(1));
             }
-        } catch (SQLException e) {
-            fail(e.toString());
         }
     }
     
     private PhoenixConnection getConnectionWithTableOrderPreservingMutationState() throws SQLException {
-        Connection con = driver.connect(url, new Properties());
-        PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class));
-        final Map<TableRef, MultiRowMutationState> mutations = Maps.newTreeMap(new TableRefComparator());
-        // passing a null mutation state forces the connection.newMutationState() to be used to create the MutationState
-        return new PhoenixConnection(phxCon, (MutationState)null) {
-            @Override
-            protected MutationState newMutationState(int maxSize, int maxSizeBytes) {
-                return new MutationState(maxSize, maxSizeBytes, this, mutations, false, null);
+        try (PhoenixConnection con = DriverManager.getConnection(getUrl()).unwrap(PhoenixConnection.class)) {
+            final Map<TableRef, MultiRowMutationState> mutations = Maps.newTreeMap(new TableRefComparator());
+            // passing a null mutation state forces the connection.newMutationState() to be used to create the MutationState
+            return new PhoenixConnection(con, (MutationState)null) {
+                @Override
+                protected MutationState newMutationState(int maxSize, int maxSizeBytes) {
+                    return new MutationState(maxSize, maxSizeBytes, this, mutations, false, null);
+                };
             };
-        };
+        }
     }
 
     public static class FailingRegionObserver extends SimpleRegionObserver {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java
index 2211d58..0ddbed3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.rpc;
 
 import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.apache.phoenix.util.TestUtil.TRANSACTIONAL_DATA_TABLE;
 import static org.junit.Assert.assertFalse;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
@@ -43,6 +42,7 @@ import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
@@ -50,7 +50,7 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 /**
- * Verifies the number of rpcs calls from {@link MetaDataClient} updateCache() 
+ * Verifies the number of RPC calls from {@link MetaDataClient} updateCache() 
  * for transactional and non-transactional tables.
  */
 public class UpdateCacheIT extends ParallelStatsDisabledIT {
@@ -67,10 +67,15 @@ public class UpdateCacheIT extends ParallelStatsDisabledIT {
     
     @Test
     public void testUpdateCacheForTxnTable() throws Exception {
-        String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + TRANSACTIONAL_DATA_TABLE;
-        Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
-        conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + "TRANSACTIONAL=true");
-        helpTestUpdateCache(fullTableName, new int[] {1, 1});
+        for (TransactionFactory.Provider provider : TransactionFactory.Provider.values()) {
+            if (provider.runTests()) {
+                String tableName = generateUniqueName();
+                String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + tableName;
+                Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
+                conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + "TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + provider + "'");
+                helpTestUpdateCache(fullTableName, new int[] {1, 1});
+            }
+        }
     }
     
     @Test

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/StatsCollectorIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/StatsCollectorIT.java
index 41e39e3..d596328 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/StatsCollectorIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/StatsCollectorIT.java
@@ -87,27 +87,19 @@ public abstract class StatsCollectorIT extends BaseUniqueNamesOwnClusterIT {
     private final boolean mutable;
     private static final int defaultGuidePostWidth = 20;
     
-    protected StatsCollectorIT(boolean mutable, boolean transactional, boolean userTableNamespaceMapped, boolean columnEncoded) {
+    protected StatsCollectorIT(boolean mutable, String transactionProvider, boolean userTableNamespaceMapped, boolean columnEncoded) {
         StringBuilder sb = new StringBuilder();
-        if (transactional) {
-            sb.append("TRANSACTIONAL=true");
-        }
-        if (!columnEncoded) {
-            if (sb.length()>0) {
-                sb.append(",");
-            }
-            sb.append("COLUMN_ENCODED_BYTES=0");
+        if (columnEncoded) {
+            sb.append("COLUMN_ENCODED_BYTES=4");        
         } else {
-            if (sb.length()>0) {
-                sb.append(",");
-            }
-            sb.append("COLUMN_ENCODED_BYTES=4");
+            sb.append("COLUMN_ENCODED_BYTES=0");
+        }
+        
+        if (transactionProvider != null) {
+            sb.append(",TRANSACTIONAL=true, TRANSACTION_PROVIDER='" + transactionProvider + "'");
         }
         if (!mutable) {
-            if (sb.length()>0) {
-                sb.append(",");
-            }
-            sb.append("IMMUTABLE_ROWS=true");
+            sb.append(",IMMUTABLE_ROWS=true");
             if (!columnEncoded) {
                 sb.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN);
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
index 70a5713..5802c6d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.tx;
 
 import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.apache.phoenix.util.TestUtil.createTransactionalTable;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -31,6 +30,8 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.client.Get;
@@ -43,10 +44,14 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 /**
  * 
@@ -54,7 +59,21 @@ import org.junit.Test;
  * TODO: review with Tephra community
  *
  */
+@RunWith(Parameterized.class)
 public class FlappingTransactionIT extends ParallelStatsDisabledIT {
+    private final String txProvider;
+    
+    public FlappingTransactionIT(String provider) {
+        txProvider = provider;
+    }
+
+    @Parameters(name="FlappingTransactionIT_transactionProvider={0}") // name is used by failsafe as file name in reports
+    public static Collection<Object[]> data() {
+        return TestUtil.filterTxParamData(Arrays.asList(new Object[][] {
+            {"TEPHRA"}, {"OMID"}
+           }),0);
+    }
+    
     @Test
     public void testDelete() throws Exception {
         String transTableName = generateUniqueName();
@@ -63,7 +82,7 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT {
         try (Connection conn = DriverManager.getConnection(getUrl());
                 Connection conn1 = DriverManager.getConnection(getUrl()); 
                 Connection conn2 = DriverManager.getConnection(getUrl())) {
-            TestUtil.createTransactionalTable(conn, fullTableName);
+            TestUtil.createTransactionalTable(conn, fullTableName, "TRANSACTION_PROVIDER='"+txProvider+"'");
             conn1.setAutoCommit(false);
             ResultSet rs = conn1.createStatement().executeQuery(selectSQL);
             assertFalse(rs.next());
@@ -103,7 +122,7 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT {
         try (Connection conn = DriverManager.getConnection(getUrl());
                 Connection conn1 = DriverManager.getConnection(getUrl()); 
                 Connection conn2 = DriverManager.getConnection(getUrl())) {
-            createTransactionalTable(conn, fullTableName);
+            TestUtil.createTransactionalTable(conn, fullTableName, "TRANSACTION_PROVIDER='"+txProvider+"'");
             conn1.setAutoCommit(false);
             conn2.setAutoCommit(true);
             ResultSet rs = conn1.createStatement().executeQuery(selectSQL);
@@ -136,8 +155,11 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT {
             rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + fullTableName + " WHERE int_col1 = 1");
             assertTrue(rs.next());
             assertEquals(0, rs.getInt(1));
+            // this succeeds rs = conn2.createStatement().executeQuery("SELECT int_col1 FROM " + fullTableName + " WHERE int_col1 = 1");
+            // this fails rs = conn2.createStatement().executeQuery("SELECT * FROM " + fullTableName + " WHERE int_col1 = 1");
             rs = conn2.createStatement().executeQuery("SELECT * FROM " + fullTableName + " WHERE int_col1 = 1");
-            assertFalse(rs.next());
+            boolean hasNext = rs.next();
+            assertFalse(hasNext);
             
             conn1.commit();
             
@@ -158,7 +180,7 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT {
         try (Connection conn = DriverManager.getConnection(getUrl());
                 Connection conn1 = DriverManager.getConnection(getUrl()); 
                 Connection conn2 = DriverManager.getConnection(getUrl())) {
-            createTransactionalTable(conn, fullTableName);
+            TestUtil.createTransactionalTable(conn, fullTableName, "TRANSACTION_PROVIDER='"+txProvider+"'");
             conn1.setAutoCommit(false);
             conn2.setAutoCommit(true);
             ResultSet rs = conn1.createStatement().executeQuery(selectSQL);
@@ -212,7 +234,7 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT {
         PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
         
         Statement stmt = conn.createStatement();
-        stmt.execute("CREATE TABLE " + fullTableName + "(K VARCHAR PRIMARY KEY, V1 VARCHAR, V2 VARCHAR) TRANSACTIONAL=true");
+        stmt.execute("CREATE TABLE " + fullTableName + "(K VARCHAR PRIMARY KEY, V1 VARCHAR, V2 VARCHAR) TRANSACTIONAL=true,TRANSACTION_PROVIDER='"+txProvider+"'");
         Table htable = pconn.getQueryServices().getTable(Bytes.toBytes(fullTableName));
         stmt.executeUpdate("upsert into " + fullTableName + " values('x', 'a', 'a')");
         conn.commit();
@@ -223,12 +245,13 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT {
             assertEquals(1,rs.getInt(1));
         }
 
-        PhoenixTransactionContext txContext =
-              TransactionFactory.getTransactionProvider(TransactionFactory.Provider.TEPHRA).getTransactionContext(pconn);
-        Table txTable =
-                txContext.getTransactionalTable(htable, false);
-
-        txContext.begin();
+        PhoenixTransactionProvider provider = TransactionFactory.Provider.valueOf(txProvider).getTransactionProvider();
+        PhoenixTransactionContext txContext = provider.getTransactionContext(pconn);
+        // FIXME: must begin txn before getting transactional table
+        // Either set txn on all existing OmidTransactionTable or throw exception
+        // when attempting to get OmidTransactionTable if a txn is not in progress.
+        txContext.begin(); 
+        Table txTable = txContext.getTransactionalTable(htable, false);
 
         // Use HBase APIs to add a new row
         Put put = new Put(Bytes.toBytes("z"));
@@ -275,12 +298,10 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT {
         }
         
         // Repeat the same as above, but this time abort the transaction
-        txContext =
-              TransactionFactory.getTransactionProvider(TransactionFactory.Provider.TEPHRA).getTransactionContext(pconn);
-        txTable =
-              txContext.getTransactionalTable(htable, false);
-
+        txContext = provider.getTransactionContext(pconn);
         txContext.begin();
+        txTable = txContext.getTransactionalTable(htable, false);
+
         
         // Use HBase APIs to add a new row
         put = new Put(Bytes.toBytes("j"));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
index c02110b..86e6bb3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.TephraTransactionalProcessor;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
@@ -55,6 +56,9 @@ import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.TestUtil;
@@ -70,29 +74,39 @@ import com.google.common.collect.Lists;
 public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
     
     private final String tableDDLOptions;
+    private final String tableDDLOptionsWithoutProvider;
+    private final PhoenixTransactionProvider transactionProvider;
 
-    public ParameterizedTransactionIT(boolean mutable, boolean columnEncoded) {
+    public ParameterizedTransactionIT(Boolean mutable, Boolean columnEncoded, String transactionProvider) {
         StringBuilder optionBuilder = new StringBuilder();
+        optionBuilder.append("TRANSACTION_PROVIDER='"+transactionProvider+"',");
+        this.transactionProvider = TransactionFactory.Provider.valueOf(transactionProvider).getTransactionProvider();
+        StringBuilder optionBuilder2 = new StringBuilder();
         if (!columnEncoded) {
-            optionBuilder.append("COLUMN_ENCODED_BYTES=0");
+            optionBuilder2.append("COLUMN_ENCODED_BYTES=0,");
         }
         if (!mutable) {
-            if (optionBuilder.length() > 0) {
-                optionBuilder.append(",");
-            }
-            optionBuilder.append("IMMUTABLE_ROWS=true");
+            optionBuilder2.append("IMMUTABLE_ROWS=true,");
             if (!columnEncoded) {
-                optionBuilder.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN);
+                optionBuilder2.append("IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN +",");
             }
         }
+        if (optionBuilder2.length() > 0) {
+            optionBuilder2.setLength(optionBuilder2.length()-1);
+            optionBuilder.append(optionBuilder2);
+        } else {
+            optionBuilder.setLength(optionBuilder.length()-1);
+        }
         this.tableDDLOptions = optionBuilder.toString();
+        this.tableDDLOptionsWithoutProvider = optionBuilder2.toString();
     }
     
-    @Parameters(name="TransactionIT_mutable={0},columnEncoded={1}") // name is used by failsafe as file name in reports
-    public static Collection<Boolean[]> data() {
-        return Arrays.asList(new Boolean[][] {     
-                 {false, false }, {false, true }, {true, false }, { true, true },
-           });
+    @Parameters(name="ParameterizedTransactionIT_mutable={0},columnEncoded={1},transactionProvider={2}") // name is used by failsafe as file name in reports
+    public static Collection<Object[]> data() {
+        return TestUtil.filterTxParamData(Arrays.asList(new Object[][] {     
+                 {false, false, "TEPHRA" }, {false, true, "TEPHRA" }, {true, false, "TEPHRA" }, { true, true, "TEPHRA" },
+                 {false, false, "OMID" }, {true, false, "OMID" },
+           }), 2);
     }
     
     @Test
@@ -136,7 +150,8 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
         String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName;
         String selectSql = "SELECT * FROM "+fullTableName;
         try (Connection conn = DriverManager.getConnection(getUrl())) {
-            conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions + (tableDDLOptions.length() > 0 ? "," : "") + "TRANSACTIONAL=true");
+            String ddl = "create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions + (tableDDLOptions.length() > 0 ? "," : "") + "TRANSACTIONAL=true";
+            conn.createStatement().execute(ddl);
             conn.setAutoCommit(false);
             ResultSet rs = conn.createStatement().executeQuery(selectSql);
             assertFalse(rs.next());
@@ -265,7 +280,7 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
         String nonTxTableName = generateUniqueName();
 
         Connection conn = DriverManager.getConnection(getUrl());
-        conn.createStatement().execute("CREATE TABLE " + nonTxTableName + "(k INTEGER PRIMARY KEY, v VARCHAR)" + tableDDLOptions);
+        conn.createStatement().execute("CREATE TABLE " + nonTxTableName + "(k INTEGER PRIMARY KEY, v VARCHAR)" + tableDDLOptionsWithoutProvider);
         conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (1)");
         conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (2, 'a')");
         conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (3, 'b')");
@@ -281,7 +296,19 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
         }
         htable.put(puts);
         
-        conn.createStatement().execute("ALTER TABLE " + nonTxTableName + " SET TRANSACTIONAL=true");
+        try {
+            conn.createStatement().execute("ALTER TABLE " + nonTxTableName + " SET TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + transactionProvider + "'");
+            if (transactionProvider.isUnsupported(Feature.ALTER_NONTX_TO_TX)) {
+                fail();
+            }
+        } catch (SQLException e) {
+            if (transactionProvider.isUnsupported(Feature.ALTER_NONTX_TO_TX)) {
+                assertEquals(SQLExceptionCode.CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL.getErrorCode(), e.getErrorCode());
+                return;
+            } else {
+                throw e;
+            }
+        }
         
         htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes( nonTxTableName));
         assertTrue(htable.getDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
@@ -330,7 +357,7 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
 
         Connection conn = DriverManager.getConnection(getUrl());
         // Put table in SYSTEM schema to prevent attempts to update the cache after we disable SYSTEM.CATALOG
-        conn.createStatement().execute("CREATE TABLE \"SYSTEM\"." + nonTxTableName + "(k INTEGER PRIMARY KEY, v VARCHAR)" + tableDDLOptions);
+        conn.createStatement().execute("CREATE TABLE \"SYSTEM\"." + nonTxTableName + "(k INTEGER PRIMARY KEY, v VARCHAR)" + tableDDLOptionsWithoutProvider);
         conn.createStatement().execute("UPSERT INTO \"SYSTEM\"." + nonTxTableName + " VALUES (1)");
         conn.commit();
         // Reset empty column value to an empty value like it is pre-transactions
@@ -345,10 +372,14 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
             // This will succeed initially in updating the HBase metadata, but then will fail when
             // the SYSTEM.CATALOG table is attempted to be updated, exercising the code to restore
             // the coprocessors back to the non transactional ones.
-            conn.createStatement().execute("ALTER TABLE \"SYSTEM\"." + nonTxTableName + " SET TRANSACTIONAL=true");
+            conn.createStatement().execute("ALTER TABLE \"SYSTEM\"." + nonTxTableName + " SET TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + transactionProvider + "'");
             fail();
         } catch (SQLException e) {
-            assertTrue(e.getMessage().contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " is disabled"));
+            if (transactionProvider.isUnsupported(Feature.ALTER_NONTX_TO_TX)) {
+                assertEquals(SQLExceptionCode.CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL.getErrorCode(), e.getErrorCode());
+            } else {
+                assertTrue(e.getMessage().contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " is disabled"));
+            }
         } finally {
             admin.enableTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME));
             admin.close();
@@ -360,7 +391,8 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
         assertFalse(rs.next());
         
         htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM." + nonTxTableName));
-        assertFalse(htable.getDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
+        Class<? extends RegionObserver> clazz = transactionProvider.getCoprocessor();
+        assertFalse(htable.getDescriptor().getCoprocessors().contains(clazz.getName()));
         assertEquals(1,conn.unwrap(PhoenixConnection.class).getQueryServices().
                 getTableDescriptor(Bytes.toBytes("SYSTEM." + nonTxTableName)).
                 getColumnFamily(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES).getMaxVersions());
@@ -378,7 +410,8 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
         PTable table = pconn.getTable(new PTableKey(null, t1));
         Table htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1));
         assertTrue(table.isTransactional());
-        assertTrue(htable.getDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
+        Class<? extends RegionObserver> clazz = transactionProvider.getCoprocessor();
+        assertTrue(htable.getDescriptor().getCoprocessors().contains(clazz.getName()));
         
         try {
             ddl = "ALTER TABLE " + t1 + " SET transactional=false";
@@ -389,11 +422,21 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
         }
 
         Admin admin = pconn.getQueryServices().getAdmin();
-        
         admin.createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf(t2))
                 .addColumnFamily(ColumnFamilyDescriptorBuilder.of(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES)).build());
-        ddl = "CREATE TABLE " + t2 + " (k varchar primary key) transactional=true";
-        conn.createStatement().execute(ddl);
+        try {
+            ddl = "CREATE TABLE " + t2 + " (k varchar primary key) transactional=true,transaction_provider='" + transactionProvider + "'";
+            conn.createStatement().execute(ddl);
+            if (transactionProvider.isUnsupported(Feature.ALTER_NONTX_TO_TX)) {
+                fail();
+            }
+        } catch (SQLException e) {
+            if (transactionProvider.isUnsupported(Feature.ALTER_NONTX_TO_TX)) {
+                assertEquals(SQLExceptionCode.CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL.getErrorCode(), e.getErrorCode());
+                return;
+            }
+            throw e;
+        }
 
         TableDescriptor tableDescriptor = admin.getDescriptor(TableName.valueOf(t2));
         String str = tableDescriptor.getValue(PhoenixTransactionContext.READ_NON_TX_DATA);
@@ -407,12 +450,12 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
         } catch (SQLException e) {
             assertEquals(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX.getErrorCode(), e.getErrorCode());
         }
-        ddl += " transactional=true";
+        ddl += " transactional=true,transaction_provider='" + transactionProvider + "'";
         conn.createStatement().execute(ddl);
         table = pconn.getTable(new PTableKey(null, t1));
         htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1));
         assertTrue(table.isTransactional());
-        assertTrue(htable.getDescriptor().getCoprocessors().contains(TephraTransactionalProcessor.class.getName()));
+        assertTrue(htable.getDescriptor().getCoprocessors().contains(clazz.getName()));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index 2cf15d2..00a6e2f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -35,13 +35,23 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Properties;
 
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -52,6 +62,8 @@ import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -64,6 +76,8 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
+import com.google.common.collect.Lists;
+
 @RunWith(Parameterized.class)
 public class TransactionIT  extends ParallelStatsDisabledIT {
     private final String txProvider;
@@ -75,12 +89,117 @@ public class TransactionIT  extends ParallelStatsDisabledIT {
     }
 
     @Parameters(name="TransactionIT_provider={0}") // name is used by failsafe as file name in reports
-    public static Collection<String[]> data() {
-        return Arrays.asList(new String[][] {     
-                 {"TEPHRA"/*,"OMID"*/}});
+    public static Collection<Object[]> data() {
+        return TestUtil.filterTxParamData(Arrays.asList(new Object[][] { 
+                 {"TEPHRA"},{"OMID"}}),0);
+    }
+    
+    @Test
+    public void testFailureToRollbackAfterDelete() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String tableName = generateUniqueName();
+            conn.createStatement().execute(
+                    "CREATE TABLE " + tableName + " (k VARCHAR NOT NULL PRIMARY KEY) TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + txProvider + "'");
+            conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a')");
+            conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('b')");
+            conn.commit();
+            // Delete a in new transaction
+            conn.createStatement().execute("DELETE FROM " + tableName + " WHERE k='a'");
+            // Forces data to be written to HBase, but not yet committed
+            conn.createStatement().executeQuery("SELECT * FROM " + tableName).next();
+            // Upsert another row so commit below will fail the write (and fail subsequent attempt to abort)
+            conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('c')");
+            TestUtil.addCoprocessor(conn, tableName, WriteFailingRegionObserver.class);
+            try {
+                conn.commit();
+                fail();
+            } catch (SQLException e) {
+            }
+            // Delete of a shouldn't be visible since commit failed, so all rows should be present
+            ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("b", rs.getString(1));
+            assertFalse(rs.next());
+        }
+    }
+    
+    public static class WriteFailingRegionObserver extends SimpleRegionObserver {
+        @Override
+        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
+            throw new DoNotRetryIOException();
+        }
+    }
+    
+    @Test
+    public void testWithMixOfTxProviders() throws Exception {
+        // No sense in running the test with every provider, so just run it with the default one
+        if (!TransactionFactory.Provider.valueOf(txProvider).equals(TransactionFactory.Provider.getDefault())) {
+            return;
+        }
+        List<String> tableNames = Lists.newArrayList();
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            for (TransactionFactory.Provider provider : TransactionFactory.Provider.values()) {
+                if (provider.runTests()) {
+                    String tableName = generateUniqueName();
+                    tableNames.add(tableName);
+                    conn.createStatement().execute(
+                            "CREATE TABLE " + tableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR) TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + provider + "'");
+                }
+            }
+            if (tableNames.size() < 2) {
+                return;
+            }
+            Iterator<String> iterator = tableNames.iterator();
+            String tableName1 = iterator.next();
+            conn.createStatement().execute("UPSERT INTO " + tableName1 + " VALUES('a')");
+            String tableName2 = iterator.next();
+            try {
+                conn.createStatement().execute("UPSERT INTO " + tableName2 + " VALUES('a')");
+                fail();
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.CANNOT_MIX_TXN_PROVIDERS.getErrorCode(), e.getErrorCode());
+            }
+            
+            conn.rollback();
+            conn.setAutoCommit(true);
+            for (String tableName : tableNames) {
+                conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a')");
+            }
+            for (String tableName : tableNames) {
+                ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName);
+                assertTrue(rs.next());
+                assertEquals("a", rs.getString(1));
+                assertFalse(rs.next());
+            }
+        }
     }
     
     @Test
+    public void testPreventLocalIndexCreation() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            for (TransactionFactory.Provider provider : TransactionFactory.Provider.values()) {
+                if (provider.runTests() && provider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.ALLOW_LOCAL_INDEX)) {
+                    String tableName = generateUniqueName();
+                    conn.createStatement().execute(
+                            "CREATE TABLE " + tableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR) TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + provider + "'");
+                    String indexName = generateUniqueName();
+                    try {
+                        conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + "_IDX ON " + tableName + " (v1) INCLUDE(v2)");
+                        fail();
+                    } catch (SQLException e) {
+                        assertEquals(SQLExceptionCode.CANNOT_CREATE_LOCAL_INDEX_FOR_TXN_TABLE.getErrorCode(), e.getErrorCode());
+                    }
+                }
+            }
+        }
+    }
+
+    @Test
     public void testQueryWithSCN() throws Exception {
         String tableName = generateUniqueName();
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -102,7 +221,7 @@ public class TransactionIT  extends ParallelStatsDisabledIT {
     }
 
     @Test
-    public void testReCreateTxnTableAfterDroppingExistingNonTxnTable() throws SQLException {
+    public void testReCreateTxnTableAfterDroppingExistingNonTxnTable() throws Exception {
         String tableName = generateUniqueName();
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = DriverManager.getConnection(getUrl(), props);
@@ -110,6 +229,10 @@ public class TransactionIT  extends ParallelStatsDisabledIT {
         Statement stmt = conn.createStatement();
         stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
         stmt.execute("DROP TABLE " + tableName);
+        // Must drop metadata as Omid does not allow creating a transactional table from a non transactional one
+        Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+        admin.disableTable(TableName.valueOf(tableName));
+        admin.deleteTable(TableName.valueOf(tableName));
         stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) TRANSACTIONAL=true," + tableDDLOptions);
         stmt.execute("CREATE INDEX " + tableName + "_IDX ON " + tableName + " (v1) INCLUDE(v2)");
         assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, tableName)).isTransactional());
@@ -210,11 +333,11 @@ public class TransactionIT  extends ParallelStatsDisabledIT {
 
         try {
             conn.createStatement().execute("ALTER TABLE " + nonTxTableName + "1 SET TRANSACTIONAL=true," + tableDDLOptions);
-            if (TransactionFactory.Provider.OMID.name().equals(txProvider)) {
-                fail("Omid shouldn't allow converting a non transactional table to be transactional");
+            if (TransactionFactory.Provider.valueOf(txProvider).getTransactionProvider().isUnsupported(Feature.ALTER_NONTX_TO_TX)) {
+                fail(txProvider + " should not allow converting a non transactional table to be transactional");
             }
         } catch (SQLException e) { // Should fail for Omid, but not Tephra
-            if (TransactionFactory.Provider.TEPHRA.name().equals(txProvider)) {
+            if (!TransactionFactory.Provider.valueOf(txProvider).getTransactionProvider().isUnsupported(Feature.ALTER_NONTX_TO_TX)) {
                 throw e;
             }
             assertEquals(SQLExceptionCode.CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL.getErrorCode(), e.getErrorCode());
@@ -247,7 +370,7 @@ public class TransactionIT  extends ParallelStatsDisabledIT {
         }
         
         conn.createStatement().execute("CREATE TABLE " + nonTxTableName + "2(k INTEGER PRIMARY KEY, a.v VARCHAR, b.v VARCHAR, c.v VARCHAR)");
-        conn.createStatement().execute("ALTER TABLE " + nonTxTableName + "2 SET TRANSACTIONAL=true, VERSIONS=10");
+        conn.createStatement().execute("ALTER TABLE " + nonTxTableName + "2 SET TRANSACTIONAL=true, VERSIONS=10, " + tableDDLOptions);
         desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes( nonTxTableName + "2"));
         for (ColumnFamilyDescriptor colDesc : desc.getColumnFamilies()) {
             assertEquals(10, colDesc.getMaxVersions());
@@ -388,16 +511,39 @@ public class TransactionIT  extends ParallelStatsDisabledIT {
     public void testSetTTL() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         TransactionFactory.Provider txProvider = TransactionFactory.Provider.valueOf(this.txProvider);
-        try (Connection conn = DriverManager.getConnection(getUrl(), props); Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+        try (Connection conn = DriverManager.getConnection(getUrl(), props); 
+             Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
             String tableName = generateUniqueName();
-            conn.createStatement().execute("CREATE TABLE " + tableName + 
-                    "(K VARCHAR PRIMARY KEY) TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + txProvider + "',TTL=100");
-            assertTTL(admin, tableName, 100);
+            try {
+                conn.createStatement().execute("CREATE TABLE " + tableName + 
+                        "(K VARCHAR PRIMARY KEY) TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + txProvider + "',TTL=100");
+                if (txProvider.getTransactionProvider().isUnsupported(Feature.SET_TTL)) {
+                    fail();
+                }
+                assertTTL(admin, tableName, 100);
+            } catch (SQLException e) {
+                if (txProvider.getTransactionProvider().isUnsupported(Feature.SET_TTL)) {
+                    assertEquals(SQLExceptionCode.TTL_UNSUPPORTED_FOR_TXN_TABLE.getErrorCode(), e.getErrorCode());
+                } else {
+                    throw e;
+                }
+            }
             tableName = generateUniqueName();
             conn.createStatement().execute("CREATE TABLE " + tableName + 
                     "(K VARCHAR PRIMARY KEY) TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + txProvider + "'");
-            conn.createStatement().execute("ALTER TABLE " + tableName + " SET TTL=" + 200);
-            assertTTL(admin, tableName, 200);
+            try {
+                conn.createStatement().execute("ALTER TABLE " + tableName + " SET TTL=" + 200);
+                if (txProvider.getTransactionProvider().isUnsupported(Feature.SET_TTL)) {
+                    fail();
+                }
+                assertTTL(admin, tableName, 200);
+            } catch (SQLException e) {
+                if (txProvider.getTransactionProvider().isUnsupported(Feature.SET_TTL)) {
+                    assertEquals(SQLExceptionCode.TTL_UNSUPPORTED_FOR_TXN_TABLE.getErrorCode(), e.getErrorCode());
+                } else {
+                    throw e;
+                }
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
index 42e2102..f8946e5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
@@ -38,9 +38,9 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
-import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -52,18 +52,15 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
 	private final boolean localIndex;
 	private final String tableDDLOptions;
 
-	public TxCheckpointIT(boolean localIndex, boolean mutable, boolean columnEncoded) {
+	public TxCheckpointIT(boolean localIndex, boolean mutable, boolean columnEncoded, String transactionProvider) {
 	    StringBuilder optionBuilder = new StringBuilder();
+        optionBuilder.append("TRANSACTION_PROVIDER='" + transactionProvider + "'");
 	    this.localIndex = localIndex;
 	    if (!columnEncoded) {
-	        if (optionBuilder.length()!=0)
-	            optionBuilder.append(",");
-	        optionBuilder.append("COLUMN_ENCODED_BYTES=0");
+	        optionBuilder.append(",COLUMN_ENCODED_BYTES=0");
 	    }
 	    if (!mutable) {
-	        if (optionBuilder.length()!=0)
-	            optionBuilder.append(",");
-	        optionBuilder.append("IMMUTABLE_ROWS=true");
+	        optionBuilder.append(",IMMUTABLE_ROWS=true");
 	        if (!columnEncoded) {
 	            optionBuilder.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN);
 	        }
@@ -81,12 +78,13 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
         return conn;
     }
 	
-	@Parameters(name="TxCheckpointIT_localIndex={0},mutable={1},columnEncoded={2}") // name is used by failsafe as file name in reports
-    public static Collection<Boolean[]> data() {
-        return Arrays.asList(new Boolean[][] {     
-                { false, false, false }, { false, false, true }, { false, true, false }, { false, true, true },
-                { true, false, false }, { true, false, true }, { true, true, false }, { true, true, true }
-           });
+	@Parameters(name="TxCheckpointIT_localIndex={0},mutable={1},columnEncoded={2},transactionProvider={3}") // name is used by failsafe as file name in reports
+    public static Collection<Object[]> data() {
+        return TestUtil.filterTxParamData(Arrays.asList(new Object[][] {     
+                { false, false, false, "TEPHRA" }, { false, false, true, "TEPHRA" }, { false, true, false, "TEPHRA" }, { false, true, true, "TEPHRA" },
+                { true, false, false, "TEPHRA" }, { true, false, true, "TEPHRA" }, { true, true, false, "TEPHRA" }, { true, true, true, "TEPHRA" },
+                { false, false, false, "OMID" }, { false, true, false, "OMID" }, 
+           }),3);
     }
     
     @Test
@@ -133,7 +131,6 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
     }
     
     private void testRollbackOfUncommittedDelete(String indexDDL, String fullTableName) throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = getConnection();
         conn.setAutoCommit(false);
         try {
@@ -179,7 +176,15 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
             assertEquals("a2", rs.getString(3));
             assertFalse(rs.next());
             
-            //assert row is delete in index table
+            // assert row is deleted in data table
+            rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName + " ORDER BY k");
+            assertTrue(rs.next());
+            assertEquals("x2", rs.getString(1));
+            assertEquals("y2", rs.getString(2));
+            assertEquals("a2", rs.getString(3));
+            assertFalse(rs.next());
+            
+            // assert row is deleted in index table
             rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName + " ORDER BY v1");
             assertTrue(rs.next());
             assertEquals("x2", rs.getString(1));
@@ -189,7 +194,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
             
             conn.rollback();
             
-            //assert two rows in data table
+            // assert two rows in data table
             rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName + " ORDER BY k");
             assertTrue(rs.next());
             assertEquals("x1", rs.getString(1));
@@ -201,7 +206,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
             assertEquals("a2", rs.getString(3));
             assertFalse(rs.next());
             
-            //assert two rows in index table
+            // assert two rows in index table
             rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName + " ORDER BY v1");
             assertTrue(rs.next());
             assertEquals("x1", rs.getString(1)); // fails here
@@ -266,7 +271,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
 		ResultSet rs;
 		MutationState state = conn.unwrap(PhoenixConnection.class)
 				.getMutationState();
-		state.startTransaction(TransactionFactory.Provider.TEPHRA);
+		conn.createStatement().executeQuery("select 1 from " + fullTableName + " LIMIT 1").next();
 		long wp = state.getWritePointer();
 		conn.createStatement().execute(
 				"upsert into " + fullTableName + " select max(id)+1, 'a4', 'b4' from " + fullTableName + "");
@@ -330,7 +335,8 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
 			conn.commit();
 
 	        MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
-	        state.startTransaction(TransactionFactory.Provider.TEPHRA);
+	        // Start a new transaction
+	        stmt.executeQuery("select 1 from " + fullTableName + "1 LIMIT 1").next();
 	        long wp = state.getWritePointer();
 	        conn.createStatement().execute("delete from " + fullTableName + "1 where id1=fk1b AND fk1b=id1");
 	        assertEquals(PhoenixVisibilityLevel.SNAPSHOT, state.getVisibilityLevel());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index f4e7bac..14ec45d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -87,6 +87,7 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
@@ -975,7 +976,9 @@ public class DeleteCompiler {
     private static boolean isMaintainedOnClient(PTable table) {
         // Test for not being local (rather than being GLOBAL) so that this doesn't fail
         // when tested with our projected table.
-        return table.getIndexType() != IndexType.LOCAL && (table.isImmutableRows() || table.isTransactional());
+        return (table.getIndexType() != IndexType.LOCAL && (table.isTransactional() || table.isImmutableRows())) ||
+               (table.getIndexType() == IndexType.LOCAL && (table.isTransactional() &&
+                table.getTransactionProvider().getTransactionProvider().isUnsupported(Feature.MAINTAIN_LOCAL_INDEX_ON_SERVER) ) );
     }
     
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
index f34c5a3..ba6d10c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
@@ -89,6 +89,9 @@ public class PostLocalIndexDDLCompiler {
                     scan.addColumn(columnRef.getFamily(), columnRef.getQualifier());
                 }
             }
+            if (dataTable.isTransactional()) {
+                scan.setAttribute(BaseScannerRegionObserver.TX_STATE, connection.getMutationState().encodeTransaction());
+            }
 
             // Go through MutationPlan abstraction so that we can create local indexes
             // with a connectionless connection (which makes testing easier).


Mime
View raw message