phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [2/4] phoenix git commit: PHOENIX-4605 Support running multiple transaction providers
Date Fri, 13 Apr 2018 01:02:05 GMT
http://git-wip-us.apache.org/repos/asf/phoenix/blob/06e9c1f0/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index a7b31e8..1a11427 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -69,6 +69,7 @@ import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PFloat;
 import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -134,7 +135,7 @@ public class PTableImpl implements PTable {
     private boolean disableWAL;
     private boolean multiTenant;
     private boolean storeNulls;
-    private boolean isTransactional;
+    private TransactionFactory.Provider transactionProvider;
     private ViewType viewType;
     private Short viewIndexId;
     private int estimatedSize;
@@ -227,7 +228,7 @@ public class PTableImpl implements PTable {
         init(tenantId, this.schemaName, this.tableName, PTableType.INDEX, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
             this.schemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
             null, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable,
-            isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMpped, null, false, storageScheme, qualifierEncodingScheme, encodedCQCounter, useStatsForParallelization);
+            transactionProvider, updateCacheFrequency, indexDisableTimestamp, isNamespaceMpped, null, false, storageScheme, qualifierEncodingScheme, encodedCQCounter, useStatsForParallelization);
     }
 
     public PTableImpl(long timeStamp) { // For delete marker
@@ -270,7 +271,7 @@ public class PTableImpl implements PTable {
                     table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table), table.getParentSchemaName(), table.getParentTableName(),
                     indexes, table.isImmutableRows(), physicalNames, table.getDefaultFamilyName(), viewStatement,
                     table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
-                    table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), updateCacheFrequency,
+                    table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), updateCacheFrequency,
                     table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
         }
 
@@ -280,7 +281,7 @@ public class PTableImpl implements PTable {
                 table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table), parentSchemaName, table.getParentTableName(),
                 indexes, table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), viewStatement,
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
-                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(),
                 table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
     }
 
@@ -290,7 +291,7 @@ public class PTableImpl implements PTable {
                 table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
                 table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
-                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(),
                 table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
     }
     
@@ -300,7 +301,7 @@ public class PTableImpl implements PTable {
                 table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
                 table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
-                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(),
                 table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
     }
 
@@ -310,7 +311,7 @@ public class PTableImpl implements PTable {
                 table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
                 table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), defaultFamily, table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
-                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(),
                 table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
     }
 
@@ -320,7 +321,7 @@ public class PTableImpl implements PTable {
                 sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(),
                 table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
-                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), 
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), 
                 table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
     }
 
@@ -330,18 +331,18 @@ public class PTableImpl implements PTable {
                 sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
                 table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
-                table.getIndexType(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(),
+                table.getIndexType(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(),
                 table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
     }
     
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled,
-            boolean isMultitenant, boolean storeNulls, boolean isTransactional, long updateCacheFrequency, boolean isNamespaceMapped) throws SQLException {
+            boolean isMultitenant, boolean storeNulls, TransactionFactory.Provider transactionProvider, long updateCacheFrequency, boolean isNamespaceMapped) throws SQLException {
         return new PTableImpl(
                 table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp,
                 sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
                 table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 isWalDisabled, isMultitenant, storeNulls, table.getViewType(), table.getViewIndexId(), table.getIndexType(),
-                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), isTransactional, updateCacheFrequency, table.getIndexDisableTimestamp(), 
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), transactionProvider, updateCacheFrequency, table.getIndexDisableTimestamp(), 
                 isNamespaceMapped, table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
     }
     
@@ -352,7 +353,7 @@ public class PTableImpl implements PTable {
                 table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
-                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(),
                 table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
     }
 
@@ -363,7 +364,7 @@ public class PTableImpl implements PTable {
                 table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
-                table.getBaseColumnCount(), rowKeyOrderOptimizable, table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), 
+                table.getBaseColumnCount(), rowKeyOrderOptimizable, table.getTransactionProvider(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), 
                 table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
     }
 
@@ -374,7 +375,7 @@ public class PTableImpl implements PTable {
                 table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(),
                 table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(),
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
-                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), 
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), 
                 table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization());
     }
 
@@ -383,12 +384,12 @@ public class PTableImpl implements PTable {
             Collection<PColumn> columns, PName dataSchemaName, PName dataTableName, List<PTable> indexes,
             boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression,
             boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
-            IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency,
+            IndexType indexType, boolean rowKeyOrderOptimizable, TransactionFactory.Provider transactionProvider, long updateCacheFrequency,
             long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter, Boolean useStatsForParallelization) throws SQLException {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataSchemaName,
                 dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
                 viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId,
-                indexType, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, rowKeyOrderOptimizable, isTransactional,
+                indexType, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, rowKeyOrderOptimizable, transactionProvider,
                 updateCacheFrequency,indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema, storageScheme, qualifierEncodingScheme, encodedCQCounter, useStatsForParallelization);
     }
 
@@ -397,7 +398,7 @@ public class PTableImpl implements PTable {
             Collection<PColumn> columns, PName dataSchemaName, PName dataTableName, List<PTable> indexes,
             boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression,
             boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
-            IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency,
+            IndexType indexType, boolean rowKeyOrderOptimizable, TransactionFactory.Provider transactionProvider, long updateCacheFrequency,
             int baseColumnCount, long indexDisableTimestamp, boolean isNamespaceMapped,
             String autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme storageScheme,
             QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter, Boolean useStatsForParallelization)
@@ -405,7 +406,7 @@ public class PTableImpl implements PTable {
         return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName,
                 bucketNum, columns, dataSchemaName, dataTableName, indexes, isImmutableRows, physicalNames,
                 defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId,
-                indexType, baseColumnCount, rowKeyOrderOptimizable, isTransactional, updateCacheFrequency, 
+                indexType, baseColumnCount, rowKeyOrderOptimizable, transactionProvider, updateCacheFrequency, 
                 indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema, storageScheme, qualifierEncodingScheme, encodedCQCounter, useStatsForParallelization);
     }
 
@@ -414,13 +415,13 @@ public class PTableImpl implements PTable {
             PName parentSchemaName, PName parentTableName, List<PTable> indexes, boolean isImmutableRows,
             List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant,
             boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType,
-            int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency,
+            int baseColumnCount, boolean rowKeyOrderOptimizable, TransactionFactory.Provider transactionProvider, long updateCacheFrequency,
             long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme storageScheme, 
             QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter, Boolean useStatsForParallelization) throws SQLException {
         init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns,
                 parentSchemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName,
                 viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable,
-                isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema, storageScheme, 
+                transactionProvider, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema, storageScheme, 
                 qualifierEncodingScheme, encodedCQCounter, useStatsForParallelization);
     }
     
@@ -454,7 +455,7 @@ public class PTableImpl implements PTable {
             PName pkName, Integer bucketNum, Collection<PColumn> columns, PName parentSchemaName, PName parentTableName,
             List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL,
             boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId,
-            IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, long indexDisableTimestamp, 
+            IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable, TransactionFactory.Provider transactionProvider, long updateCacheFrequency, long indexDisableTimestamp, 
             boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, 
             EncodedCQCounter encodedCQCounter, Boolean useStatsForParallelization) throws SQLException {
         Preconditions.checkNotNull(schemaName);
@@ -486,7 +487,7 @@ public class PTableImpl implements PTable {
         this.viewType = viewType;
         this.viewIndexId = viewIndexId;
         this.indexType = indexType;
-        this.isTransactional = isTransactional;
+        this.transactionProvider = transactionProvider;
         this.rowKeyOrderOptimizable = rowKeyOrderOptimizable;
         this.updateCacheFrequency = updateCacheFrequency;
         this.isNamespaceMapped = isNamespaceMapped;
@@ -1279,7 +1280,13 @@ public class PTableImpl implements PTable {
         boolean disableWAL = table.getDisableWAL();
         boolean multiTenant = table.getMultiTenant();
         boolean storeNulls = table.getStoreNulls();
-        boolean isTransactional = table.getTransactional();
+        TransactionFactory.Provider transactionProvider = null;
+        if (table.hasTransactionProvider()) {
+            transactionProvider = TransactionFactory.Provider.fromCode(table.getTransactionProvider());
+        } else if (table.hasTransactional()) {
+            // For backward compatibility prior to transactionProvider field
+            transactionProvider = TransactionFactory.Provider.TEPHRA;
+        }
         ViewType viewType = null;
         String viewStatement = null;
         List<PName> physicalNames = Collections.emptyList();
@@ -1352,7 +1359,7 @@ public class PTableImpl implements PTable {
                 (bucketNum == NO_SALTING) ? null : bucketNum, columns, parentSchemaName, parentTableName, indexes,
                         isImmutableRows, physicalNames, defaultFamilyName, viewStatement, disableWAL,
                         multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable,
-                        isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, autoParititonSeqName, 
+                        transactionProvider, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, autoParititonSeqName, 
                         isAppendOnlySchema, storageScheme, qualifierEncodingScheme, encodedColumnQualifierCounter, useStatsForParallelization);
             return result;
         } catch (SQLException e) {
@@ -1418,7 +1425,9 @@ public class PTableImpl implements PTable {
       builder.setDisableWAL(table.isWALDisabled());
       builder.setMultiTenant(table.isMultiTenant());
       builder.setStoreNulls(table.getStoreNulls());
-      builder.setTransactional(table.isTransactional());
+      if (table.getTransactionProvider() != null) {
+          builder.setTransactionProvider(table.getTransactionProvider().getCode());
+      }
       if(table.getType() == PTableType.VIEW){
         builder.setViewType(ByteStringer.wrap(new byte[]{table.getViewType().getSerializedValue()}));
       }
@@ -1473,8 +1482,13 @@ public class PTableImpl implements PTable {
     }
 
     @Override
-    public boolean isTransactional() {
-        return isTransactional;
+    public TransactionFactory.Provider getTransactionProvider() {
+        return transactionProvider;
+    }
+    
+    @Override
+    public final boolean isTransactional() {
+        return transactionProvider != null;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/06e9c1f0/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
index c500b2e..78b9beb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
@@ -33,6 +33,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.SchemaUtil;
 
 public enum TableProperty {
@@ -94,6 +95,23 @@ public enum TableProperty {
         }
     },
     
+    TRANSACTION_PROVIDER(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER, COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, true, false, false) {
+        @Override
+        public Object getPTableValue(PTable table) {
+            return table.getTransactionProvider();
+        }
+        @Override
+        public Object getValue(Object value) {
+            try {
+                return value == null ? null : TransactionFactory.Provider.valueOf(value.toString());
+            } catch (IllegalArgumentException e) {
+                throw new RuntimeException(new SQLExceptionInfo.Builder(SQLExceptionCode.UNKNOWN_TRANSACTION_PROVIDER)
+                .setMessage(value.toString())
+                .build().buildException());
+            }
+        }
+    },
+
     UPDATE_CACHE_FREQUENCY(PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY, true, true, true) {
 	    @Override
         public Object getValue(Object value) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/06e9c1f0/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
index 110868e..543eda1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
@@ -17,17 +17,11 @@
  */
 package org.apache.phoenix.transaction;
 
-import java.io.IOException;
 import java.sql.SQLException;
-import java.util.concurrent.TimeoutException;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.slf4j.Logger;
+import org.apache.phoenix.transaction.TransactionFactory.Provider;
 
 public class OmidTransactionContext implements PhoenixTransactionContext {
 
@@ -56,7 +50,7 @@ public class OmidTransactionContext implements PhoenixTransactionContext {
     }
 
     @Override
-    public void commitDDLFence(PTable dataTable, Logger logger) throws SQLException {
+    public void commitDDLFence(PTable dataTable) throws SQLException {
         // TODO Auto-generated method stub
 
     }
@@ -116,59 +110,24 @@ public class OmidTransactionContext implements PhoenixTransactionContext {
     }
 
     @Override
-    public long getMaxTransactionsPerSecond() {
-        // TODO Auto-generated method stub
-        return 0;
-    }
-
-    @Override
-    public boolean isPreExistingVersion(long version) {
-        // TODO Auto-generated method stub
-        return false;
+    public Provider getProvider() {
+        return Provider.OMID;
     }
 
     @Override
-    public BaseRegionObserver getCoprocessor() {
-        // TODO Auto-generated method stub
+    public PhoenixTransactionContext newTransactionContext(PhoenixTransactionContext contex, boolean subTask) {
         return null;
     }
 
     @Override
-    public void setInMemoryTransactionClient(Configuration config) {
+    public void markDMLFence(PTable dataTable) {
         // TODO Auto-generated method stub
         
     }
 
     @Override
-    public ZKClientService setTransactionClient(Configuration config, ReadOnlyProps props,
-            ConnectionInfo connectionInfo) {
-        // TODO Auto-generated method stub
-        
-        return null;
-        
-    }
-
-    @Override
-    public byte[] getFamilyDeleteMarker() {
+    public HTableInterface getTransactionalTable(HTableInterface htable, boolean isImmutable) {
         // TODO Auto-generated method stub
         return null;
     }
-
-    @Override
-    public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void setupTxManager(Configuration config, String url) throws SQLException {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void tearDownTxManager() {
-        // TODO Auto-generated method stub
-
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/06e9c1f0/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
index b0c1bfe..c211661 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java
@@ -19,34 +19,28 @@ package org.apache.phoenix.transaction;
 
 import java.io.IOException;
 
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.transaction.TransactionFactory.Provider;
 
-public class OmidTransactionProvider implements TransactionProvider {
+public class OmidTransactionProvider implements PhoenixTransactionProvider {
     private static final OmidTransactionProvider INSTANCE = new OmidTransactionProvider();
-    
+
     public static final OmidTransactionProvider getInstance() {
         return INSTANCE;
     }
-    
+
     private OmidTransactionProvider() {
     }
-    
-    @Override
-    public PhoenixTransactionContext getTransactionContext()  {
-        return new OmidTransactionContext();
-    }
 
     @Override
     public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException {
         //return new OmidTransactionContext(txnBytes);
         return null;
     }
-    
+
     @Override
     public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) {
         //return new OmidTransactionContext(connection);
@@ -54,25 +48,37 @@ public class OmidTransactionProvider implements TransactionProvider {
     }
 
     @Override
-    public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask) {
-        //return new OmidTransactionContext(contex, connection, subTask);
+    public PhoenixTransactionClient getTransactionClient(Configuration config, ConnectionInfo connectionInfo) {
+        // TODO Auto-generated method stub
         return null;
     }
 
     @Override
-    public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable) {
-        //return new OmidTransactionTable(ctx, htable);
+    public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connectionInfo) {
+        // TODO Auto-generated method stub
         return null;
     }
-    
+
     @Override
-    public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp) {
-        return CellUtil.createCell(row, family, HConstants.EMPTY_BYTE_ARRAY, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+    public Class<? extends RegionObserver> getCoprocessor() {
+        // TODO Auto-generated method stub
+        return null;
     }
-    
+
     @Override
-    public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp) {
-        return CellUtil.createCell(row, family, qualifier, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+    public Provider getProvider() {
+        return TransactionFactory.Provider.OMID;
     }
 
+    @Override
+    public boolean isUnsupported(Feature feature) {
+        // FIXME: if we initialize a Set with the unsupported features
+        // and check for containment, we run into a test failure
+        // in SetPropertyOnEncodedTableIT.testSpecifyingColumnFamilyForTTLFails()
+        // due to TableProperty.colFamSpecifiedException being null
+        // (though it's set in the constructor). I suspect some
+        // mysterious class loader issue. The below works fine
+        // as a workaround.
+        return (feature == Feature.ALTER_NONTX_TO_TX);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/06e9c1f0/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
deleted file mode 100644
index c67767c..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.transaction;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.RowMutations;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
-import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-
-public class OmidTransactionTable implements PhoenixTransactionalTable {
-
-    public OmidTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) {
-        // TODO Auto-generated constructor stub
-    }
-
-    @Override
-    public Result get(Get get) throws IOException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public void put(Put put) throws IOException {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void delete(Delete delete) throws IOException {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public ResultScanner getScanner(Scan scan) throws IOException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public byte[] getTableName() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public Configuration getConfiguration() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public HTableDescriptor getTableDescriptor() throws IOException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public boolean exists(Get get) throws IOException {
-        // TODO Auto-generated method stub
-        return false;
-    }
-
-    @Override
-    public Result[] get(List<Get> gets) throws IOException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public ResultScanner getScanner(byte[] family) throws IOException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public ResultScanner getScanner(byte[] family, byte[] qualifier)
-            throws IOException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public void put(List<Put> puts) throws IOException {
-        // TODO Auto-generated method stub
-    }
-
-    @Override
-    public void delete(List<Delete> deletes) throws IOException {
-        // TODO Auto-generated method stub
-    }
-
-    @Override
-    public void setAutoFlush(boolean autoFlush) {
-        // TODO Auto-generated method stub
-    }
-
-    @Override
-    public boolean isAutoFlush() {
-        // TODO Auto-generated method stub
-        return false;
-    }
-
-    @Override
-    public long getWriteBufferSize() {
-        // TODO Auto-generated method stub
-        return 0;
-    }
-
-    @Override
-    public void setWriteBufferSize(long writeBufferSize) throws IOException {
-        // TODO Auto-generated method stub
-    }
-
-    @Override
-    public void flushCommits() throws IOException {
-        // TODO Auto-generated method stub
-    }
-
-    @Override
-    public void close() throws IOException {
-        // TODO Auto-generated method stub
-    }
-
-    @Override
-    public long incrementColumnValue(byte[] row, byte[] family,
-            byte[] qualifier, long amount, boolean writeToWAL)
-            throws IOException {
-        // TODO Auto-generated method stub
-        return 0;
-    }
-
-    @Override
-    public Boolean[] exists(List<Get> gets) throws IOException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
-        // TODO Auto-generated method stub
-    }
-
-    @Override
-    public void setAutoFlushTo(boolean autoFlush) {
-        // TODO Auto-generated method stub
-    }
-
-    @Override
-    public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public TableName getName() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public void batch(List<? extends Row> actions, Object[] results)
-            throws IOException, InterruptedException {
-        // TODO Auto-generated method stub
-    }
-
-    @Override
-    public Object[] batch(List<? extends Row> actions) throws IOException,
-            InterruptedException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public <R> void batchCallback(List<? extends Row> actions,
-            Object[] results, Callback<R> callback) throws IOException,
-            InterruptedException {
-        // TODO Auto-generated method stub
-    }
-
-    @Override
-    public <R> Object[] batchCallback(List<? extends Row> actions,
-            Callback<R> callback) throws IOException, InterruptedException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
-            byte[] value, Put put) throws IOException {
-        // TODO Auto-generated method stub
-        return false;
-    }
-
-    @Override
-    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
-            byte[] value, Delete delete) throws IOException {
-        // TODO Auto-generated method stub
-        return false;
-    }
-
-    @Override
-    public void mutateRow(RowMutations rm) throws IOException {
-        // TODO Auto-generated method stub
-    }
-
-    @Override
-    public Result append(Append append) throws IOException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public Result increment(Increment increment) throws IOException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public long incrementColumnValue(byte[] row, byte[] family,
-            byte[] qualifier, long amount) throws IOException {
-        // TODO Auto-generated method stub
-        return 0;
-    }
-
-    @Override
-    public long incrementColumnValue(byte[] row, byte[] family,
-            byte[] qualifier, long amount, Durability durability)
-            throws IOException {
-        // TODO Auto-generated method stub
-        return 0;
-    }
-
-    @Override
-    public CoprocessorRpcChannel coprocessorService(byte[] row) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public <T extends Service, R> Map<byte[], R> coprocessorService(
-            Class<T> service, byte[] startKey, byte[] endKey,
-            Call<T, R> callable) throws ServiceException, Throwable {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public <T extends Service, R> void coprocessorService(Class<T> service,
-            byte[] startKey, byte[] endKey, Call<T, R> callable,
-            Callback<R> callback) throws ServiceException, Throwable {
-        // TODO Auto-generated method stub
-    }
-
-    @Override
-    public <R extends Message> Map<byte[], R> batchCoprocessorService(
-            MethodDescriptor methodDescriptor, Message request,
-            byte[] startKey, byte[] endKey, R responsePrototype)
-            throws ServiceException, Throwable {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public <R extends Message> void batchCoprocessorService(
-            MethodDescriptor methodDescriptor, Message request,
-            byte[] startKey, byte[] endKey, R responsePrototype,
-            Callback<R> callback) throws ServiceException, Throwable {
-        // TODO Auto-generated method stub
-    }
-
-    @Override
-    public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
-            CompareOp compareOp, byte[] value, RowMutations mutation)
-            throws IOException {
-        // TODO Auto-generated method stub
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/06e9c1f0/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionClient.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionClient.java
new file mode 100644
index 0000000..f12f818
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionClient.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.transaction;
+
+import java.io.Closeable;
+
+public interface PhoenixTransactionClient extends Closeable {
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/06e9c1f0/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
index 52ff2f9..f3ad42f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
@@ -17,20 +17,98 @@
  */
 package org.apache.phoenix.transaction;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.slf4j.Logger;
-
-import java.io.IOException;
 import java.sql.SQLException;
 import java.util.concurrent.TimeoutException;
 
-public interface PhoenixTransactionContext {
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.transaction.TransactionFactory.Provider;
 
+public interface PhoenixTransactionContext {
+    public static PhoenixTransactionContext NULL_CONTEXT = new PhoenixTransactionContext() {
+
+        @Override
+        public void begin() throws SQLException {
+        }
+
+        @Override
+        public void commit() throws SQLException {
+        }
+
+        @Override
+        public void abort() throws SQLException {
+        }
+
+        @Override
+        public void checkpoint(boolean hasUncommittedData) throws SQLException {
+        }
+
+        @Override
+        public void commitDDLFence(PTable dataTable) throws SQLException {
+        }
+
+        @Override
+        public void join(PhoenixTransactionContext ctx) {
+        }
+
+        @Override
+        public boolean isTransactionRunning() {
+            return false;
+        }
+
+        @Override
+        public void reset() {
+        }
+
+        @Override
+        public long getTransactionId() {
+            return 0;
+        }
+
+        @Override
+        public long getReadPointer() {
+            return 0;
+        }
+
+        @Override
+        public long getWritePointer() {
+            return 0;
+        }
+
+        @Override
+        public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel) {
+        }
+
+        @Override
+        public PhoenixVisibilityLevel getVisibilityLevel() {
+            return null;
+        }
+
+        @Override
+        public byte[] encodeTransaction() throws SQLException {
+            return null;
+        }
+
+        @Override
+        public Provider getProvider() {
+            return null;
+        }
+
+        @Override
+        public PhoenixTransactionContext newTransactionContext(PhoenixTransactionContext contex, boolean subTask) {
+            return NULL_CONTEXT;
+        }
+
+        @Override
+        public void markDMLFence(PTable dataTable) {
+            
+        }
+
+        @Override
+        public HTableInterface getTransactionalTable(HTableInterface htable, boolean isImmutable) {
+            return null;
+        }
+    };
     /**
      * 
      * Visibility levels needed for checkpointing and  
@@ -49,22 +127,6 @@ public interface PhoenixTransactionContext {
     public static final String READ_NON_TX_DATA = "data.tx.read.pre.existing";
 
     /**
-     * Set the in memory client connection to the transaction manager (for testing purpose)
-     *
-     * @param config
-     */
-    public void setInMemoryTransactionClient(Configuration config);
-
-    /**
-     * Set the client connection to the transaction manager
-     *
-     * @param config
-     * @param props
-     * @param connectionInfo
-     */
-    public ZKClientService setTransactionClient(Configuration config, ReadOnlyProps props, ConnectionInfo connectionInfo);
-
-    /**
      * Starts a transaction
      *
      * @throws SQLException
@@ -84,7 +146,7 @@ public interface PhoenixTransactionContext {
      * @throws SQLException
      */
     public void abort() throws SQLException;
-
+    
     /**
      * Create a checkpoint in a transaction as defined in [TEPHRA-96]
      * @throws SQLException
@@ -100,9 +162,17 @@ public interface PhoenixTransactionContext {
      * @throws InterruptedException
      * @throws TimeoutException
      */
-    public void commitDDLFence(PTable dataTable, Logger logger)
+    public void commitDDLFence(PTable dataTable)
             throws SQLException;
 
+
+    /**
+     * Mark the start of DML to ensure that updates to indexed rows are not
+     * missed.
+     * @param dataTable the table on which DML command is working
+     */
+    public void markDMLFence(PTable dataTable);
+
     /**
      * Augment the current context with ctx modified keys
      *
@@ -121,7 +191,8 @@ public interface PhoenixTransactionContext {
     public void reset();
 
     /**
-     * Returns transaction unique identifier
+     * Returns the transaction's unique identifier, which is also
+     * assumed to be the earliest write pointer.
      */
     public long getTransactionId();
 
@@ -150,42 +221,8 @@ public interface PhoenixTransactionContext {
      */
     public byte[] encodeTransaction() throws SQLException;
 
-    /**
-     * 
-     * @return max transactions per second
-     */
-    public long getMaxTransactionsPerSecond();
+    public Provider getProvider();
+    public PhoenixTransactionContext newTransactionContext(PhoenixTransactionContext contex, boolean subTask);
 
-    /**
-     *
-     * @param version
-     */
-    public boolean isPreExistingVersion(long version);
-
-    /**
-     *
-     * @return the coprocessor
-     */
-    public BaseRegionObserver getCoprocessor();
-
-    /**
-     * 
-     * @return the family delete marker
-     */
-    public byte[] getFamilyDeleteMarker();
-
-    /**
-     * Setup transaction manager's configuration for testing
-     */
-     public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException;
-
-    /**
-     * Setup transaction manager for testing
-     */
-    public void setupTxManager(Configuration config, String url) throws SQLException;
-
-    /**
-     * Tear down transaction manager for testing
-     */
-    public void tearDownTxManager();
+    public HTableInterface getTransactionalTable(HTableInterface htable, boolean isImmutable);
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/06e9c1f0/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java
new file mode 100644
index 0000000..cdc6058
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.transaction;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+
+public interface PhoenixTransactionProvider {
+    public enum Feature {
+        ALTER_NONTX_TO_TX(SQLExceptionCode.CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL);
+        
+        private final SQLExceptionCode code;
+        
+        Feature(SQLExceptionCode code) {
+            this.code = code;
+        }
+        
+        public SQLExceptionCode getCode() {
+            return code;
+        }
+    }
+    public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException;
+    public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection);
+    
+    public PhoenixTransactionClient getTransactionClient(Configuration config, ConnectionInfo connectionInfo);
+    public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connectionInfo);
+    public Class<? extends RegionObserver> getCoprocessor();
+    
+    public TransactionFactory.Provider getProvider();
+    public boolean isUnsupported(Feature feature);
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/06e9c1f0/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionService.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionService.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionService.java
new file mode 100644
index 0000000..10c46e1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionService.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.transaction;
+
+import java.io.Closeable;
+
+public interface PhoenixTransactionService extends Closeable {
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/06e9c1f0/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
deleted file mode 100644
index 7af1c08..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.transaction;
-
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-
-import java.io.IOException;
-import java.util.List;
-
-public interface PhoenixTransactionalTable extends HTableInterface {
-
-    /**
-     * Transaction version of {@link HTableInterface#get(Get get)}
-     * @param get
-     * @throws IOException
-     */
-    public Result get(Get get) throws IOException;
-
-    /**
-     * Transactional version of {@link HTableInterface#put(Put put)}
-     * @param put
-     * @throws IOException
-     */
-    public void put(Put put) throws IOException;
-
-    /**
-     * Transactional version of {@link HTableInterface#delete(Delete delete)}
-     *
-     * @param delete
-     * @throws IOException
-     */
-    public void delete(Delete delete) throws IOException;
-
-    /**
-     * Transactional version of {@link HTableInterface#getScanner(Scan scan)}
-     *
-     * @param scan
-     * @return ResultScanner
-     * @throws IOException
-     */
-    public ResultScanner getScanner(Scan scan) throws IOException;
-
-    /**
-     * Returns Htable name
-     */
-    public byte[] getTableName();
-
-    /**
-     * Returns Htable configuration object
-     */
-    public Configuration getConfiguration();
-
-    /**
-     * Returns HTableDescriptor of Htable
-     * @throws IOException
-     */
-    public HTableDescriptor getTableDescriptor() throws IOException;
-
-    /**
-     * Checks if cell exists
-     * @throws IOException
-     */
-    public boolean exists(Get get) throws IOException;
-
-    /**
-     * Transactional version of {@link HTableInterface#get(List gets)}
-     * @throws IOException
-     */
-    public Result[] get(List<Get> gets) throws IOException;
-
-    /**
-     * Transactional version of {@link HTableInterface#getScanner(byte[] family)}
-     * @throws IOException
-     */
-    public ResultScanner getScanner(byte[] family) throws IOException;
-
-    /**
-     * Transactional version of {@link HTableInterface#getScanner(byte[] family, byte[] qualifier)}
-     * @throws IOException
-     */
-    public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException;
-
-    /**
-     * Transactional version of {@link HTableInterface#put(List puts)}
-     * @throws IOException
-     */
-    public void put(List<Put> puts) throws IOException;
-
-    /**
-     * Transactional version of {@link HTableInterface#delete(List deletes)}
-     * @throws IOException
-     */
-    public void delete(List<Delete> deletes) throws IOException;
-
-    /**
-     * Delegates to {@link HTable#setAutoFlush(boolean autoFlush)}
-     */
-    public void setAutoFlush(boolean autoFlush);
-
-    /**
-     * Delegates to {@link HTable#isAutoFlush()}
-     */
-    public boolean isAutoFlush();
-
-    /**
-     * Delegates to see HTable.getWriteBufferSize()
-     */
-    public long getWriteBufferSize();
-
-    /**
-     * Delegates to see HTable.setWriteBufferSize()
-     */
-    public void setWriteBufferSize(long writeBufferSize) throws IOException;
-
-    /**
-     * Delegates to see HTable.flushCommits()
-     */
-    public void flushCommits() throws IOException;
-
-    /**
-     * Releases resources
-     * @throws IOException
-     */
-    public void close() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/06e9c1f0/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
index 77c3ab6..8b16210 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
@@ -19,71 +19,46 @@ package org.apache.phoenix.transaction;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.transaction.TephraTransactionProvider.TephraTransactionClient;
 import org.apache.tephra.Transaction;
+import org.apache.tephra.Transaction.VisibilityLevel;
 import org.apache.tephra.TransactionAware;
 import org.apache.tephra.TransactionCodec;
 import org.apache.tephra.TransactionConflictException;
 import org.apache.tephra.TransactionContext;
 import org.apache.tephra.TransactionFailureException;
-import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TransactionSystemClient;
-import org.apache.tephra.Transaction.VisibilityLevel;
 import org.apache.tephra.TxConstants;
-import org.apache.tephra.distributed.PooledClientProvider;
-import org.apache.tephra.distributed.TransactionServiceClient;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
-import org.apache.tephra.inmemory.InMemoryTxSystemClient;
-import org.apache.tephra.util.TxUtils;
+import org.apache.tephra.hbase.TransactionAwareHTable;
 import org.apache.tephra.visibility.FenceWait;
 import org.apache.tephra.visibility.VisibilityFence;
-import org.apache.tephra.zookeeper.TephraZKClientService;
-import org.apache.tephra.distributed.TransactionService;
-import org.apache.tephra.metrics.TxMetricsCollector;
-import org.apache.tephra.persist.HDFSTransactionStateStorage;
-import org.apache.tephra.snapshot.SnapshotCodecProvider;
-import org.apache.twill.discovery.DiscoveryService;
-import org.apache.twill.discovery.ZKDiscoveryService;
-import org.apache.twill.internal.utils.Networks;
-import org.apache.twill.zookeeper.RetryStrategies;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClientServices;
-import org.apache.twill.zookeeper.ZKClients;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Lists;
-import com.google.inject.util.Providers;
-
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class TephraTransactionContext implements PhoenixTransactionContext {
+import com.google.common.collect.Lists;
 
+public class TephraTransactionContext implements PhoenixTransactionContext {
+    private static final Logger logger = LoggerFactory.getLogger(TephraTransactionContext.class);
     private static final TransactionCodec CODEC = new TransactionCodec();
 
-    private static TransactionSystemClient txClient = null;
-    private static ZKClientService zkClient = null;
-    private static TransactionService txService = null;
-    private static TransactionManager txManager = null;
-
     private final List<TransactionAware> txAwares;
     private final TransactionContext txContext;
     private Transaction tx;
     private TransactionSystemClient txServiceClient;
-    private TransactionFailureException e;
 
     public TephraTransactionContext() {
         this.txServiceClient = null;
@@ -93,21 +68,21 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
 
     public TephraTransactionContext(byte[] txnBytes) throws IOException {
         this();
-        this.tx = (txnBytes != null && txnBytes.length > 0) ? CODEC
-                .decode(txnBytes) : null;
+        this.tx = CODEC.decode(txnBytes);
     }
 
     public TephraTransactionContext(PhoenixConnection connection) {
-        this.txServiceClient = txClient;  
+        PhoenixTransactionClient client = connection.getQueryServices().initTransactionClient(getProvider());  
+        assert (client instanceof TephraTransactionClient);
+        this.txServiceClient = ((TephraTransactionClient)client).getTransactionClient();
         this.txAwares = Collections.emptyList();
         this.txContext = new TransactionContext(txServiceClient);
     }
 
-    public TephraTransactionContext(PhoenixTransactionContext ctx,
-            PhoenixConnection connection, boolean subTask) {
-        this.txServiceClient = txClient;  
+    private TephraTransactionContext(PhoenixTransactionContext ctx, boolean subTask) {
         assert (ctx instanceof TephraTransactionContext);
         TephraTransactionContext tephraTransactionContext = (TephraTransactionContext) ctx;
+        this.txServiceClient = tephraTransactionContext.txServiceClient;  
 
         if (subTask) {
             this.tx = tephraTransactionContext.getTransaction();
@@ -117,42 +92,13 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
             this.txAwares = Collections.emptyList();
             this.txContext = tephraTransactionContext.getContext();
         }
-
-        this.e = null;
-    }
-
-    @Override
-    public void setInMemoryTransactionClient(Configuration config) {
-        TransactionManager txnManager = new TransactionManager(config);
-        txClient = this.txServiceClient = new InMemoryTxSystemClient(txnManager);
     }
 
     @Override
-    public ZKClientService setTransactionClient(Configuration config, ReadOnlyProps props, ConnectionInfo connectionInfo) {
-        String zkQuorumServersString = props.get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM);
-        if (zkQuorumServersString==null) {
-            zkQuorumServersString = connectionInfo.getZookeeperQuorum()+":"+connectionInfo.getPort();
-        }
-
-        int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
-        // Create instance of the tephra zookeeper client
-        ZKClientService txZKClientService  = ZKClientServices.delegate(
-            ZKClients.reWatchOnExpire(
-                ZKClients.retryOnFailure(
-                     new TephraZKClientService(zkQuorumServersString, timeOut, null,
-                             ArrayListMultimap.<String, byte[]>create()), 
-                         RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
-                     )
-                );
-        txZKClientService.startAndWait();
-        ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(txZKClientService);
-        PooledClientProvider pooledClientProvider = new PooledClientProvider(
-                config, zkDiscoveryService);
-        txClient = this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider);
-        
-        return txZKClientService;
+    public TransactionFactory.Provider getProvider() {
+        return TransactionFactory.Provider.TEPHRA;
     }
-
+    
     @Override
     public void begin() throws SQLException {
         if (txContext == null) {
@@ -181,8 +127,6 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
         try {
             txContext.finish();
         } catch (TransactionFailureException e) {
-            this.e = e;
-
             if (e instanceof TransactionConflictException) {
                 throw new SQLExceptionInfo.Builder(
                         SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION)
@@ -204,14 +148,8 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
         }
 
         try {
-            if (e != null) {
-                txContext.abort(e);
-                e = null;
-            } else {
-                txContext.abort();
-            }
+            txContext.abort();
         } catch (TransactionFailureException e) {
-            this.e = null;
             throw new SQLExceptionInfo.Builder(
                     SQLExceptionCode.TRANSACTION_FAILED)
                     .setMessage(e.getMessage()).setRootCause(e).build()
@@ -249,7 +187,7 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
     }
 
     @Override
-    public void commitDDLFence(PTable dataTable, Logger logger)
+    public void commitDDLFence(PTable dataTable)
             throws SQLException {
         byte[] key = dataTable.getName().getBytes();
 
@@ -276,7 +214,12 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
         }
     }
 
+    @Override
     public void markDMLFence(PTable table) {
+        if (table.getType() == PTableType.INDEX) {
+            return;
+        }
+        
         byte[] logicalKey = table.getName().getBytes();
         TransactionAware logicalTxAware = VisibilityFence.create(logicalKey);
 
@@ -300,6 +243,9 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
 
     @Override
     public void join(PhoenixTransactionContext ctx) {
+        if (ctx == PhoenixTransactionContext.NULL_CONTEXT) {
+            return;
+        }
         assert (ctx instanceof TephraTransactionContext);
         TephraTransactionContext tephraContext = (TephraTransactionContext) ctx;
 
@@ -325,7 +271,6 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
     public void reset() {
         tx = null;
         txAwares.clear();
-        this.e = null;
     }
 
     @Override
@@ -406,86 +351,15 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
         assert (tx != null);
 
         try {
-            return CODEC.encode(tx);
+            byte[] encodedTxBytes = CODEC.encode(tx);
+            encodedTxBytes = Arrays.copyOf(encodedTxBytes, encodedTxBytes.length + 1);
+            encodedTxBytes[encodedTxBytes.length - 1] = getProvider().getCode();
+            return encodedTxBytes;
         } catch (IOException e) {
             throw new SQLException(e);
         }
     }
 
-    @Override
-    public long getMaxTransactionsPerSecond() {
-        return TxConstants.MAX_TX_PER_MS;
-    }
-
-    @Override
-    public boolean isPreExistingVersion(long version) {
-        return TxUtils.isPreExistingVersion(version);
-    }
-
-    @Override
-    public BaseRegionObserver getCoprocessor() {
-        return new TransactionProcessor();
-    }
-
-    @Override
-    public byte[] getFamilyDeleteMarker() {
-        return TxConstants.FAMILY_DELETE_QUALIFIER;
-    }
-
-    @Override
-    public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException {
-        config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
-        config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
-        config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
-        config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, Networks.getRandomPort());
-        config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder);
-        config.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, defaultTxnTimeoutSeconds);
-        config.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
-        config.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5L);
-    }
-
-    @Override
-    public void setupTxManager(Configuration config, String url) throws SQLException {
-
-        if (txService != null) {
-            return;
-        }
-
-        ConnectionInfo connInfo = ConnectionInfo.create(url);
-        zkClient = ZKClientServices.delegate(
-          ZKClients.reWatchOnExpire(
-            ZKClients.retryOnFailure(
-              ZKClientService.Builder.of(connInfo.getZookeeperConnectionString())
-                .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT,
-                        HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
-                .build(),
-              RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)
-            )
-          )
-        );
-        zkClient.startAndWait();
-
-        DiscoveryService discovery = new ZKDiscoveryService(zkClient);
-        txManager = new TransactionManager(config, new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector()), new TxMetricsCollector());
-        txService = new TransactionService(config, zkClient, discovery, Providers.of(txManager));
-        txService.startAndWait();
-    }
-
-    @Override
-    public void tearDownTxManager() {
-        try {
-            if (txService != null) txService.stopAndWait();
-        } finally {
-            try {
-                if (zkClient != null) zkClient.stopAndWait();
-            } finally {
-                txService = null;
-                zkClient = null;
-                txManager = null;
-            }
-        }
-    }
-
     /**
      * TephraTransactionContext specific functions
      */
@@ -511,4 +385,17 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
             txAware.startTx(tx);
         }
     }
+
+    @Override
+    public PhoenixTransactionContext newTransactionContext(PhoenixTransactionContext context, boolean subTask) {
+        return new TephraTransactionContext(context, subTask);
+    }
+    
+    @Override
+    public HTableInterface getTransactionalTable(HTableInterface htable, boolean isImmutable) {
+        TransactionAwareHTable transactionAwareHTable = new TransactionAwareHTable(htable, isImmutable ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW);
+        this.addTransactionAware(transactionAwareHTable);
+        return transactionAwareHTable;
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/06e9c1f0/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
index 795be9f..2e52efa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
@@ -18,16 +18,37 @@
 package org.apache.phoenix.transaction;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.phoenix.coprocessor.TephraTransactionalProcessor;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.transaction.TransactionFactory.Provider;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionSystemClient;
 import org.apache.tephra.TxConstants;
+import org.apache.tephra.distributed.PooledClientProvider;
+import org.apache.tephra.distributed.TransactionService;
+import org.apache.tephra.distributed.TransactionServiceClient;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+import org.apache.tephra.zookeeper.TephraZKClientService;
+import org.apache.twill.discovery.DiscoveryService;
+import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
 
-public class TephraTransactionProvider implements TransactionProvider {
+import com.google.common.collect.ArrayListMultimap;
+import com.google.inject.util.Providers;
+
+public class TephraTransactionProvider implements PhoenixTransactionProvider {
     private static final TephraTransactionProvider INSTANCE = new TephraTransactionProvider();
     
     public static final TephraTransactionProvider getInstance() {
@@ -37,12 +58,6 @@ public class TephraTransactionProvider implements TransactionProvider {
     private TephraTransactionProvider() {
     }
     
-    
-    @Override
-    public PhoenixTransactionContext getTransactionContext()  {
-        return new TephraTransactionContext();
-    }
-
     @Override
     public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException {
        return new TephraTransactionContext(txnBytes);
@@ -54,23 +69,129 @@ public class TephraTransactionProvider implements TransactionProvider {
     }
 
     @Override
-    public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask) {
-        return new TephraTransactionContext(contex, connection, subTask);
+    public PhoenixTransactionClient getTransactionClient(Configuration config, ConnectionInfo connectionInfo) {
+        if (connectionInfo.isConnectionless()) {
+            TransactionManager txnManager = new TransactionManager(config);
+            TransactionSystemClient txClient = new InMemoryTxSystemClient(txnManager);
+            return new TephraTransactionClient(txClient);
+            
+        }
+        String zkQuorumServersString = config.get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM);
+        if (zkQuorumServersString==null) {
+            zkQuorumServersString = connectionInfo.getZookeeperConnectionString();
+        }
+
+        int timeOut = config.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
+        // Create instance of the tephra zookeeper client
+        ZKClientService zkClientService  = ZKClientServices.delegate(
+            ZKClients.reWatchOnExpire(
+                ZKClients.retryOnFailure(
+                     new TephraZKClientService(zkQuorumServersString, timeOut, null,
+                             ArrayListMultimap.<String, byte[]>create()), 
+                         RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
+                     )
+                );
+        //txZKClientService.startAndWait();
+        ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(zkClientService);
+        PooledClientProvider pooledClientProvider = new PooledClientProvider(
+                config, zkDiscoveryService);
+        TransactionServiceClient txClient = new TransactionServiceClient(config,pooledClientProvider);
+        TephraTransactionClient client = new TephraTransactionClient(zkClientService, txClient);
+        client.start();
+        
+        return client;
     }
 
     @Override
-    public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable) {
-        return new TephraTransactionTable(ctx, htable);
+    public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connInfo) {
+        ZKClientService zkClient = ZKClientServices.delegate(
+          ZKClients.reWatchOnExpire(
+            ZKClients.retryOnFailure(
+              ZKClientService.Builder.of(connInfo.getZookeeperConnectionString())
+                .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT,
+                        HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
+                .build(),
+              RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)
+            )
+          )
+        );
+
+        //zkClient.startAndWait();
+        DiscoveryService discovery = new ZKDiscoveryService(zkClient);
+        TransactionManager txManager = new TransactionManager(config, new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector()), new TxMetricsCollector());
+        TransactionService txService = new TransactionService(config, zkClient, discovery, Providers.of(txManager));
+        TephraTransactionService service = new TephraTransactionService(zkClient, txService);
+        //txService.startAndWait();            
+        service.start();
+        return service;
+    }
+
+    static class TephraTransactionService implements PhoenixTransactionService {
+        private final ZKClientService zkClient;
+        private final TransactionService txService;
+
+        public TephraTransactionService(ZKClientService zkClient, TransactionService txService) {
+            this.zkClient = zkClient;
+            this.txService = txService;
+        }
+        
+        public void start() {
+            zkClient.startAndWait();
+            txService.startAndWait();            
+        }
+        
+        @Override
+        public void close() throws IOException {
+            try {
+                if (txService != null) txService.stopAndWait();
+            } finally {
+                if (zkClient != null) zkClient.stopAndWait();
+            }
+        }
+        
     }
     
-    @Override
-    public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp) {
-        return CellUtil.createCell(row, family, TxConstants.FAMILY_DELETE_QUALIFIER, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+    static class TephraTransactionClient implements PhoenixTransactionClient {
+        private final ZKClientService zkClient;
+        private final TransactionSystemClient txClient;
+
+        public TephraTransactionClient(TransactionSystemClient txClient) {
+            this(null, txClient);
+        }
+        
+        public TephraTransactionClient(ZKClientService zkClient, TransactionSystemClient txClient) {
+            this.zkClient = zkClient;
+            this.txClient = txClient;
+        }
+        
+        public void start() {
+            if (zkClient != null) zkClient.startAndWait(); // zkClient is null when built via the single-argument (txClient-only) constructor
+        }
+        
+        public TransactionSystemClient getTransactionClient() {
+            return txClient;
+        }
+        
+        @Override
+        public void close() throws IOException {
+            if (zkClient != null) zkClient.stopAndWait(); // zkClient is null for the connectionless client (single-argument constructor); nothing to stop
+        }
+        
     }
     
     @Override
-    public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp) {
-        return CellUtil.createCell(row, family, qualifier, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+    public Class<? extends RegionObserver> getCoprocessor() {
+        return TephraTransactionalProcessor.class;
+    }
+
+    @Override
+    public Provider getProvider() {
+        return TransactionFactory.Provider.TEPHRA;
+    }
+
+    @Override
+    public boolean isUnsupported(Feature feature) {
+        return false;
     }
 
 }


Mime
View raw message