phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tdsi...@apache.org
Subject [3/3] phoenix git commit: Connect TAL to Phoenix
Date Mon, 26 Jun 2017 19:10:08 GMT
Connect TAL to Phoenix


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/211f1ebc
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/211f1ebc
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/211f1ebc

Branch: refs/heads/4.x-HBase-0.98
Commit: 211f1ebc737e48e98441c64e4a21b27c040293ad
Parents: 2423057
Author: Ohad Shacham <ohads@yahoo-inc.com>
Authored: Sun Jun 25 11:46:48 2017 +0300
Committer: Thomas <tdsilva@salesforce.com>
Committed: Mon Jun 26 12:08:44 2017 -0700

----------------------------------------------------------------------
 .../apache/phoenix/execute/PartialCommitIT.java |   5 +-
 .../phoenix/tx/FlappingTransactionIT.java       |  41 +-
 .../phoenix/tx/ParameterizedTransactionIT.java  |   7 +-
 .../org/apache/phoenix/tx/TransactionIT.java    |  21 +-
 .../org/apache/phoenix/tx/TxCheckpointIT.java   |  63 +--
 .../phoenix/cache/IndexMetaDataCache.java       |   7 +-
 .../coprocessor/BaseScannerRegionObserver.java  |   1 +
 .../PhoenixTransactionalProcessor.java          |   4 +-
 .../UngroupedAggregateRegionObserver.java       |   6 +-
 .../apache/phoenix/execute/MutationState.java   | 379 +++++---------
 .../apache/phoenix/index/IndexMaintainer.java   |   4 +-
 .../index/IndexMetaDataCacheFactory.java        |  15 +-
 .../phoenix/index/PhoenixIndexMetaData.java     |  14 +-
 .../index/PhoenixTransactionalIndexer.java      |  37 +-
 .../NonAggregateRegionScannerFactory.java       |   5 +-
 .../phoenix/iterate/RegionScannerFactory.java   |   5 +-
 .../apache/phoenix/jdbc/PhoenixConnection.java  |   4 +-
 .../phoenix/query/ConnectionQueryServices.java  |   2 -
 .../query/ConnectionQueryServicesImpl.java      |  62 +--
 .../query/ConnectionlessQueryServicesImpl.java  |  15 +-
 .../query/DelegateConnectionQueryServices.java  |   6 -
 .../apache/phoenix/schema/MetaDataClient.java   |   4 +-
 .../org/apache/phoenix/schema/PTableImpl.java   |   6 +-
 .../transaction/OmidTransactionContext.java     | 174 +++++++
 .../transaction/OmidTransactionTable.java       | 318 ++++++++++++
 .../transaction/PhoenixTransactionContext.java  | 191 +++++++
 .../transaction/PhoenixTransactionalTable.java  | 149 ++++++
 .../transaction/TephraTransactionContext.java   | 516 +++++++++++++++++++
 .../transaction/TephraTransactionTable.java     | 312 +++++++++++
 .../phoenix/transaction/TransactionFactory.java | 143 +++++
 .../java/org/apache/phoenix/util/IndexUtil.java |   4 +-
 .../org/apache/phoenix/util/PhoenixRuntime.java |   4 +-
 .../apache/phoenix/util/TransactionUtil.java    |  31 +-
 .../java/org/apache/phoenix/query/BaseTest.java |  68 +--
 34 files changed, 2099 insertions(+), 524 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/211f1ebc/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index cd0c371..52a6627 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -270,12 +270,13 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         // passing a null mutation state forces the connection.newMutationState() to be used to create the MutationState
         return new PhoenixConnection(phxCon, null) {
             @Override
+
             protected MutationState newMutationState(int maxSize, int maxSizeBytes) {
-                return new MutationState(maxSize, maxSizeBytes, this, mutations, null, null);
+                return new MutationState(maxSize, maxSizeBytes, this, mutations, false, null);
             };
         };
     }
-    
+
     public static class FailingRegionObserver extends SimpleRegionObserver {
         @Override
         public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/211f1ebc/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
index fd198f4..084ec6d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
@@ -42,12 +42,11 @@ import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.PhoenixTransactionalTable;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.TestUtil;
-import org.apache.tephra.TransactionContext;
-import org.apache.tephra.TransactionSystemClient;
-import org.apache.tephra.TxConstants;
-import org.apache.tephra.hbase.TransactionAwareHTable;
 import org.junit.Test;
 
 /**
@@ -213,8 +212,6 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT {
         String fullTableName = generateUniqueName();
         PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
         
-        TransactionSystemClient txServiceClient = pconn.getQueryServices().getTransactionSystemClient();
-
         Statement stmt = conn.createStatement();
         stmt.execute("CREATE TABLE " + fullTableName + "(K VARCHAR PRIMARY KEY, V1 VARCHAR, V2 VARCHAR) TRANSACTIONAL=true");
         HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes(fullTableName));
@@ -227,16 +224,18 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT {
             assertEquals(1,rs.getInt(1));
         }
 
-        // Use HBase level Tephra APIs to start a new transaction
-        TransactionAwareHTable txAware = new TransactionAwareHTable(htable, TxConstants.ConflictDetection.ROW);
-        TransactionContext txContext = new TransactionContext(txServiceClient, txAware);
-        txContext.start();
-        
+        PhoenixTransactionContext txContext =
+              TransactionFactory.getTransactionFactory().getTransactionContext(pconn);
+        PhoenixTransactionalTable txTable =
+              TransactionFactory.getTransactionFactory().getTransactionalTable(txContext, htable);
+
+        txContext.begin();
+
         // Use HBase APIs to add a new row
         Put put = new Put(Bytes.toBytes("z"));
         put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
         put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("b"));
-        txAware.put(put);
+        txTable.put(put);
         
         // Use Phoenix APIs to add new row (sharing the transaction context)
         pconn.setTransactionContext(txContext);
@@ -258,8 +257,8 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT {
         assertTrue(rs.next());
         assertEquals(3,rs.getInt(1));
         
-        // Use Tephra APIs directly to finish (i.e. commit) the transaction
-        txContext.finish();
+        // Use TM APIs directly to finish (i.e. commit) the transaction
+        txContext.commit();
         
         // Confirm that attempt to commit row with conflict fails
         try {
@@ -279,14 +278,18 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT {
         }
         
         // Repeat the same as above, but this time abort the transaction
-        txContext = new TransactionContext(txServiceClient, txAware);
-        txContext.start();
+        txContext =
+              TransactionFactory.getTransactionFactory().getTransactionContext(pconn);
+        txTable =
+              TransactionFactory.getTransactionFactory().getTransactionalTable(txContext, htable);
+
+        txContext.begin();
         
         // Use HBase APIs to add a new row
         put = new Put(Bytes.toBytes("j"));
         put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
         put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("e"));
-        txAware.put(put);
+        txTable.put(put);
         
         // Use Phoenix APIs to add new row (sharing the transaction context)
         pconn.setTransactionContext(txContext);
@@ -302,7 +305,7 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT {
         assertTrue(rs.next());
         assertEquals(4,rs.getInt(1));
 
-        // Use Tephra APIs directly to abort (i.e. rollback) the transaction
+        // Use TM APIs directly to abort (i.e. rollback) the transaction
         txContext.abort();
         
         rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName);
@@ -325,4 +328,4 @@ public class FlappingTransactionIT extends ParallelStatsDisabledIT {
         assertTrue(result.isEmpty());
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/211f1ebc/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
index a5c1cf4..fecfd9a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/ParameterizedTransactionIT.java
@@ -53,10 +53,10 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.TestUtil;
-import org.apache.tephra.TxConstants;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -391,7 +391,10 @@ public class ParameterizedTransactionIT extends ParallelStatsDisabledIT {
         admin.createTable(desc);
         ddl = "CREATE TABLE " + t2 + " (k varchar primary key) transactional=true";
         conn.createStatement().execute(ddl);
-        assertEquals(Boolean.TRUE.toString(), admin.getTableDescriptor(TableName.valueOf(t2)).getValue(TxConstants.READ_NON_TX_DATA));
+
+        HTableDescriptor htableDescriptor = admin.getTableDescriptor(TableName.valueOf(t2));
+        String str = htableDescriptor.getValue(PhoenixTransactionContext.READ_NON_TX_DATA);
+        assertEquals(Boolean.TRUE.toString(), str);
         
         // Should be ok, as HBase metadata should match existing metadata.
         ddl = "CREATE TABLE IF NOT EXISTS " + t1 + " (k varchar primary key)"; 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/211f1ebc/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index f37d09b..c76e19c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -43,10 +43,10 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.StringUtil;
 import org.apache.phoenix.util.TestUtil;
-import org.apache.tephra.TxConstants;
 import org.junit.Test;
 
 public class TransactionIT  extends ParallelStatsDisabledIT {
@@ -147,21 +147,24 @@ public class TransactionIT  extends ParallelStatsDisabledIT {
         for (HColumnDescriptor colDesc : desc.getFamilies()) {
             assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions());
             assertEquals(1000, colDesc.getTimeToLive());
-            assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
+            String propertyTTL = colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL);
+            assertEquals(1000, Integer.parseInt(propertyTTL));
         }
 
         desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("IDX1"));
         for (HColumnDescriptor colDesc : desc.getFamilies()) {
             assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions());
             assertEquals(1000, colDesc.getTimeToLive());
-            assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
+            String propertyTTL = colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL);
+            assertEquals(1000, Integer.parseInt(propertyTTL));
         }
         
         desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("IDX2"));
         for (HColumnDescriptor colDesc : desc.getFamilies()) {
             assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions());
             assertEquals(1000, colDesc.getTimeToLive());
-            assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
+            String propertyTTL = colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL);
+            assertEquals(1000, Integer.parseInt(propertyTTL));
         }
         
         conn.createStatement().execute("CREATE TABLE " + nonTxTableName + "2(k INTEGER PRIMARY KEY, a.v VARCHAR, b.v VARCHAR, c.v VARCHAR)");
@@ -170,14 +173,15 @@ public class TransactionIT  extends ParallelStatsDisabledIT {
         for (HColumnDescriptor colDesc : desc.getFamilies()) {
             assertEquals(10, colDesc.getMaxVersions());
             assertEquals(HColumnDescriptor.DEFAULT_TTL, colDesc.getTimeToLive());
-            assertEquals(null, colDesc.getValue(TxConstants.PROPERTY_TTL));
+            assertEquals(null, colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL));
         }
         conn.createStatement().execute("ALTER TABLE " + nonTxTableName + "2 SET TTL=1000");
         desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes( nonTxTableName + "2"));
         for (HColumnDescriptor colDesc : desc.getFamilies()) {
             assertEquals(10, colDesc.getMaxVersions());
             assertEquals(1000, colDesc.getTimeToLive());
-            assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
+            String propertyTTL = colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL);
+            assertEquals(1000, Integer.parseInt(propertyTTL));
         }
 
         conn.createStatement().execute("CREATE TABLE " + nonTxTableName + "3(k INTEGER PRIMARY KEY, a.v VARCHAR, b.v VARCHAR, c.v VARCHAR)");
@@ -207,7 +211,8 @@ public class TransactionIT  extends ParallelStatsDisabledIT {
         for (HColumnDescriptor colDesc : desc.getFamilies()) {
             assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions());
             assertEquals(HColumnDescriptor.DEFAULT_TTL, colDesc.getTimeToLive());
-            assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
+            String propertyTTL = colDesc.getValue(PhoenixTransactionContext.PROPERTY_TTL);
+            assertEquals(1000, Integer.parseInt(propertyTTL));
         }
     }
     
@@ -291,4 +296,4 @@ public class TransactionIT  extends ParallelStatsDisabledIT {
             conn.close();
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/211f1ebc/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
index cb3b4b3..989a97e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
@@ -37,9 +37,9 @@ import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
-import org.apache.tephra.Transaction.VisibilityLevel;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -53,21 +53,21 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
 
 	public TxCheckpointIT(boolean localIndex, boolean mutable, boolean columnEncoded) {
 	    StringBuilder optionBuilder = new StringBuilder();
-		this.localIndex = localIndex;
-		if (!columnEncoded) {
-            if (optionBuilder.length()!=0)
-                optionBuilder.append(",");
-            optionBuilder.append("COLUMN_ENCODED_BYTES=0");
-        }
-        if (!mutable) {
-            if (optionBuilder.length()!=0)
-                optionBuilder.append(",");
-            optionBuilder.append("IMMUTABLE_ROWS=true");
-            if (!columnEncoded) {
-                optionBuilder.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN);
-            }
-        }
-        this.tableDDLOptions = optionBuilder.toString();
+	    this.localIndex = localIndex;
+	    if (!columnEncoded) {
+	        if (optionBuilder.length()!=0)
+	            optionBuilder.append(",");
+	        optionBuilder.append("COLUMN_ENCODED_BYTES=0");
+	    }
+	    if (!mutable) {
+	        if (optionBuilder.length()!=0)
+	            optionBuilder.append(",");
+	        optionBuilder.append("IMMUTABLE_ROWS=true");
+	        if (!columnEncoded) {
+	            optionBuilder.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN);
+	        }
+	    }
+	    this.tableDDLOptions = optionBuilder.toString();
 	}
 	
     private static Connection getConnection() throws SQLException {
@@ -83,8 +83,8 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
 	@Parameters(name="TxCheckpointIT_localIndex={0},mutable={1},columnEncoded={2}") // name is used by failsafe as file name in reports
     public static Collection<Boolean[]> data() {
         return Arrays.asList(new Boolean[][] {     
-                 { false, false, false }, { false, false, true }, { false, true, false }, { false, true, true },
-                 { true, false, false }, { true, false, true }, { true, true, false }, { true, true, true }
+                { false, false, false }, { false, false, true }, { false, true, false }, { false, true, true },
+                { true, false, false }, { true, false, true }, { true, true, false }, { true, true, true }
            });
     }
     
@@ -101,7 +101,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
         Connection conn = getConnection(props);
         conn.setAutoCommit(true);
         conn.createStatement().execute("CREATE SEQUENCE "+seqName);
-        conn.createStatement().execute("CREATE TABLE " + fullTableName + "(pk INTEGER PRIMARY KEY, val INTEGER)" + tableDDLOptions);
+        conn.createStatement().execute("CREATE TABLE " + fullTableName + "(pk INTEGER PRIMARY KEY, val INTEGER)"+tableDDLOptions);
         conn.createStatement().execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + indexName + " ON " + fullTableName + "(val)");
 
         conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES (NEXT VALUE FOR " + seqName + ",1)");
@@ -132,11 +132,12 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
     }
     
     private void testRollbackOfUncommittedDelete(String indexDDL, String fullTableName) throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = getConnection();
         conn.setAutoCommit(false);
         try {
             Statement stmt = conn.createStatement();
-            stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions);
+            stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+tableDDLOptions);
             stmt.execute(indexDDL);
             
             stmt.executeUpdate("upsert into " + fullTableName + " values('x1', 'y1', 'a1')");
@@ -220,11 +221,13 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
         String tableName = "TBL_" + generateUniqueName();
         String indexName = "IDX_" + generateUniqueName();
         String fullTableName = SchemaUtil.getTableName(tableName, tableName);
+		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
 		try (Connection conn = getConnection()) {
 			conn.setAutoCommit(false);
 			Statement stmt = conn.createStatement();
 
-			stmt.execute("CREATE TABLE " + fullTableName + "(ID BIGINT NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions);
+			stmt.execute("CREATE TABLE " + fullTableName + "(ID BIGINT NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"
+					+ tableDDLOptions);
 			stmt.execute("CREATE " + (localIndex ? "LOCAL " : "")
 					+ "INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE(v2)");
 
@@ -266,7 +269,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
 		long wp = state.getWritePointer();
 		conn.createStatement().execute(
 				"upsert into " + fullTableName + " select max(id)+1, 'a4', 'b4' from " + fullTableName + "");
-		assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT,
+		assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT,
 				state.getVisibilityLevel());
 		assertEquals(wp, state.getWritePointer()); // Make sure write ptr
 													// didn't move
@@ -278,7 +281,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
 
 		conn.createStatement().execute(
 				"upsert into " + fullTableName + " select max(id)+1, 'a5', 'b5' from " + fullTableName + "");
-		assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT,
+		assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT,
 				state.getVisibilityLevel());
 		assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr
 														// moves
@@ -291,7 +294,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
 		
 		conn.createStatement().execute(
 				"upsert into " + fullTableName + " select max(id)+1, 'a6', 'b6' from " + fullTableName + "");
-		assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT,
+		assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT,
 				state.getVisibilityLevel());
 		assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr
 														// moves
@@ -313,8 +316,10 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
 		try (Connection conn = getConnection()) {
 			conn.setAutoCommit(false);
 			Statement stmt = conn.createStatement();
-			stmt.execute("CREATE TABLE " + fullTableName + "1(ID1 BIGINT NOT NULL PRIMARY KEY, FK1A INTEGER, FK1B INTEGER)" + tableDDLOptions);
-			stmt.execute("CREATE TABLE " + fullTableName + "2(ID2 BIGINT NOT NULL PRIMARY KEY, FK2 INTEGER)" + tableDDLOptions);
+			stmt.execute("CREATE TABLE " + fullTableName + "1(ID1 BIGINT NOT NULL PRIMARY KEY, FK1A INTEGER, FK1B INTEGER)"
+					+ tableDDLOptions);
+			stmt.execute("CREATE TABLE " + fullTableName + "2(ID2 BIGINT NOT NULL PRIMARY KEY, FK2 INTEGER)"
+					+ tableDDLOptions);
 			stmt.execute("CREATE " + (localIndex ? "LOCAL " : "")
 					+ "INDEX " + indexName + " ON " + fullTableName + "1 (FK1B)");
 			
@@ -328,7 +333,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
 	        state.startTransaction();
 	        long wp = state.getWritePointer();
 	        conn.createStatement().execute("delete from " + fullTableName + "1 where id1=fk1b AND fk1b=id1");
-	        assertEquals(VisibilityLevel.SNAPSHOT, state.getVisibilityLevel());
+	        assertEquals(PhoenixVisibilityLevel.SNAPSHOT, state.getVisibilityLevel());
 	        assertEquals(wp, state.getWritePointer()); // Make sure write ptr didn't move
 	
 	        rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1");
@@ -346,7 +351,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
 	        assertFalse(rs.next());
 	
 	        conn.createStatement().execute("delete from " + fullTableName + "1 where id1 in (select fk1a from " + fullTableName + "1 join " + fullTableName + "2 on (fk2=id1))");
-	        assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
+	        assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
 	        assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moved
 	
 	        rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1");
@@ -363,7 +368,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT {
             stmt.executeUpdate("upsert into " + fullTableName + "2 values (2, 4)");
 
             conn.createStatement().execute("delete from " + fullTableName + "1 where id1 in (select fk1a from " + fullTableName + "1 join " + fullTableName + "2 on (fk2=id1))");
-            assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
+            assertEquals(PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
             assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moved
     
             rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/211f1ebc/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
index d22993c..16207c8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
@@ -23,9 +23,8 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.tephra.Transaction;
-
 import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
 
 public interface IndexMetaDataCache extends Closeable {
     public static final IndexMetaDataCache EMPTY_INDEX_META_DATA_CACHE = new IndexMetaDataCache() {
@@ -40,11 +39,11 @@ public interface IndexMetaDataCache extends Closeable {
         }
 
         @Override
-        public Transaction getTransaction() {
+        public PhoenixTransactionContext getTransactionContext() {
             return null;
         }
         
     };
     public List<IndexMaintainer> getIndexMaintainers();
-    public Transaction getTransaction();
+    public PhoenixTransactionContext getTransactionContext();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/211f1ebc/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index a11f7ee..30a4e5a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -321,5 +321,6 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
             useNewValueColumnQualifier, encodingScheme);
         return regionScannerFactory.getWrappedScanner(c.getEnvironment(), s, null, null, offset, scan, dataColumns, tupleProjector,
             dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex);
+
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/211f1ebc/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
index 8693681..37fa2ab 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
@@ -17,12 +17,12 @@
  */
 package org.apache.phoenix.coprocessor;
 
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.phoenix.transaction.TransactionFactory;
 
 public class PhoenixTransactionalProcessor extends DelegateRegionObserver {
 
     public PhoenixTransactionalProcessor() {
-        super(new TransactionProcessor());
+        super(TransactionFactory.getTransactionFactory().getTransactionContext().getCoProcessor());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/211f1ebc/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index f0649f3..3a2fbca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -108,6 +108,7 @@ import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PFloat;
 import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.ExpressionUtil;
@@ -118,7 +119,6 @@ import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.StringUtil;
 import org.apache.phoenix.util.TimeKeeper;
-import org.apache.tephra.TxConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -608,7 +608,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             }
                             mutations.add(delete);
                             // force tephra to ignore this deletes
-                            delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+                            delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
                         } else if (isUpsert) {
                             Arrays.fill(values, null);
                             int bucketNumOffset = 0;
@@ -675,7 +675,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     results.get(0).getRowLength());
                                 delete.deleteColumns(deleteCF,  deleteCQ, ts);
                                 // force tephra to ignore this deletes
-                                delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+                                delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
                                 mutations.add(delete);
                             }
                         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/211f1ebc/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 87767cb..e18dc9f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -83,6 +83,10 @@ import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
+import org.apache.phoenix.transaction.PhoenixTransactionalTable;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
@@ -93,17 +97,6 @@ import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TransactionUtil;
-import org.apache.tephra.Transaction;
-import org.apache.tephra.Transaction.VisibilityLevel;
-import org.apache.tephra.TransactionAware;
-import org.apache.tephra.TransactionCodec;
-import org.apache.tephra.TransactionConflictException;
-import org.apache.tephra.TransactionContext;
-import org.apache.tephra.TransactionFailureException;
-import org.apache.tephra.TransactionSystemClient;
-import org.apache.tephra.hbase.TransactionAwareHTable;
-import org.apache.tephra.visibility.FenceWait;
-import org.apache.tephra.visibility.VisibilityFence;
 import org.cloudera.htrace.Span;
 import org.cloudera.htrace.TraceScope;
 import org.slf4j.Logger;
@@ -122,10 +115,9 @@ import com.google.common.collect.Sets;
  */
 public class MutationState implements SQLCloseable {
     private static final Logger logger = LoggerFactory.getLogger(MutationState.class);
-    private static final TransactionCodec CODEC = new TransactionCodec();
     private static final int[] EMPTY_STATEMENT_INDEX_ARRAY = new int[0];
     private static final int MAX_COMMIT_RETRIES = 3;
-    
+
     private final PhoenixConnection connection;
     private final long maxSize;
     private final long maxSizeBytes;
@@ -133,48 +125,48 @@ public class MutationState implements SQLCloseable {
     private final long batchSizeBytes;
     private long batchCount = 0L;
     private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations;
-    private final List<TransactionAware> txAwares;
-    private final TransactionContext txContext;
     private final Set<String> uncommittedPhysicalNames = Sets.newHashSetWithExpectedSize(10);
-    
-    private Transaction tx;
+
     private long sizeOffset;
     private int numRows = 0;
     private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     private boolean isExternalTxContext = false;
     private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap();
-    
+
+    final PhoenixTransactionContext phoenixTransactionContext;
+
     private final MutationMetricQueue mutationMetricQueue;
     private ReadMetricQueue readMetricQueue;
 
+
     public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection) {
-        this(maxSize, maxSizeBytes, connection, null, null);
+        this(maxSize, maxSizeBytes, connection, false, null);
     }
-    
-    public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, TransactionContext txContext) {
-        this(maxSize, maxSizeBytes, connection, null, txContext);
+
+    public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, PhoenixTransactionContext txContext) {
+        this(maxSize,maxSizeBytes, connection, false, txContext);
     }
-    
+
     public MutationState(MutationState mutationState) {
-        this(mutationState.maxSize, mutationState.maxSizeBytes, mutationState.connection, mutationState.getTransaction(), null);
+        this(mutationState.maxSize, mutationState.maxSizeBytes, mutationState.connection, true, mutationState.getPhoenixTransactionContext());
     }
-    
+
     public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, long sizeOffset) {
-        this(maxSize, maxSizeBytes, connection, null, null, sizeOffset);
+        this(maxSize, maxSizeBytes, connection, false, null, sizeOffset);
     }
-    
-    private MutationState(long maxSize, long maxSizeBytes,PhoenixConnection connection, Transaction tx, TransactionContext txContext) {
-        this(maxSize, maxSizeBytes, connection, tx, txContext, 0);
+
+    private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, boolean subTask, PhoenixTransactionContext txContext) {
+        this(maxSize, maxSizeBytes, connection, subTask, txContext, 0);
     }
-    
-    private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, Transaction tx, TransactionContext txContext, long sizeOffset) {
-        this(maxSize, maxSizeBytes, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), tx, txContext);
+
+    private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, boolean subTask, PhoenixTransactionContext txContext, long sizeOffset) {
+        this(maxSize, maxSizeBytes, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), subTask, txContext);
         this.sizeOffset = sizeOffset;
     }
-    
-    MutationState(long maxSize, long maxSizeBytes,
-            PhoenixConnection connection,
-            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations, Transaction tx, TransactionContext txContext) {
+
+    MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection,
+            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations,
+            boolean subTask, PhoenixTransactionContext txContext) {
         this.maxSize = maxSize;
         this.maxSizeBytes = maxSizeBytes;
         this.connection = connection;
@@ -184,115 +176,65 @@ public class MutationState implements SQLCloseable {
         boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
         this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
                 : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
-        this.tx = tx;
-        if (tx == null) {
-            this.txAwares = Collections.emptyList();
+        if (subTask == false) {
             if (txContext == null) {
-                TransactionSystemClient txServiceClient = this.connection
-                        .getQueryServices().getTransactionSystemClient();
-                this.txContext = new TransactionContext(txServiceClient);
+                phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(connection);
             } else {
                 isExternalTxContext = true;
-                this.txContext = txContext;
+                phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(txContext, connection, subTask);
             }
         } else {
             // this code path is only used while running child scans, we can't pass the txContext to child scans
             // as it is not thread safe, so we use the tx member variable
-            this.txAwares = Lists.newArrayList();
-            this.txContext = null;
+            phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(txContext, connection, subTask);
         }
     }
 
-    public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException {
-        this(maxSize, maxSizeBytes, connection, null, null, sizeOffset);
+    public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection)  throws SQLException {
+        this(maxSize, maxSizeBytes, connection, false, null, sizeOffset);
         this.mutations.put(table, mutations);
         this.numRows = mutations.size();
-        this.tx = connection.getMutationState().getTransaction();
         throwIfTooBig();
     }
-    
+
     public long getMaxSize() {
         return maxSize;
     }
-    
+
     public long getMaxSizeBytes() {
         return maxSizeBytes;
     }
-    
+
+    public PhoenixTransactionContext getPhoenixTransactionContext() {
+        return phoenixTransactionContext;
+    }
+
     /**
      * Commit a write fence when creating an index so that we can detect
      * when a data table transaction is started before the create index
      * but completes after it. In this case, we need to rerun the data
      * table transaction after the index creation so that the index rows
-     * are generated. See {@link #addDMLFence(PTable)} and TEPHRA-157
+     * are generated. See TEPHRA-157
      * for more information.
      * @param dataTable the data table upon which an index is being added
      * @throws SQLException
      */
     public void commitDDLFence(PTable dataTable) throws SQLException {
         if (dataTable.isTransactional()) {
-            byte[] key = dataTable.getName().getBytes();
-            boolean success = false;
             try {
-                FenceWait fenceWait = VisibilityFence.prepareWait(key, connection.getQueryServices().getTransactionSystemClient());
-                fenceWait.await(10000, TimeUnit.MILLISECONDS);
-                success = true;
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException();
-            } catch (TimeoutException | TransactionFailureException e) {
-                throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE)
-                .setSchemaName(dataTable.getSchemaName().getString())
-                .setTableName(dataTable.getTableName().getString())
-                .build().buildException();
+                phoenixTransactionContext.commitDDLFence(dataTable, logger);
             } finally {
                 // The client expects a transaction to be in progress on the txContext while the
                 // VisibilityFence.prepareWait() starts a new tx and finishes/aborts it. After it's
                 // finished, we start a new one here.
                 // TODO: seems like an autonomous tx capability in Tephra would be useful here.
-                try {
-                    txContext.start();
-                    if (logger.isInfoEnabled() && success) logger.info("Added write fence at ~" + getTransaction().getReadPointer());
-                } catch (TransactionFailureException e) {
-                    throw TransactionUtil.getTransactionFailureException(e);
-                }
-            }
-        }
-    }
-    
-    /**
-     * Add an entry to the change set representing the DML operation that is starting.
-     * These entries will not conflict with each other, but they will conflict with a
-     * DDL operation of creating an index. See {@link #addDMLFence(PTable)} and TEPHRA-157
-     * for more information.
-     * @param table the table which is doing DML
-     * @throws SQLException
-     */
-    private void addDMLFence(PTable table) throws SQLException {
-        if (table.getType() == PTableType.INDEX || !table.isTransactional()) {
-            return;
-        }
-        byte[] logicalKey = table.getName().getBytes();
-        TransactionAware logicalTxAware = VisibilityFence.create(logicalKey);
-        if (this.txContext == null) {
-            this.txAwares.add(logicalTxAware);
-        } else {
-            this.txContext.addTransactionAware(logicalTxAware);
-        }
-        byte[] physicalKey = table.getPhysicalName().getBytes();
-        if (Bytes.compareTo(physicalKey, logicalKey) != 0) {
-            TransactionAware physicalTxAware = VisibilityFence.create(physicalKey);
-            if (this.txContext == null) {
-                this.txAwares.add(physicalTxAware);
-            } else {
-                this.txContext.addTransactionAware(physicalTxAware);
+                phoenixTransactionContext.begin();
             }
         }
     }
     
     public boolean checkpointIfNeccessary(MutationPlan plan) throws SQLException {
-        Transaction currentTx = getTransaction();
-        if (getTransaction() == null || plan.getTargetRef() == null || plan.getTargetRef().getTable() == null || !plan.getTargetRef().getTable().isTransactional()) {
+        if (! phoenixTransactionContext.isTransactionRunning()  || plan.getTargetRef() == null || plan.getTargetRef().getTable() == null || !plan.getTargetRef().getTable().isTransactional()) {
             return false;
         }
         Set<TableRef> sources = plan.getSourceRefs();
@@ -332,122 +274,81 @@ public class MutationState implements SQLCloseable {
                     break;
                 }
             }
+
+            phoenixTransactionContext.checkpoint(hasUncommittedData);
+
             if (hasUncommittedData) {
-                try {
-                    if (txContext == null) {
-                        currentTx = tx = connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx);
-                    }  else {
-                        txContext.checkpoint();
-                        currentTx = tx = txContext.getCurrentTransaction();
-                    }
-                    // Since we've checkpointed, we can clear out uncommitted set, since a statement run afterwards
-                    // should see all this data.
-                    uncommittedPhysicalNames.clear();
-                } catch (TransactionFailureException e) {
-                    throw new SQLException(e);
-                } 
+                uncommittedPhysicalNames.clear();
             }
-            // Since we're querying our own table while mutating it, we must exclude
-            // see our current mutations, otherwise we can get erroneous results (for DELETE)
-            // or get into an infinite loop (for UPSERT SELECT).
-            currentTx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
             return true;
         }
         return false;
     }
-    
-    private void addTransactionParticipant(TransactionAware txAware) throws SQLException {
-        if (txContext == null) {
-            txAwares.add(txAware);
-            assert(tx != null);
-            txAware.startTx(tx);
-        } else {
-            txContext.addTransactionAware(txAware);
-        }
-    }
-    
+
     // Though MutationState is not thread safe in general, this method should be because it may
     // be called by TableResultIterator in a multi-threaded manner. Since we do not want to expose
     // the Transaction outside of MutationState, this seems reasonable, as the member variables
     // would not change as these threads are running.
     public HTableInterface getHTable(PTable table) throws SQLException {
         HTableInterface htable = this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes());
-        Transaction currentTx;
-        if (table.isTransactional() && (currentTx=getTransaction()) != null) {
-            TransactionAwareHTable txAware = TransactionUtil.getTransactionAwareHTable(htable, table.isImmutableRows());
+        if (table.isTransactional() && phoenixTransactionContext.isTransactionRunning()) {
+            PhoenixTransactionalTable phoenixTransactionTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, htable, table);
             // Using cloned mutationState as we may have started a new transaction already
             // if auto commit is true and we need to use the original one here.
-            txAware.startTx(currentTx);
-            htable = txAware;
+            htable = phoenixTransactionTable;
         }
         return htable;
     }
-    
+
     public PhoenixConnection getConnection() {
         return connection;
     }
-    
-    // Kept private as the Transaction may change when check pointed. Keeping it private ensures
-    // no one holds on to a stale copy.
-    private Transaction getTransaction() {
-        return tx != null ? tx : txContext != null ? txContext.getCurrentTransaction() : null;
-    }
-    
+
     public boolean isTransactionStarted() {
-        return getTransaction() != null;
+        return phoenixTransactionContext.isTransactionRunning();
     }
-    
+
     public long getInitialWritePointer() {
-        Transaction tx = getTransaction();
-        return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getTransactionId(); // First write pointer - won't change with checkpointing
+        return phoenixTransactionContext.getTransactionId(); // First write pointer - won't change with checkpointing
     }
-    
+
     // For testing
     public long getWritePointer() {
-        Transaction tx = getTransaction();
-        return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getWritePointer();
+        return phoenixTransactionContext.getWritePointer();
     }
-    
+
     // For testing
-    public VisibilityLevel getVisibilityLevel() {
-        Transaction tx = getTransaction();
-        return tx == null ? null : tx.getVisibilityLevel();
+    public PhoenixVisibilityLevel getVisibilityLevel() {
+        return phoenixTransactionContext.getVisibilityLevel();
     }
-    
+
     public boolean startTransaction() throws SQLException {
-        if (txContext == null) {
-            throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build().buildException();
-        }
-        
         if (connection.getSCN() != null) {
             throw new SQLExceptionInfo.Builder(
                     SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET)
                     .build().buildException();
         }
-        
-        try {
-            if (!isTransactionStarted()) {
-                // Clear any transactional state in case transaction was ended outside
-                // of Phoenix so we don't carry the old transaction state forward. We
-                // cannot call reset() here due to the case of having mutations and
-                // then transitioning from non transactional to transactional (which
-                // would end up clearing our uncommitted state).
-                resetTransactionalState();
-                txContext.start();
-                return true;
-            }
-        } catch (TransactionFailureException e) {
-            throw new SQLExceptionInfo.Builder(SQLExceptionCode.TRANSACTION_FAILED).setRootCause(e).build().buildException();
+
+        if (!isTransactionStarted()) {
+            // Clear any transactional state in case transaction was ended outside
+            // of Phoenix so we don't carry the old transaction state forward. We
+            // cannot call reset() here due to the case of having mutations and
+            // then transitioning from non transactional to transactional (which
+            // would end up clearing our uncommitted state).
+            resetTransactionalState();
+            phoenixTransactionContext.begin();
+            return true;
         }
+
         return false;
     }
 
     public static MutationState emptyMutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection) {
-        MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), null, null);
+        MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), false, null);
         state.sizeOffset = 0;
         return state;
     }
-    
+
     private void throwIfTooBig() throws SQLException {
         if (numRows > maxSize) {
             resetState();
@@ -529,13 +430,9 @@ public class MutationState implements SQLCloseable {
         if (this == newMutationState) { // Doesn't make sense
             return;
         }
-        if (txContext != null) {
-            for (TransactionAware txAware : newMutationState.txAwares) {
-                txContext.addTransactionAware(txAware);
-            }
-        } else {
-            txAwares.addAll(newMutationState.txAwares);
-        }
+
+        phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext());
+
         this.sizeOffset += newMutationState.sizeOffset;
         joinMutationState(newMutationState.mutations, this.mutations);
         if (!newMutationState.txMutations.isEmpty()) {
@@ -552,7 +449,7 @@ public class MutationState implements SQLCloseable {
         }
         throwIfTooBig();
     }
-    
+
 
     private static ImmutableBytesPtr getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, long rowTimestamp, PTable table) {
         RowKeySchema schema = table.getRowKeySchema();
@@ -1084,26 +981,16 @@ public class MutationState implements SQLCloseable {
                         if (table.isTransactional()) {
                             // Track tables to which we've sent uncommitted data
                             txTableRefs.add(origTableRef);
-                            addDMLFence(table);
                             uncommittedPhysicalNames.add(table.getPhysicalName().getString());
-                            
+
                             // If we have indexes, wrap the HTable in a delegate HTable that
                             // will attach the necessary index meta data in the event of a
                             // rollback
                             if (!table.getIndexes().isEmpty()) {
                                 hTable = new MetaDataAwareHTable(hTable, origTableRef);
                             }
-                            TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(hTable, table.isImmutableRows());
-                            // Don't add immutable indexes (those are the only ones that would participate
-                            // during a commit), as we don't need conflict detection for these.
-                            if (tableInfo.isDataTable()) {
-                                // Even for immutable, we need to do this so that an abort has the state
-                                // necessary to generate the rows to delete.
-                                addTransactionParticipant(txnAware);
-                            } else {
-                                txnAware.startTx(getTransaction());
-                            }
-                            hTable = txnAware;
+
+                            hTable = TransactionUtil.getPhoenixTransactionTable(phoenixTransactionContext, hTable, table);
                         }
                         
                         long numMutations = mutationList.size();
@@ -1209,15 +1096,11 @@ public class MutationState implements SQLCloseable {
     }
 
     public byte[] encodeTransaction() throws SQLException {
-        try {
-            return CODEC.encode(getTransaction());
-        } catch (IOException e) {
-            throw new SQLException(e);
-        }
+        return phoenixTransactionContext.encodeTransaction();
     }
-    
-    public static Transaction decodeTransaction(byte[] txnBytes) throws IOException {
-        return (txnBytes == null || txnBytes.length==0) ? null : CODEC.decode(txnBytes);
+
+    public static PhoenixTransactionContext decodeTransaction(byte[] txnBytes) throws IOException {
+        return TransactionFactory.getTransactionFactory().getTransactionContext(txnBytes);
     }
 
     private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? extends Mutation> mutations,
@@ -1294,29 +1177,22 @@ public class MutationState implements SQLCloseable {
         this.mutations.clear();
         resetTransactionalState();
     }
-    
+
     private void resetTransactionalState() {
-        tx = null;
-        txAwares.clear();
+        phoenixTransactionContext.reset();
         txMutations = Collections.emptyMap();
         uncommittedPhysicalNames.clear();
         uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     }
-    
+
     public void rollback() throws SQLException {
         try {
-            if (txContext != null && isTransactionStarted()) {
-                try {
-                    txContext.abort();
-                } catch (TransactionFailureException e) {
-                    throw TransactionUtil.getTransactionFailureException(e);
-                }
-            }
+            phoenixTransactionContext.abort();
         } finally {
             resetState();
         }
     }
-    
+
     public void commit() throws SQLException {
         Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap();
         int retryCount = 0;
@@ -1332,38 +1208,32 @@ public class MutationState implements SQLCloseable {
                 sqlE = e;
             } finally {
                 try {
-                    if (txContext != null && isTransactionStarted()) {
-                        TransactionFailureException txFailure = null;
-                        boolean finishSuccessful=false;
-                        try {
-                            if (sendSuccessful) {
-                                txContext.finish();
-                                finishSuccessful = true;
-                            }
-                        } catch (TransactionFailureException e) {
-                            if (logger.isInfoEnabled()) logger.info(e.getClass().getName() + " at timestamp " + getInitialWritePointer() + " with retry count of " + retryCount);
-                            retryCommit = (e instanceof TransactionConflictException && retryCount < MAX_COMMIT_RETRIES);
-                            txFailure = e;
-                            SQLException nextE = TransactionUtil.getTransactionFailureException(e);
-                            if (sqlE == null) {
-                                sqlE = nextE;
-                            } else {
-                                sqlE.setNextException(nextE);
-                            }
-                        } finally {
-                            // If send fails or finish fails, abort the tx
-                            if (!finishSuccessful) {
-                                try {
-                                    txContext.abort(txFailure);
-                                    if (logger.isInfoEnabled()) logger.info("Abort successful");
-                                } catch (TransactionFailureException e) {
-                                    if (logger.isInfoEnabled()) logger.info("Abort failed with " + e);
-                                    SQLException nextE = TransactionUtil.getTransactionFailureException(e);
-                                    if (sqlE == null) {
-                                        sqlE = nextE;
-                                    } else {
-                                        sqlE.setNextException(nextE);
-                                    }
+                    boolean finishSuccessful=false;
+                    try {
+                        if (sendSuccessful) {
+                            phoenixTransactionContext.commit();
+                            finishSuccessful = true;
+                        }
+                    } catch (SQLException e) {
+                        if (logger.isInfoEnabled()) logger.info(e.getClass().getName() + " at timestamp " + getInitialWritePointer() + " with retry count of " + retryCount);
+                        retryCommit = (e.getErrorCode() == SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode() && retryCount < MAX_COMMIT_RETRIES);
+                        if (sqlE == null) {
+                            sqlE = e;
+                        } else {
+                            sqlE.setNextException(e);
+                        }
+                    } finally {
+                        // If send fails or finish fails, abort the tx
+                        if (!finishSuccessful) {
+                            try {
+                                phoenixTransactionContext.abort();
+                                if (logger.isInfoEnabled()) logger.info("Abort successful");
+                            } catch (SQLException e) {
+                                if (logger.isInfoEnabled()) logger.info("Abort failed with " + e);
+                                if (sqlE == null) {
+                                    sqlE = e;
+                                } else {
+                                    sqlE.setNextException(e);
                                 }
                             }
                         }
@@ -1376,10 +1246,6 @@ public class MutationState implements SQLCloseable {
                             startTransaction();
                             // Add back read fences
                             Set<TableRef> txTableRefs = txMutations.keySet();
-                            for (TableRef tableRef : txTableRefs) {
-                                PTable dataTable = tableRef.getTable();
-                                addDMLFence(dataTable);
-                            }
                             try {
                                 // Only retry if an index was added
                                 retryCommit = shouldResubmitTransaction(txTableRefs);
@@ -1465,12 +1331,13 @@ public class MutationState implements SQLCloseable {
      * @throws SQLException
      */
     public boolean sendUncommitted(Iterator<TableRef> tableRefs) throws SQLException {
-        Transaction currentTx = getTransaction();
-        if (currentTx != null) {
+
+        if (phoenixTransactionContext.isTransactionRunning()) {
             // Initialize visibility so that transactions see their own writes.
             // The checkpoint() method will set it to not see writes if necessary.
-            currentTx.setVisibility(VisibilityLevel.SNAPSHOT);
+            phoenixTransactionContext.setVisibilityLevel(PhoenixVisibilityLevel.SNAPSHOT);
         }
+
         Iterator<TableRef> filteredTableRefs = Iterators.filter(tableRefs, new Predicate<TableRef>(){
             @Override
             public boolean apply(TableRef tableRef) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/211f1ebc/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index be22334..a7ea99a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -100,6 +100,7 @@ import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.tuple.BaseTuple;
 import org.apache.phoenix.schema.tuple.ValueGetterTuple;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.BitSet;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
@@ -108,7 +109,6 @@ import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
-import org.apache.tephra.TxConstants;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
@@ -1046,7 +1046,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             }
         	else if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()
         			// Since we don't include the index rows in the change set for txn tables, we need to detect row deletes that have transformed by TransactionProcessor
-        			|| (CellUtil.matchingQualifier(kv, TxConstants.FAMILY_DELETE_QUALIFIER) && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY))) {
+        			|| (CellUtil.matchingQualifier(kv, TransactionFactory.getTransactionFactory().getTransactionContext().getFamilyDeleteMarker()) && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY))) {
         	    nDeleteCF++;
         	}
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/211f1ebc/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
index 9edcafc..18b9edd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java
@@ -24,15 +24,13 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
 
-import org.apache.tephra.Transaction;
-
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.cache.IndexMetaDataCache;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
-import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
-import org.apache.phoenix.util.TransactionUtil;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.TransactionFactory;
 
 public class IndexMetaDataCacheFactory implements ServerCacheFactory {
     public IndexMetaDataCacheFactory() {
@@ -49,11 +47,12 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory {
     @Override
     public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] txState, final MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws SQLException {
         // just use the standard keyvalue builder - this doesn't really need to be fast
+        
         final List<IndexMaintainer> maintainers = 
                 IndexMaintainer.deserialize(cachePtr, GenericKeyValueBuilder.INSTANCE, useProtoForIndexMaintainer);
-        final Transaction txn;
+        final PhoenixTransactionContext txnContext;
         try {
-            txn = txState.length!=0 ? MutationState.decodeTransaction(txState) : null;
+            txnContext = txState.length != 0 ? TransactionFactory.getTransactionFactory().getTransactionContext(txState) : null;
         } catch (IOException e) {
             throw new SQLException(e);
         }
@@ -70,8 +69,8 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory {
             }
 
             @Override
-            public Transaction getTransaction() {
-                return txn;
+            public PhoenixTransactionContext getTransactionContext() {
+                return txnContext;
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/211f1ebc/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index 39473dc..fa2fed2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -30,12 +30,12 @@ import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
-import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ServerUtil;
-import org.apache.tephra.Transaction;
 
 public class PhoenixIndexMetaData implements IndexMetaData {
     private final Map<String, byte[]> attributes;
@@ -56,7 +56,7 @@ public class PhoenixIndexMetaData implements IndexMetaData {
         byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE);
         if (md != null) {
             final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md, useProto);
-            final Transaction txn = MutationState.decodeTransaction(txState);
+            final PhoenixTransactionContext txnContext = TransactionFactory.getTransactionFactory().getTransactionContext(txState);
             return new IndexMetaDataCache() {
 
                 @Override
@@ -68,8 +68,8 @@ public class PhoenixIndexMetaData implements IndexMetaData {
                 }
 
                 @Override
-                public Transaction getTransaction() {
-                    return txn;
+                public PhoenixTransactionContext getTransactionContext() {
+                    return txnContext;
                 }
 
             };
@@ -101,8 +101,8 @@ public class PhoenixIndexMetaData implements IndexMetaData {
         this.ignoreNewerMutations = attributes.get(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null;
     }
     
-    public Transaction getTransaction() {
-        return indexMetaDataCache.getTransaction();
+    public PhoenixTransactionContext getTransactionContext() {
+        return indexMetaDataCache.getTransactionContext();
     }
     
     public List<IndexMaintainer> getIndexMaintainers() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/211f1ebc/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 9113bfc..4758848 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -72,14 +72,14 @@ import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
+import org.apache.phoenix.transaction.PhoenixTransactionalTable;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TransactionUtil;
-import org.apache.tephra.Transaction;
-import org.apache.tephra.Transaction.VisibilityLevel;
-import org.apache.tephra.TxConstants;
-import org.apache.tephra.hbase.TransactionAwareHTable;
 import org.cloudera.htrace.Span;
 import org.cloudera.htrace.Trace;
 import org.cloudera.htrace.TraceScope;
@@ -160,7 +160,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
         Map<String,byte[]> updateAttributes = m.getAttributesMap();
         String tableName = c.getEnvironment().getRegion().getTableDesc().getNameAsString();
         PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(c.getEnvironment(),updateAttributes);
-        byte[] txRollbackAttribute = m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY);
+        byte[] txRollbackAttribute = m.getAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY);
         Collection<Pair<Mutation, byte[]>> indexUpdates = null;
         // get the current span, or just use a null-span to avoid a bunch of if statements
         try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
@@ -249,14 +249,14 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
     }
     
     private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator, byte[] txRollbackAttribute) throws IOException {
-        Transaction tx = indexMetaData.getTransaction();
-        if (tx == null) {
-            throw new NullPointerException("Expected to find transaction in metadata for " + env.getRegion().getRegionInfo().getTable().getNameAsString());
+        PhoenixTransactionContext txnContext = indexMetaData.getTransactionContext();
+        if (txnContext == null) {
+            throw new NullPointerException("Expected to find transaction in metadata for " + env.getRegionInfo().getTable().getNameAsString());
         }
         boolean isRollback = txRollbackAttribute!=null;
         boolean isImmutable = indexMetaData.isImmutableRows();
         ResultScanner currentScanner = null;
-        TransactionAwareHTable txTable = null;
+        PhoenixTransactionalTable txTable = null;
         // Collect up all mutations in batch
         Map<ImmutableBytesPtr, MultiMutation> mutations =
                 new HashMap<ImmutableBytesPtr, MultiMutation>();
@@ -321,23 +321,22 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
                 scanRanges.initializeScan(scan);
                 TableName tableName = env.getRegion().getRegionInfo().getTable();
                 HTableInterface htable = env.getTable(tableName);
-                txTable = new TransactionAwareHTable(htable);
-                txTable.startTx(tx);
+                txTable = TransactionFactory.getTransactionFactory().getTransactionalTable(txnContext, htable);
                 // For rollback, we need to see all versions, including
                 // the last committed version as there may be multiple
                 // checkpointed versions.
                 SkipScanFilter filter = scanRanges.getSkipScanFilter();
                 if (isRollback) {
                     filter = new SkipScanFilter(filter,true);
-                    tx.setVisibility(VisibilityLevel.SNAPSHOT_ALL);
+                    txnContext.setVisibilityLevel(PhoenixVisibilityLevel.SNAPSHOT_ALL);
                 }
                 scan.setFilter(filter);
                 currentScanner = txTable.getScanner(scan);
             }
             if (isRollback) {
-                processRollback(env, indexMetaData, txRollbackAttribute, currentScanner, tx, mutableColumns, indexUpdates, mutations);
+                processRollback(env, indexMetaData, txRollbackAttribute, currentScanner, txnContext, mutableColumns, indexUpdates, mutations);
             } else {
-                processMutation(env, indexMetaData, txRollbackAttribute, currentScanner, tx, mutableColumns, indexUpdates, mutations, findPriorValueMutations);
+                processMutation(env, indexMetaData, txRollbackAttribute, currentScanner, txnContext, mutableColumns, indexUpdates, mutations, findPriorValueMutations);
             }
         } finally {
             if (txTable != null) txTable.close();
@@ -360,7 +359,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
     private void processMutation(RegionCoprocessorEnvironment env,
             PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
             ResultScanner scanner,
-            Transaction tx, 
+            PhoenixTransactionContext txnContext, 
             Set<ColumnReference> upsertColumns, 
             Collection<Pair<Mutation, byte[]>> indexUpdates,
             Map<ImmutableBytesPtr, MultiMutation> mutations,
@@ -372,14 +371,14 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             // Process existing data table rows by removing the old index row and adding the new index row
             while ((result = scanner.next()) != null) {
                 Mutation m = mutationsToFindPreviousValue.remove(new ImmutableBytesPtr(result.getRow()));
-                TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m, emptyColRef, result);
+                TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), txnContext.getWritePointer(), m, emptyColRef, result);
                 generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
                 generatePuts(indexMetaData, indexUpdates, state);
             }
         }
         // Process new data table by adding new index rows
         for (Mutation m : mutations.values()) {
-            TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m);
+            TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), txnContext.getWritePointer(), m);
             generatePuts(indexMetaData, indexUpdates, state);
         }
     }
@@ -387,7 +386,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
     private void processRollback(RegionCoprocessorEnvironment env,
             PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
             ResultScanner scanner,
-            Transaction tx, Set<ColumnReference> mutableColumns,
+            PhoenixTransactionContext tx, Set<ColumnReference> mutableColumns,
             Collection<Pair<Mutation, byte[]>> indexUpdates,
             Map<ImmutableBytesPtr, MultiMutation> mutations) throws IOException {
         if (scanner != null) {
@@ -474,7 +473,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
         Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData);
         for (IndexUpdate delete : deletes) {
             if (delete.isValid()) {
-                delete.getUpdate().setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue);
+                delete.getUpdate().setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue);
                 indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName()));
             }
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/211f1ebc/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index b0c3200..e33ce0c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.iterate;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Result;
@@ -54,6 +55,8 @@ import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
@@ -111,7 +114,7 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
     HRegion dataRegion = null;
     IndexMaintainer indexMaintainer = null;
     byte[][] viewConstants = null;
-    Transaction tx = null;
+    PhoenixTransactionContext tx = null;
     ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
     if (dataColumns != null) {
       tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/211f1ebc/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index 15f721f..a097d83 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -19,6 +19,7 @@
 package org.apache.phoenix.iterate;
 
 import com.google.common.collect.ImmutableList;
+
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -37,10 +38,10 @@ import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.tuple.*;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
-import org.apache.tephra.Transaction;
 
 import java.io.IOException;
 import java.util.List;
@@ -94,7 +95,7 @@ public abstract class RegionScannerFactory {
       final Expression[] arrayFuncRefs, final int offset, final Scan scan,
       final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
       final HRegion dataRegion, final IndexMaintainer indexMaintainer,
-      Transaction tx,
+      PhoenixTransactionContext tx,
       final byte[][] viewConstants, final KeyValueSchema kvSchema,
       final ValueBitSet kvSchemaBitSet, final TupleProjector projector,
       final ImmutableBytesWritable ptr, final boolean useQualifierAsListIndex) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/211f1ebc/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 09796aa..33c54e0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -101,6 +101,7 @@ import org.apache.phoenix.schema.types.PUnsignedDate;
 import org.apache.phoenix.schema.types.PUnsignedTime;
 import org.apache.phoenix.schema.types.PUnsignedTimestamp;
 import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.JDBCUtil;
 import org.apache.phoenix.util.NumberUtil;
@@ -110,7 +111,6 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.SchemaUtil;
-import org.apache.tephra.TransactionContext;
 import org.cloudera.htrace.Sampler;
 import org.cloudera.htrace.TraceScope;
 
@@ -652,7 +652,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
         mutationState.sendUncommitted();
     }
         
-    public void setTransactionContext(TransactionContext txContext) throws SQLException {
+    public void setTransactionContext(PhoenixTransactionContext txContext) throws SQLException {
         if (!this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) {
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_TX_CONTEXT)
             .build().buildException();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/211f1ebc/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 38580e4..45ab5fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -46,7 +46,6 @@ import org.apache.phoenix.schema.SequenceAllocation;
 import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.GuidePostsKey;
-import org.apache.tephra.TransactionSystemClient;
 
 
 public interface ConnectionQueryServices extends QueryServices, MetaDataMutated {
@@ -132,7 +131,6 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
     public long clearCache() throws SQLException;
     public int getSequenceSaltBuckets();
 
-    TransactionSystemClient getTransactionSystemClient();
     public long getRenewLeaseThresholdMilliSeconds();
     public boolean isRenewingLeasesEnabled();
 


Mime
View raw message