phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tdsi...@apache.org
Subject [2/3] phoenix git commit: Connect TAL to Phoenix
Date Mon, 26 Jun 2017 19:10:13 GMT
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 16f6e39..09545da 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -221,6 +221,8 @@ import org.apache.phoenix.schema.types.PTinyint;
 import org.apache.phoenix.schema.types.PUnsignedTinyint;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.ConfigUtil;
@@ -235,11 +237,6 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.UpgradeUtil;
-import org.apache.tephra.TransactionSystemClient;
-import org.apache.tephra.TxConstants;
-import org.apache.tephra.distributed.PooledClientProvider;
-import org.apache.tephra.distributed.TransactionServiceClient;
-import org.apache.tephra.zookeeper.TephraZKClientService;
 import org.apache.twill.discovery.ZKDiscoveryService;
 import org.apache.twill.zookeeper.RetryStrategies;
 import org.apache.twill.zookeeper.ZKClientService;
@@ -291,7 +288,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
     private HConnection connection;
     private ZKClientService txZKClientService;
-    private TransactionServiceClient txServiceClient;
     private volatile boolean initialized;
     private volatile int nSequenceSaltBuckets;
 
@@ -400,32 +396,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
 
     }
 
-    @Override
-    public TransactionSystemClient getTransactionSystemClient() {
-        return txServiceClient;
-    }
-
     private void initTxServiceClient() {
-        String zkQuorumServersString = this.getProps().get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM);
-        if (zkQuorumServersString==null) {
-            zkQuorumServersString = connectionInfo.getZookeeperQuorum()+":"+connectionInfo.getPort();
-        }
-
-        int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
-        // Create instance of the tephra zookeeper client
-        txZKClientService = ZKClientServices.delegate(
-            ZKClients.reWatchOnExpire(
-                ZKClients.retryOnFailure(
-                     new TephraZKClientService(zkQuorumServersString, timeOut, null,
-                             ArrayListMultimap.<String, byte[]>create()), 
-                         RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
-                     )
-                );
-        txZKClientService.startAndWait();
-        ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(txZKClientService);
-        PooledClientProvider pooledClientProvider = new PooledClientProvider(
-                config, zkDiscoveryService);
-        this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider);
+        txZKClientService = TransactionFactory.getTransactionFactory().getTransactionContext().setTransactionClient(config, props, connectionInfo);
     }
 
     private void openConnection() throws SQLException {
@@ -868,7 +840,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             }
             boolean isTransactional =
                     Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) ||
-                    Boolean.TRUE.equals(tableProps.get(TxConstants.READ_NON_TX_DATA)); // For ALTER TABLE
+                    Boolean.TRUE.equals(tableProps.get(PhoenixTransactionContext.READ_NON_TX_DATA)); // For ALTER TABLE
             // TODO: better encapsulation for this
             // Since indexes can't have indexes, don't install our indexing coprocessor for indexes.
             // Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table because we use
@@ -1130,7 +1102,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 // If mapping an existing table as transactional, set property so that existing
                 // data is correctly read.
                 if (willBeTx) {
-                    newDesc.setValue(TxConstants.READ_NON_TX_DATA, Boolean.TRUE.toString());
+                    newDesc.setValue(PhoenixTransactionContext.READ_NON_TX_DATA, Boolean.TRUE.toString());
                 } else {
                     // If we think we're creating a non transactional table when it's already
                     // transactional, don't allow.
@@ -1139,7 +1111,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                         .setSchemaName(SchemaUtil.getSchemaNameFromFullName(tableName))
                         .setTableName(SchemaUtil.getTableNameFromFullName(tableName)).build().buildException();
                     }
-                    newDesc.remove(TxConstants.READ_NON_TX_DATA);
+                    newDesc.remove(PhoenixTransactionContext.READ_NON_TX_DATA);
                 }
                 if (existingDesc.equals(newDesc)) {
                     return null; // Indicate that no metadata was changed
@@ -1759,7 +1731,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             origTableDescriptors = Sets.newHashSetWithExpectedSize(3 + table.getIndexes().size());
             tableDescriptors.add(tableDescriptor);
             origTableDescriptors.add(origTableDescriptor);
-            nonTxToTx = Boolean.TRUE.equals(tableProps.get(TxConstants.READ_NON_TX_DATA));
+            nonTxToTx = Boolean.TRUE.equals(tableProps.get(PhoenixTransactionContext.READ_NON_TX_DATA));
             /*
              * If the table was transitioned from non transactional to transactional, we need
              * to also transition the index tables.
@@ -1869,7 +1841,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 indexTableProps = Collections.<String,Object>emptyMap();
             } else {
                 indexTableProps = Maps.newHashMapWithExpectedSize(1);
-                indexTableProps.put(TxConstants.READ_NON_TX_DATA, Boolean.valueOf(txValue));
+                indexTableProps.put(PhoenixTransactionContext.READ_NON_TX_DATA, Boolean.valueOf(txValue));
             }
             for (PTable index : table.getIndexes()) {
                 HTableDescriptor indexDescriptor = admin.getTableDescriptor(index.getPhysicalName().getBytes());
@@ -1882,7 +1854,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(indexFamilyName);
                     HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(dataFamilyName);
                     indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions());
-                    indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL));
+                    indexColDescriptor.setValue(PhoenixTransactionContext.PROPERTY_TTL, tableColDescriptor.getValue(PhoenixTransactionContext.PROPERTY_TTL));
                 } else {
                     for (PColumnFamily family : index.getColumnFamilies()) {
                         byte[] familyName = family.getName().getBytes();
@@ -1890,7 +1862,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                         HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(familyName);
                         HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(familyName);
                         indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions());
-                        indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL));
+                        indexColDescriptor.setValue(PhoenixTransactionContext.PROPERTY_TTL, tableColDescriptor.getValue(PhoenixTransactionContext.PROPERTY_TTL));
                     }
                 }
                 setTransactional(indexDescriptor, index.getType(), txValue, indexTableProps);
@@ -1926,7 +1898,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(familyName);
             HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(familyName);
             indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions());
-            indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL));
+            indexColDescriptor.setValue(PhoenixTransactionContext.PROPERTY_TTL, tableColDescriptor.getValue(PhoenixTransactionContext.PROPERTY_TTL));
         } else {
             for (PColumnFamily family : table.getColumnFamilies()) {
                 byte[] familyName = family.getName().getBytes();
@@ -1934,7 +1906,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 if (indexColDescriptor != null) {
                     HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(familyName);
                     indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions());
-                    indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL));
+                    indexColDescriptor.setValue(PhoenixTransactionContext.PROPERTY_TTL, tableColDescriptor.getValue(PhoenixTransactionContext.PROPERTY_TTL));
                 }
             }
         }
@@ -1962,9 +1934,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     }
     private void setTransactional(HTableDescriptor tableDescriptor, PTableType tableType, String txValue, Map<String, Object> tableProps) throws SQLException {
         if (txValue == null) {
-            tableDescriptor.remove(TxConstants.READ_NON_TX_DATA);
+            tableDescriptor.remove(PhoenixTransactionContext.READ_NON_TX_DATA);
         } else {
-            tableDescriptor.setValue(TxConstants.READ_NON_TX_DATA, txValue);
+            tableDescriptor.setValue(PhoenixTransactionContext.READ_NON_TX_DATA, txValue);
         }
         this.addCoprocessors(tableDescriptor.getName(), tableDescriptor, tableType, tableProps);
     }
@@ -2010,7 +1982,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                 commonFamilyProps.put(propName, prop.getSecond());
                             } else if (propName.equals(PhoenixDatabaseMetaData.TRANSACTIONAL) && Boolean.TRUE.equals(propValue)) {
                                 willBeTransactional = isOrWillBeTransactional = true;
-                                tableProps.put(TxConstants.READ_NON_TX_DATA, propValue);
+                                tableProps.put(PhoenixTransactionContext.READ_NON_TX_DATA, propValue);
                             }
                         } else {
                             if (MetaDataUtil.isHColumnProperty(propName)) {
@@ -2179,10 +2151,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                         if (props == null) {
                             props = new HashMap<String, Object>();
                         }
-                        props.put(TxConstants.PROPERTY_TTL, ttl);
+                        props.put(PhoenixTransactionContext.PROPERTY_TTL, ttl);
                         // Remove HBase TTL if we're not transitioning an existing table to become transactional
                         // or if the existing transactional table wasn't originally non transactional.
-                        if (!willBeTransactional && !Boolean.valueOf(newTableDescriptor.getValue(TxConstants.READ_NON_TX_DATA))) {
+                        if (!willBeTransactional && !Boolean.valueOf(newTableDescriptor.getValue(PhoenixTransactionContext.READ_NON_TX_DATA))) {
                             props.remove(TTL);
                         }
                     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index da394c0..ec05b24 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -80,15 +80,13 @@ import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.GuidePostsKey;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.JDBCUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.SequenceUtil;
-import org.apache.tephra.TransactionManager;
-import org.apache.tephra.TransactionSystemClient;
-import org.apache.tephra.inmemory.InMemoryTxSystemClient;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -107,7 +105,6 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
     private PMetaData metaData;
     private final Map<SequenceKey, SequenceInfo> sequenceMap = Maps.newHashMap();
     private final String userName;
-    private final TransactionSystemClient txSystemClient;
     private KeyValueBuilder kvBuilder;
     private volatile boolean initialized;
     private volatile SQLException initializationException;
@@ -119,7 +116,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
         super(services);
         userName = connInfo.getPrincipal();
         metaData = newEmptyMetaData();
-        
+
         // Use KeyValueBuilder that builds real KeyValues, as our test utils require this
         this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
         Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
@@ -138,8 +135,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
         // Without making a copy of the configuration we cons up, we lose some of our properties
         // on the server side during testing.
         this.config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config);
-        TransactionManager txnManager = new TransactionManager(config);
-        this.txSystemClient = new InMemoryTxSystemClient(txnManager);
+        TransactionFactory.getTransactionFactory().getTransactionContext().setInMemoryTransactionClient(config);
         this.guidePostsCache = new GuidePostsCache(this, config);
     }
 
@@ -531,11 +527,6 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
                 QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
     }
 
-    @Override
-    public TransactionSystemClient getTransactionSystemClient() {
-        return txSystemClient;
-    }
- 
     public MetaDataMutationResult createFunction(List<Mutation> functionData, PFunction function, boolean temporary)
             throws SQLException {
         return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, 0l, null);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 7f7c027..6c464eb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -47,7 +47,6 @@ import org.apache.phoenix.schema.SequenceAllocation;
 import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.GuidePostsKey;
-import org.apache.tephra.TransactionSystemClient;
 
 
 public class DelegateConnectionQueryServices extends DelegateQueryServices implements ConnectionQueryServices {
@@ -257,11 +256,6 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
     }
 
     @Override
-    public TransactionSystemClient getTransactionSystemClient() {
-        return getDelegate().getTransactionSystemClient();
-    }
-
-    @Override
     public MetaDataMutationResult createFunction(List<Mutation> functionData, PFunction function, boolean temporary)
             throws SQLException {
         return getDelegate().createFunction(functionData, function, temporary);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 537d31b..c982e26 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -217,6 +217,7 @@ import org.apache.phoenix.schema.types.PTimestamp;
 import org.apache.phoenix.schema.types.PUnsignedLong;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.CursorUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
@@ -231,7 +232,6 @@ import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
 import org.apache.phoenix.util.TransactionUtil;
 import org.apache.phoenix.util.UpgradeUtil;
-import org.apache.tephra.TxConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1989,7 +1989,7 @@ public class MetaDataClient {
                 // If TTL set, use Tephra TTL property name instead
                 Object ttl = commonFamilyProps.remove(HColumnDescriptor.TTL);
                 if (ttl != null) {
-                    commonFamilyProps.put(TxConstants.PROPERTY_TTL, ttl);
+                    commonFamilyProps.put(PhoenixTransactionContext.PROPERTY_TTL, ttl);
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index b0d0e25..20e8611 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -69,13 +69,13 @@ import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PFloat;
 import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.SizedUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
-import org.apache.tephra.TxConstants;
 
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
@@ -1033,11 +1033,11 @@ public class PTableImpl implements PTable {
             if (PTableImpl.this.isTransactional()) {
                 Put put = new Put(key);
                 if (families.isEmpty()) {
-                    put.add(SchemaUtil.getEmptyColumnFamily(PTableImpl.this), TxConstants.FAMILY_DELETE_QUALIFIER, ts,
+                    put.add(SchemaUtil.getEmptyColumnFamily(PTableImpl.this), TransactionFactory.getTransactionFactory().getTransactionContext().getFamilyDeleteMarker(), ts,
                             HConstants.EMPTY_BYTE_ARRAY);
                 } else {
                     for (PColumnFamily colFamily : families) {
-                        put.add(colFamily.getName().getBytes(), TxConstants.FAMILY_DELETE_QUALIFIER, ts,
+                        put.add(colFamily.getName().getBytes(), TransactionFactory.getTransactionFactory().getTransactionContext().getFamilyDeleteMarker(), ts,
                                 HConstants.EMPTY_BYTE_ARRAY);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
new file mode 100644
index 0000000..d4553ec
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.transaction;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.slf4j.Logger;
+
+public class OmidTransactionContext implements PhoenixTransactionContext {
+
+    @Override
+    public void begin() throws SQLException {
+        // TODO: unimplemented stub; currently a no-op.
+
+    }
+
+    @Override
+    public void commit() throws SQLException {
+        // TODO: unimplemented stub; currently a no-op.
+
+    }
+
+    @Override
+    public void abort() throws SQLException {
+        // TODO: unimplemented stub; currently a no-op.
+
+    }
+
+    @Override
+    public void checkpoint(boolean hasUncommittedData) throws SQLException {
+        // TODO: unimplemented stub; the argument is ignored and nothing happens.
+
+    }
+
+    @Override
+    public void commitDDLFence(PTable dataTable, Logger logger) throws SQLException {
+        // TODO: unimplemented stub; both arguments are ignored and nothing happens.
+
+    }
+
+    @Override
+    public void join(PhoenixTransactionContext ctx) {
+        // TODO: unimplemented stub; the argument is ignored and nothing happens.
+
+    }
+
+    @Override
+    public boolean isTransactionRunning() {
+        // TODO: unimplemented stub; always returns false.
+        return false;
+    }
+
+    @Override
+    public void reset() {
+        // TODO: unimplemented stub; currently a no-op.
+
+    }
+
+    @Override
+    public long getTransactionId() {
+        // TODO: unimplemented stub; always returns 0.
+        return 0;
+    }
+
+    @Override
+    public long getReadPointer() {
+        // TODO: unimplemented stub; always returns 0.
+        return 0;
+    }
+
+    @Override
+    public long getWritePointer() {
+        // TODO: unimplemented stub; always returns 0.
+        return 0;
+    }
+
+    @Override
+    public PhoenixVisibilityLevel getVisibilityLevel() {
+        // TODO: unimplemented stub; always returns null, so callers get no level.
+        return null;
+    }
+
+    @Override
+    public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel) {
+        // TODO: unimplemented stub; the argument is ignored and nothing is stored.
+        
+    }
+
+    @Override
+    public byte[] encodeTransaction() throws SQLException {
+        // TODO: unimplemented stub; always returns null rather than an encoded state.
+        return null;
+    }
+
+    @Override
+    public long getMaxTransactionsPerSecond() {
+        // TODO: unimplemented stub; always returns 0.
+        return 0;
+    }
+
+    @Override
+    public boolean isPreExistingVersion(long version) {
+        // TODO: unimplemented stub; always returns false regardless of version.
+        return false;
+    }
+
+    @Override
+    public BaseRegionObserver getCoProcessor() {
+        // TODO: unimplemented stub; always returns null (no coprocessor instance).
+        return null;
+    }
+
+    @Override
+    public void setInMemoryTransactionClient(Configuration config) {
+        // TODO: unimplemented stub; the configuration is ignored and nothing is created.
+        
+    }
+
+    @Override
+    public ZKClientService setTransactionClient(Configuration config, ReadOnlyProps props,
+            ConnectionInfo connectionInfo) {
+        // TODO: unimplemented stub; all arguments are ignored and null is returned.
+        
+        return null;
+        
+    }
+
+    @Override
+    public byte[] getFamilyDeleteMarker() {
+        // TODO: unimplemented stub; always returns null, so callers get no marker bytes.
+        return null;
+    }
+
+    @Override
+    public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException {
+        // TODO: unimplemented stub; all arguments are ignored and config is not modified.
+
+    }
+
+    @Override
+    public void setupTxManager(Configuration config, String url) throws SQLException {
+        // TODO: unimplemented stub; both arguments are ignored and nothing is started.
+
+    }
+
+    @Override
+    public void tearDownTxManager() {
+        // TODO: unimplemented stub; currently a no-op.
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
new file mode 100644
index 0000000..54eea8b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.transaction;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
+public class OmidTransactionTable implements PhoenixTransactionalTable {
+
+    public OmidTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) {
+        // TODO Auto-generated constructor stub
+    }
+
+    @Override
+    public Result get(Get get) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void put(Put put) throws IOException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void delete(Delete delete) throws IOException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public ResultScanner getScanner(Scan scan) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public byte[] getTableName() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public HTableDescriptor getTableDescriptor() throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public boolean exists(Get get) throws IOException {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public Result[] get(List<Get> gets) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public ResultScanner getScanner(byte[] family) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public ResultScanner getScanner(byte[] family, byte[] qualifier)
+            throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void put(List<Put> puts) throws IOException {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public void delete(List<Delete> deletes) throws IOException {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public void setAutoFlush(boolean autoFlush) {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public boolean isAutoFlush() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public long getWriteBufferSize() {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public void setWriteBufferSize(long writeBufferSize) throws IOException {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public void flushCommits() throws IOException {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public void close() throws IOException {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family,
+            byte[] qualifier, long amount, boolean writeToWAL)
+            throws IOException {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public Boolean[] exists(List<Get> gets) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public void setAutoFlushTo(boolean autoFlush) {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public TableName getName() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public boolean[] existsAll(List<Get> gets) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void batch(List<? extends Row> actions, Object[] results)
+            throws IOException, InterruptedException {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public Object[] batch(List<? extends Row> actions) throws IOException,
+            InterruptedException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public <R> void batchCallback(List<? extends Row> actions,
+            Object[] results, Callback<R> callback) throws IOException,
+            InterruptedException {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public <R> Object[] batchCallback(List<? extends Row> actions,
+            Callback<R> callback) throws IOException, InterruptedException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+            byte[] value, Put put) throws IOException {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+            CompareOp compareOp, byte[] value, Put put) throws IOException {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+            byte[] value, Delete delete) throws IOException {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+            CompareOp compareOp, byte[] value, Delete delete)
+            throws IOException {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public void mutateRow(RowMutations rm) throws IOException {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public Result append(Append append) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Result increment(Increment increment) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family,
+            byte[] qualifier, long amount) throws IOException {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family,
+            byte[] qualifier, long amount, Durability durability)
+            throws IOException {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public CoprocessorRpcChannel coprocessorService(byte[] row) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public <T extends Service, R> Map<byte[], R> coprocessorService(
+            Class<T> service, byte[] startKey, byte[] endKey,
+            Call<T, R> callable) throws ServiceException, Throwable {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public <T extends Service, R> void coprocessorService(Class<T> service,
+            byte[] startKey, byte[] endKey, Call<T, R> callable,
+            Callback<R> callback) throws ServiceException, Throwable {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public <R extends Message> Map<byte[], R> batchCoprocessorService(
+            MethodDescriptor methodDescriptor, Message request,
+            byte[] startKey, byte[] endKey, R responsePrototype)
+            throws ServiceException, Throwable {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public <R extends Message> void batchCoprocessorService(
+            MethodDescriptor methodDescriptor, Message request,
+            byte[] startKey, byte[] endKey, R responsePrototype,
+            Callback<R> callback) throws ServiceException, Throwable {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
+            CompareOp compareOp, byte[] value, RowMutations mutation)
+            throws IOException {
+        // TODO Auto-generated method stub
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
new file mode 100644
index 0000000..d335692
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.transaction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.concurrent.TimeoutException;
+
+public interface PhoenixTransactionContext {
+
+    /**
+     * 
+     * Visibility levels needed for checkpointing.
+     *
+     */
+    public enum PhoenixVisibilityLevel {
+        SNAPSHOT,
+        SNAPSHOT_EXCLUDE_CURRENT,
+        SNAPSHOT_ALL
+      }
+
+    public static final String TX_ROLLBACK_ATTRIBUTE_KEY = "tephra.tx.rollback"; //"phoenix.tx.rollback"; 
+
+    public static final String PROPERTY_TTL = "dataset.table.ttl";
+
+    public static final String READ_NON_TX_DATA = "data.tx.read.pre.existing";
+
+    /**
+     * Set the in memory client connection to the transaction manager (for testing purpose)
+     *
+     * @param config
+     */
+    public void setInMemoryTransactionClient(Configuration config);
+
+    /**
+     * Set the client connection to the transaction manager
+     *
+     * @param config
+     * @param props
+     * @param connectionInfo
+     */
+    public ZKClientService setTransactionClient(Configuration config, ReadOnlyProps props, ConnectionInfo connectionInfo);
+
+    /**
+     * Starts a transaction
+     *
+     * @throws SQLException
+     */
+    public void begin() throws SQLException;
+
+    /**
+     * Commits a transaction
+     *
+     * @throws SQLException
+     */
+    public void commit() throws SQLException;
+
+    /**
+     * Rollback a transaction
+     *
+     * @throws SQLException
+     */
+    public void abort() throws SQLException;
+
+    /**
+     * Create a checkpoint in a transaction as defined in [TEPHRA-96]
+     * @throws SQLException
+     */
+    public void checkpoint(boolean hasUncommittedData) throws SQLException;
+
+    /**
+     * Commit DDL to guarantee that no transaction that started before create index
+     * commits afterwards, as explained in [PHOENIX-2478], [TEPHRA-157] and [OMID-56].
+     *
+     * @param dataTable  the table that the DDL command works on
+     * @param logger  logger that implementations may write progress messages to
+     * @throws SQLException the only checked exception declared by this method;
+     *         no InterruptedException or TimeoutException is declared
+     */
+    public void commitDDLFence(PTable dataTable, Logger logger)
+            throws SQLException;
+
+    /**
+     * Augment the current context with ctx modified keys
+     *
+     * @param ctx
+     */
+    public void join(PhoenixTransactionContext ctx);
+
+    /**
+     * Is there a transaction in flight?
+     */
+    public boolean isTransactionRunning();
+
+    /**
+     * Reset transaction state
+     */
+    public void reset();
+
+    /**
+     * Returns transaction unique identifier
+     */
+    public long getTransactionId();
+
+    /**
+     * Returns transaction snapshot id
+     */
+    public long getReadPointer();
+
+    /**
+     * Returns transaction write pointer. After checkpoint the write pointer is different than the initial one  
+     */
+    public long getWritePointer();
+
+    /**
+     * Set visibility level
+     */
+    public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel);
+
+    /**
+     * Returns visibility level
+     */
+    public PhoenixVisibilityLevel getVisibilityLevel();
+
+    /**
+     * Encode transaction
+     */
+    public byte[] encodeTransaction() throws SQLException;
+
+    /**
+     * 
+     * @return max transactions per second
+     */
+    public long getMaxTransactionsPerSecond();
+
+    /**
+     *
+     * @param version
+     */
+    public boolean isPreExistingVersion(long version);
+
+    /**
+     *
+     * @return the coprocessor
+     */
+    public BaseRegionObserver getCoProcessor();
+
+    /**
+     * 
+     * @return the family delete marker
+     */
+    public byte[] getFamilyDeleteMarker();
+
+    /**
+     * Setup transaction manager's configuration for testing
+     */
+     public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException;
+
+    /**
+     * Setup transaction manager for testing
+     */
+    public void setupTxManager(Configuration config, String url) throws SQLException;
+
+    /**
+     * Tear down transaction manager for testing
+     */
+    public void tearDownTxManager();
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
new file mode 100644
index 0000000..7af1c08
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.transaction;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+
+import java.io.IOException;
+import java.util.List;
+
+public interface PhoenixTransactionalTable extends HTableInterface {
+
+    /**
+     * Transactional version of {@link HTableInterface#get(Get get)}
+     * @param get
+     * @throws IOException
+     */
+    public Result get(Get get) throws IOException;
+
+    /**
+     * Transactional version of {@link HTableInterface#put(Put put)}
+     * @param put
+     * @throws IOException
+     */
+    public void put(Put put) throws IOException;
+
+    /**
+     * Transactional version of {@link HTableInterface#delete(Delete delete)}
+     *
+     * @param delete
+     * @throws IOException
+     */
+    public void delete(Delete delete) throws IOException;
+
+    /**
+     * Transactional version of {@link HTableInterface#getScanner(Scan scan)}
+     *
+     * @param scan
+     * @return ResultScanner
+     * @throws IOException
+     */
+    public ResultScanner getScanner(Scan scan) throws IOException;
+
+    /**
+     * Returns Htable name
+     */
+    public byte[] getTableName();
+
+    /**
+     * Returns Htable configuration object
+     */
+    public Configuration getConfiguration();
+
+    /**
+     * Returns HTableDescriptor of Htable
+     * @throws IOException
+     */
+    public HTableDescriptor getTableDescriptor() throws IOException;
+
+    /**
+     * Checks if cell exists
+     * @throws IOException
+     */
+    public boolean exists(Get get) throws IOException;
+
+    /**
+     * Transactional version of {@link HTableInterface#get(List gets)}
+     * @throws IOException
+     */
+    public Result[] get(List<Get> gets) throws IOException;
+
+    /**
+     * Transactional version of {@link HTableInterface#getScanner(byte[] family)}
+     * @throws IOException
+     */
+    public ResultScanner getScanner(byte[] family) throws IOException;
+
+    /**
+     * Transactional version of {@link HTableInterface#getScanner(byte[] family, byte[] qualifier)}
+     * @throws IOException
+     */
+    public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException;
+
+    /**
+     * Transactional version of {@link HTableInterface#put(List puts)}
+     * @throws IOException
+     */
+    public void put(List<Put> puts) throws IOException;
+
+    /**
+     * Transactional version of {@link HTableInterface#delete(List deletes)}
+     * @throws IOException
+     */
+    public void delete(List<Delete> deletes) throws IOException;
+
+    /**
+     * Delegates to {@link HTable#setAutoFlush(boolean autoFlush)}
+     */
+    public void setAutoFlush(boolean autoFlush);
+
+    /**
+     * Delegates to {@link HTable#isAutoFlush()}
+     */
+    public boolean isAutoFlush();
+
+    /**
+     * Delegates to {@link HTable#getWriteBufferSize()}
+     */
+    public long getWriteBufferSize();
+
+    /**
+     * Delegates to {@link HTable#setWriteBufferSize(long writeBufferSize)}
+     */
+    public void setWriteBufferSize(long writeBufferSize) throws IOException;
+
+    /**
+     * Delegates to {@link HTable#flushCommits()}
+     */
+    public void flushCommits() throws IOException;
+
+    /**
+     * Releases resources
+     * @throws IOException
+     */
+    public void close() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
new file mode 100644
index 0000000..83e7f99
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.transaction;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionAware;
+import org.apache.tephra.TransactionCodec;
+import org.apache.tephra.TransactionConflictException;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionFailureException;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.Transaction.VisibilityLevel;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.distributed.PooledClientProvider;
+import org.apache.tephra.distributed.TransactionServiceClient;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.util.TxUtils;
+import org.apache.tephra.visibility.FenceWait;
+import org.apache.tephra.visibility.VisibilityFence;
+import org.apache.tephra.zookeeper.TephraZKClientService;
+import org.apache.tephra.distributed.TransactionService;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+import org.apache.twill.discovery.DiscoveryService;
+import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.internal.utils.Networks;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
+import com.google.inject.util.Providers;
+
+import org.slf4j.Logger;
+
+public class TephraTransactionContext implements PhoenixTransactionContext {
+
+    private static final TransactionCodec CODEC = new TransactionCodec();
+
+    private static TransactionSystemClient txClient = null;
+    private static ZKClientService zkClient = null;
+    private static TransactionService txService = null;
+    private static TransactionManager txManager = null;
+
+    private final List<TransactionAware> txAwares;
+    private final TransactionContext txContext;
+    private Transaction tx;
+    private TransactionSystemClient txServiceClient;
+    private TransactionFailureException e;
+
+    public TephraTransactionContext() {
+        this.txServiceClient = null;
+        this.txAwares = Lists.newArrayList();
+        this.txContext = null;
+    }
+
+    public TephraTransactionContext(byte[] txnBytes) throws IOException {
+        this();
+        this.tx = (txnBytes != null && txnBytes.length > 0) ? CODEC
+                .decode(txnBytes) : null;
+    }
+
+    public TephraTransactionContext(PhoenixConnection connection) {
+        this.txServiceClient = txClient;  
+        this.txAwares = Collections.emptyList();
+        this.txContext = new TransactionContext(txServiceClient);
+    }
+
+    public TephraTransactionContext(PhoenixTransactionContext ctx,
+            PhoenixConnection connection, boolean subTask) {
+        this.txServiceClient = txClient;  
+        assert (ctx instanceof TephraTransactionContext);
+        TephraTransactionContext tephraTransactionContext = (TephraTransactionContext) ctx;
+
+        if (subTask) {
+            this.tx = tephraTransactionContext.getTransaction();
+            this.txAwares = Lists.newArrayList();
+            this.txContext = null;
+        } else {
+            this.txAwares = Collections.emptyList();
+            this.txContext = tephraTransactionContext.getContext();
+        }
+
+        this.e = null;
+    }
+
+    @Override
+    public void setInMemoryTransactionClient(Configuration config) {
+        TransactionManager txnManager = new TransactionManager(config);
+        txClient = this.txServiceClient = new InMemoryTxSystemClient(txnManager);
+    }
+
+    @Override
+    public ZKClientService setTransactionClient(Configuration config, ReadOnlyProps props, ConnectionInfo connectionInfo) {
+        String zkQuorumServersString = props.get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM);
+        if (zkQuorumServersString==null) {
+            zkQuorumServersString = connectionInfo.getZookeeperQuorum()+":"+connectionInfo.getPort();
+        }
+
+        int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
+        // Create instance of the tephra zookeeper client
+        ZKClientService txZKClientService  = ZKClientServices.delegate(
+            ZKClients.reWatchOnExpire(
+                ZKClients.retryOnFailure(
+                     new TephraZKClientService(zkQuorumServersString, timeOut, null,
+                             ArrayListMultimap.<String, byte[]>create()), 
+                         RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS))
+                     )
+                );
+        txZKClientService.startAndWait();
+        ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(txZKClientService);
+        PooledClientProvider pooledClientProvider = new PooledClientProvider(
+                config, zkDiscoveryService);
+        txClient = this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider);
+        
+        return txZKClientService;
+    }
+
+    @Override
+    public void begin() throws SQLException {
+        if (txContext == null) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build()
+                    .buildException();
+        }
+
+        try {
+            txContext.start();
+        } catch (TransactionFailureException e) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.TRANSACTION_FAILED)
+                    .setMessage(e.getMessage()).setRootCause(e).build()
+                    .buildException();
+        }
+    }
+
+    @Override
+    public void commit() throws SQLException {
+
+        if (txContext == null || !isTransactionRunning()) {
+            return;
+        }
+
+        try {
+            txContext.finish();
+        } catch (TransactionFailureException e) {
+            this.e = e;
+
+            if (e instanceof TransactionConflictException) {
+                throw new SQLExceptionInfo.Builder(
+                        SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION)
+                        .setMessage(e.getMessage()).setRootCause(e).build()
+                        .buildException();
+            }
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.TRANSACTION_FAILED)
+                    .setMessage(e.getMessage()).setRootCause(e).build()
+                    .buildException();
+        }
+    }
+
+    @Override
+    public void abort() throws SQLException {
+
+        if (txContext == null || !isTransactionRunning()) {
+            return;
+        }
+
+        try {
+            if (e != null) {
+                txContext.abort(e);
+                e = null;
+            } else {
+                txContext.abort();
+            }
+        } catch (TransactionFailureException e) {
+            this.e = null;
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.TRANSACTION_FAILED)
+                    .setMessage(e.getMessage()).setRootCause(e).build()
+                    .buildException();
+        }
+    }
+
+    @Override
+    public void checkpoint(boolean hasUncommittedData) throws SQLException {
+        if (hasUncommittedData) {
+            try {
+                if (txContext == null) {
+                    tx = txServiceClient.checkpoint(tx);
+                } else {
+                    assert (txContext != null);
+                    txContext.checkpoint();
+                    tx = txContext.getCurrentTransaction();
+                }
+            } catch (TransactionFailureException e) {
+                throw new SQLException(e);
+            }
+        }
+
+        // Since we're querying our own table while mutating it, we must not
+        // see our current mutations, otherwise we can get erroneous results
+        // (for DELETE)
+        // or get into an infinite loop (for UPSERT SELECT).
+        if (txContext == null) {
+            tx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+        } else {
+            assert (txContext != null);
+            txContext.getCurrentTransaction().setVisibility(
+                    VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+        }
+    }
+
+    /**
+     * Waits on the visibility fence keyed by the data table's name.
+     * <p>
+     * A fence wait is prepared through the transaction service client and
+     * awaited for at most 10 seconds.
+     *
+     * @param dataTable
+     *            table whose name is used as the fence key and whose schema
+     *            and table names are reported on failure
+     * @param logger
+     *            receives an info message with the current read pointer once
+     *            the wait has completed
+     * @throws SQLException
+     *             with code {@code INTERRUPTED_EXCEPTION} if the wait is
+     *             interrupted, or {@code TX_UNABLE_TO_GET_WRITE_FENCE} if it
+     *             times out or the transaction system reports a failure
+     */
+    @Override
+    public void commitDDLFence(PTable dataTable, Logger logger)
+            throws SQLException {
+        byte[] key = dataTable.getName().getBytes();
+
+        try {
+            FenceWait fenceWait = VisibilityFence.prepareWait(key,
+                    txServiceClient);
+            fenceWait.await(10000, TimeUnit.MILLISECONDS);
+
+            if (logger.isInfoEnabled()) {
+                logger.info("Added write fence at ~"
+                        + getCurrentTransaction().getReadPointer());
+            }
+        } catch (InterruptedException interrupted) {
+            // Restore the interrupt flag before translating the exception.
+            Thread.currentThread().interrupt();
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.INTERRUPTED_EXCEPTION)
+                    .setRootCause(interrupted)
+                    .build().buildException();
+        } catch (TimeoutException | TransactionFailureException fenceFailure) {
+            // Keep the original failure as the cause so that a timeout can be
+            // told apart from a transaction-system error when diagnosing.
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE)
+                    .setSchemaName(dataTable.getSchemaName().getString())
+                    .setTableName(dataTable.getTableName().getString())
+                    .setRootCause(fenceFailure)
+                    .build()
+                    .buildException();
+        }
+    }
+
+    /**
+     * Registers a visibility fence for the table's logical name and, when the
+     * physical name differs, a second fence for the physical name.
+     * <p>
+     * Each fence is added to the transaction context when one exists,
+     * otherwise to the local list of transaction-aware objects.
+     *
+     * @param table
+     *            table whose logical and physical names provide the fence keys
+     */
+    public void markDMLFence(PTable table) {
+        byte[] logicalKey = table.getName().getBytes();
+        TransactionAware logicalFence = VisibilityFence.create(logicalKey);
+        if (this.txContext != null) {
+            this.txContext.addTransactionAware(logicalFence);
+        } else {
+            this.txAwares.add(logicalFence);
+        }
+
+        byte[] physicalKey = table.getPhysicalName().getBytes();
+        if (Bytes.compareTo(physicalKey, logicalKey) == 0) {
+            // Same key: the fence registered above already covers it.
+            return;
+        }
+
+        TransactionAware physicalFence = VisibilityFence.create(physicalKey);
+        if (this.txContext != null) {
+            this.txContext.addTransactionAware(physicalFence);
+        } else {
+            this.txAwares.add(physicalFence);
+        }
+    }
+
+    /**
+     * Takes over the transaction-aware objects held by another context.
+     * <p>
+     * They are added one by one to this object's transaction context when it
+     * has one, otherwise appended to the local list.
+     *
+     * @param ctx
+     *            the context to join; must be a {@code TephraTransactionContext}
+     */
+    @Override
+    public void join(PhoenixTransactionContext ctx) {
+        assert (ctx instanceof TephraTransactionContext);
+        TephraTransactionContext other = (TephraTransactionContext) ctx;
+
+        if (txContext == null) {
+            txAwares.addAll(other.getAwares());
+            return;
+        }
+
+        for (TransactionAware aware : other.getAwares()) {
+            txContext.addTransactionAware(aware);
+        }
+    }
+
+    /**
+     * Returns the transaction held directly in {@code tx} when it is set,
+     * otherwise the current transaction of the context, otherwise
+     * {@code null}.
+     */
+    private Transaction getCurrentTransaction() {
+        if (tx != null) {
+            return tx;
+        }
+        if (txContext != null) {
+            return txContext.getCurrentTransaction();
+        }
+        return null;
+    }
+
+    @Override
+    public boolean isTransactionRunning() {
+        return getCurrentTransaction() != null;
+    }
+
+    /**
+     * Clears the locally held transaction, the list of transaction-aware
+     * objects, and any stored failure.
+     */
+    @Override
+    public void reset() {
+        this.tx = null;
+        this.txAwares.clear();
+        this.e = null;
+    }
+
+    /**
+     * Returns the id of the current transaction, or
+     * {@code HConstants.LATEST_TIMESTAMP} when there is none.
+     */
+    @Override
+    public long getTransactionId() {
+        Transaction current = getCurrentTransaction();
+        if (current == null) {
+            return HConstants.LATEST_TIMESTAMP;
+        }
+        // First write pointer - won't change with checkpointing
+        return current.getTransactionId();
+    }
+
+    /**
+     * Returns the read pointer of the current transaction, or {@code -1}
+     * when there is none.
+     */
+    @Override
+    public long getReadPointer() {
+        Transaction current = getCurrentTransaction();
+        return (current == null) ? (-1) : current.getReadPointer();
+    }
+
+    /**
+     * Returns the write pointer of the current transaction, or
+     * {@code HConstants.LATEST_TIMESTAMP} when there is none. For testing.
+     */
+    @Override
+    public long getWritePointer() {
+        Transaction current = getCurrentTransaction();
+        if (current == null) {
+            return HConstants.LATEST_TIMESTAMP;
+        }
+        return current.getWritePointer();
+    }
+
+    /**
+     * Maps the given Phoenix visibility level to the corresponding Tephra
+     * level and applies it to the current transaction.
+     * <p>
+     * A transaction must be running; this is only checked by an assertion.
+     *
+     * @param visibilityLevel
+     *            the Phoenix level to apply
+     */
+    @Override
+    public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel) {
+        VisibilityLevel tephraVisibilityLevel = null;
+
+        switch (visibilityLevel) {
+        case SNAPSHOT:
+            tephraVisibilityLevel = VisibilityLevel.SNAPSHOT;
+            break;
+        case SNAPSHOT_EXCLUDE_CURRENT:
+            tephraVisibilityLevel = VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
+            break;
+        case SNAPSHOT_ALL:
+            // Debug output to stdout was removed here: library code must not
+            // print to the console of the embedding application.
+            tephraVisibilityLevel = VisibilityLevel.SNAPSHOT_ALL;
+            break;
+        default:
+            assert (false);
+        }
+
+        Transaction tx = getCurrentTransaction();
+        assert(tx != null);
+        tx.setVisibility(tephraVisibilityLevel);
+    }
+
+    /**
+     * Returns the visibility level of the current transaction translated to
+     * the Phoenix enum.
+     * <p>
+     * A transaction must be running; this is only checked by an assertion.
+     *
+     * @return the matching {@code PhoenixVisibilityLevel}, or {@code null}
+     *         when the Tephra level has no Phoenix counterpart
+     */
+    @Override
+    public PhoenixVisibilityLevel getVisibilityLevel() {
+        VisibilityLevel visibilityLevel = null;
+
+        Transaction tx = getCurrentTransaction();
+        assert(tx != null);
+        visibilityLevel = tx.getVisibilityLevel();
+
+        PhoenixVisibilityLevel phoenixVisibilityLevel;
+        switch (visibilityLevel) {
+        case SNAPSHOT:
+            phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT;
+            break;
+        case SNAPSHOT_EXCLUDE_CURRENT:
+            phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
+            break;
+        case SNAPSHOT_ALL:
+            phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT_ALL;
+            // Without this break the case fell through to default and the
+            // method reported null for SNAPSHOT_ALL.
+            break;
+        default:
+            phoenixVisibilityLevel = null;
+        }
+
+        return phoenixVisibilityLevel;
+    }
+
+    /**
+     * Serializes the current transaction with {@code CODEC}.
+     * <p>
+     * A transaction must be running; this is only checked by an assertion.
+     *
+     * @return the encoded transaction bytes
+     * @throws SQLException
+     *             wrapping the {@code IOException} if encoding fails
+     */
+    @Override
+    public byte[] encodeTransaction() throws SQLException {
+        Transaction current = getCurrentTransaction();
+        assert (current != null);
+
+        byte[] encoded;
+        try {
+            encoded = CODEC.encode(current);
+        } catch (IOException encodingFailure) {
+            throw new SQLException(encodingFailure);
+        }
+        return encoded;
+    }
+
+    /**
+     * Returns {@code TxConstants.MAX_TX_PER_MS}.
+     * <p>
+     * NOTE(review): the method name says "per second" while the constant it
+     * returns is named "per ms" - confirm the intended unit against callers.
+     */
+    @Override
+    public long getMaxTransactionsPerSecond() {
+        return TxConstants.MAX_TX_PER_MS;
+    }
+
+    /**
+     * Delegates to {@code TxUtils.isPreExistingVersion(long)}.
+     *
+     * @param version the cell version to test
+     */
+    @Override
+    public boolean isPreExistingVersion(long version) {
+        return TxUtils.isPreExistingVersion(version);
+    }
+
+    /** Returns a newly created {@code TransactionProcessor} on every call. */
+    @Override
+    public BaseRegionObserver getCoProcessor() {
+        return new TransactionProcessor();
+    }
+
+    /** Returns {@code TxConstants.FAMILY_DELETE_QUALIFIER}. */
+    @Override
+    public byte[] getFamilyDeleteMarker() {
+        return TxConstants.FAMILY_DELETE_QUALIFIER;
+    }
+
+    @Override
+    public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException {
+        config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
+        config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
+        config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
+        config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, Networks.getRandomPort());
+        config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder);
+        config.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, defaultTxnTimeoutSeconds);
+        config.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
+        config.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5L);
+    }
+
+    /**
+     * Starts a ZooKeeper client and a transaction service for the given
+     * connection URL, unless a transaction service is already set.
+     *
+     * @param config
+     *            configuration passed to the transaction manager, state
+     *            storage and service; also supplies the ZooKeeper session
+     *            timeout
+     * @param url
+     *            connection URL from which the ZooKeeper connection string is
+     *            taken
+     * @throws SQLException
+     *             declared by the interface; presumably raised when the URL
+     *             cannot be parsed - TODO confirm
+     */
+    @Override
+    public void setupTxManager(Configuration config, String url) throws SQLException {
+
+        // Already set up: nothing to do.
+        if (txService != null) {
+            return;
+        }
+
+        ConnectionInfo connInfo = ConnectionInfo.create(url);
+        // ZooKeeper client wrapped with retry-on-failure (exponential delay,
+        // 500 ms to 2000 ms) and re-watch-on-expire behavior.
+        zkClient = ZKClientServices.delegate(
+          ZKClients.reWatchOnExpire(
+            ZKClients.retryOnFailure(
+              ZKClientService.Builder.of(connInfo.getZookeeperConnectionString())
+                .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT,
+                        HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
+                .build(),
+              RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)
+            )
+          )
+        );
+        zkClient.startAndWait();
+
+        // The transaction service is built on the ZooKeeper client and a
+        // ZooKeeper-backed discovery service, then started.
+        DiscoveryService discovery = new ZKDiscoveryService(zkClient);
+        txManager = new TransactionManager(config, new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector()), new TxMetricsCollector());
+        txService = new TransactionService(config, zkClient, discovery, Providers.of(txManager));
+        txService.startAndWait();
+    }
+
+    /**
+     * Stops the transaction service and then the ZooKeeper client, if they
+     * are set, and clears the service, client and manager references.
+     * <p>
+     * The references are cleared and the client stop is attempted even when
+     * stopping the service throws.
+     */
+    @Override
+    public void tearDownTxManager() {
+        try {
+            if (txService != null) {
+                txService.stopAndWait();
+            }
+        } finally {
+            try {
+                if (zkClient != null) {
+                    zkClient.stopAndWait();
+                }
+            } finally {
+                txService = null;
+                zkClient = null;
+                txManager = null;
+            }
+        }
+    }
+
+    /**
+     * TephraTransactionContext specific functions
+     */
+
+    /** Package-private accessor for the current transaction; may return {@code null}. */
+    Transaction getTransaction() {
+        return this.getCurrentTransaction();
+    }
+
+    /** Package-private accessor for the wrapped transaction context field. */
+    TransactionContext getContext() {
+        return this.txContext;
+    }
+
+    /**
+     * Package-private accessor for the list of transaction-aware objects.
+     * The list itself is returned, not a copy.
+     */
+    List<TransactionAware> getAwares() {
+        return txAwares;
+    }
+
+    /**
+     * Registers a transaction-aware object.
+     * <p>
+     * With a transaction context the object is handed to the context. Without
+     * one, but with a locally held transaction, the object is added to the
+     * local list and started on that transaction. When neither exists the
+     * call has no effect.
+     *
+     * @param txAware
+     *            the object to register
+     */
+    void addTransactionAware(TransactionAware txAware) {
+        if (this.txContext != null) {
+            txContext.addTransactionAware(txAware);
+            return;
+        }
+
+        if (this.tx == null) {
+            return;
+        }
+
+        txAwares.add(txAware);
+        txAware.startTx(tx);
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
new file mode 100644
index 0000000..cf48521
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.transaction;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.TransactionAwareHTable;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
+/**
+ * {@link PhoenixTransactionalTable} implementation backed by a Tephra
+ * {@link TransactionAwareHTable}.
+ * <p>
+ * On construction the wrapped table is registered with the given
+ * {@link TephraTransactionContext}. Every table operation is forwarded
+ * unchanged to the same-named method of the wrapped table.
+ */
+public class TephraTransactionTable implements PhoenixTransactionalTable {
+
+    private final TransactionAwareHTable transactionAwareHTable;
+
+    private final TephraTransactionContext tephraTransactionContext;
+
+    /**
+     * Wraps {@code hTable} without table metadata: row-level conflict
+     * detection is used and no DML fence is marked.
+     *
+     * @param ctx    must be a {@link TephraTransactionContext}
+     * @param hTable the HBase table to wrap
+     */
+    public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) {
+        this(ctx, hTable, null);
+    }
+
+    /**
+     * Wraps {@code hTable} and registers the wrapper with the context.
+     * <p>
+     * Conflict detection is disabled when {@code pTable} has immutable rows
+     * and is row-level otherwise. When {@code pTable} is given and is not an
+     * index, a DML fence is marked for it on the context.
+     *
+     * @param ctx    must be a {@link TephraTransactionContext}
+     * @param hTable the HBase table to wrap
+     * @param pTable Phoenix metadata of the table, or {@code null}
+     */
+    public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable, PTable pTable) {
+
+        assert(ctx instanceof TephraTransactionContext);
+
+        tephraTransactionContext = (TephraTransactionContext) ctx;
+
+        TxConstants.ConflictDetection conflictLevel = (pTable != null && pTable.isImmutableRows())
+                ? TxConstants.ConflictDetection.NONE
+                : TxConstants.ConflictDetection.ROW;
+        transactionAwareHTable = new TransactionAwareHTable(hTable, conflictLevel);
+
+        tephraTransactionContext.addTransactionAware(transactionAwareHTable);
+
+        if (pTable != null && pTable.getType() != PTableType.INDEX) {
+            tephraTransactionContext.markDMLFence(pTable);
+        }
+    }
+
+    // Everything below forwards to the same-named method of the wrapped table.
+
+    @Override
+    public Result get(Get get) throws IOException {
+        return transactionAwareHTable.get(get);
+    }
+
+    @Override
+    public void put(Put put) throws IOException {
+        transactionAwareHTable.put(put);
+    }
+
+    @Override
+    public void delete(Delete delete) throws IOException {
+        transactionAwareHTable.delete(delete);
+    }
+
+    @Override
+    public ResultScanner getScanner(Scan scan) throws IOException {
+        return transactionAwareHTable.getScanner(scan);
+    }
+
+    @Override
+    public byte[] getTableName() {
+        return transactionAwareHTable.getTableName();
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+        return transactionAwareHTable.getConfiguration();
+    }
+
+    @Override
+    public HTableDescriptor getTableDescriptor() throws IOException {
+        return transactionAwareHTable.getTableDescriptor();
+    }
+
+    @Override
+    public boolean exists(Get get) throws IOException {
+        return transactionAwareHTable.exists(get);
+    }
+
+    @Override
+    public Result[] get(List<Get> gets) throws IOException {
+        return transactionAwareHTable.get(gets);
+    }
+
+    @Override
+    public ResultScanner getScanner(byte[] family) throws IOException {
+        return transactionAwareHTable.getScanner(family);
+    }
+
+    @Override
+    public ResultScanner getScanner(byte[] family, byte[] qualifier)
+            throws IOException {
+        return transactionAwareHTable.getScanner(family, qualifier);
+    }
+
+    @Override
+    public void put(List<Put> puts) throws IOException {
+        transactionAwareHTable.put(puts);
+    }
+
+    @Override
+    public void delete(List<Delete> deletes) throws IOException {
+        transactionAwareHTable.delete(deletes);
+    }
+
+    @Override
+    public void setAutoFlush(boolean autoFlush) {
+        transactionAwareHTable.setAutoFlush(autoFlush);
+    }
+
+    @Override
+    public boolean isAutoFlush() {
+        return transactionAwareHTable.isAutoFlush();
+    }
+
+    @Override
+    public long getWriteBufferSize() {
+        return transactionAwareHTable.getWriteBufferSize();
+    }
+
+    @Override
+    public void setWriteBufferSize(long writeBufferSize) throws IOException {
+        transactionAwareHTable.setWriteBufferSize(writeBufferSize);
+    }
+
+    @Override
+    public void flushCommits() throws IOException {
+        transactionAwareHTable.flushCommits();
+    }
+
+    @Override
+    public void close() throws IOException {
+        transactionAwareHTable.close();
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family,
+            byte[] qualifier, long amount, boolean writeToWAL)
+            throws IOException {
+        return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
+    }
+
+    @Override
+    public Boolean[] exists(List<Get> gets) throws IOException {
+        return transactionAwareHTable.exists(gets);
+    }
+
+    @Override
+    public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
+        transactionAwareHTable.setAutoFlush(autoFlush, clearBufferOnFail);
+    }
+
+    @Override
+    public void setAutoFlushTo(boolean autoFlush) {
+        // Forward to the namesake, not to the deprecated setAutoFlush(boolean),
+        // so this wrapper behaves exactly like the wrapped table.
+        transactionAwareHTable.setAutoFlushTo(autoFlush);
+    }
+
+    @Override
+    public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
+        return transactionAwareHTable.getRowOrBefore(row, family);
+    }
+
+    @Override
+    public TableName getName() {
+        return transactionAwareHTable.getName();
+    }
+
+    @Override
+    public boolean[] existsAll(List<Get> gets) throws IOException {
+        return transactionAwareHTable.existsAll(gets);
+    }
+
+    @Override
+    public void batch(List<? extends Row> actions, Object[] results)
+            throws IOException, InterruptedException {
+        transactionAwareHTable.batch(actions, results);
+    }
+
+    @Override
+    public Object[] batch(List<? extends Row> actions) throws IOException,
+            InterruptedException {
+        return transactionAwareHTable.batch(actions);
+    }
+
+    @Override
+    public <R> void batchCallback(List<? extends Row> actions,
+            Object[] results, Callback<R> callback) throws IOException,
+            InterruptedException {
+        transactionAwareHTable.batchCallback(actions, results, callback);
+    }
+
+    @Override
+    public <R> Object[] batchCallback(List<? extends Row> actions,
+            Callback<R> callback) throws IOException, InterruptedException {
+        return transactionAwareHTable.batchCallback(actions, callback);
+    }
+
+    @Override
+    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+            byte[] value, Put put) throws IOException {
+        return transactionAwareHTable.checkAndPut(row, family, qualifier, value, put);
+    }
+
+    @Override
+    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+            CompareOp compareOp, byte[] value, Put put) throws IOException {
+        return transactionAwareHTable.checkAndPut(row, family, qualifier, compareOp, value, put);
+    }
+
+    @Override
+    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+            byte[] value, Delete delete) throws IOException {
+        return transactionAwareHTable.checkAndDelete(row, family, qualifier, value, delete);
+    }
+
+    @Override
+    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+            CompareOp compareOp, byte[] value, Delete delete)
+            throws IOException {
+        return transactionAwareHTable.checkAndDelete(row, family, qualifier, compareOp, value, delete);
+    }
+
+    @Override
+    public void mutateRow(RowMutations rm) throws IOException {
+        transactionAwareHTable.mutateRow(rm);
+    }
+
+    @Override
+    public Result append(Append append) throws IOException {
+        return transactionAwareHTable.append(append);
+    }
+
+    @Override
+    public Result increment(Increment increment) throws IOException {
+        return transactionAwareHTable.increment(increment);
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family,
+            byte[] qualifier, long amount) throws IOException {
+        return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount);
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family,
+            byte[] qualifier, long amount, Durability durability)
+            throws IOException {
+        return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount, durability);
+    }
+
+    @Override
+    public CoprocessorRpcChannel coprocessorService(byte[] row) {
+        return transactionAwareHTable.coprocessorService(row);
+    }
+
+    @Override
+    public <T extends Service, R> Map<byte[], R> coprocessorService(
+            Class<T> service, byte[] startKey, byte[] endKey,
+            Call<T, R> callable) throws ServiceException, Throwable {
+        return transactionAwareHTable.coprocessorService(service, startKey, endKey, callable);
+    }
+
+    @Override
+    public <T extends Service, R> void coprocessorService(Class<T> service,
+            byte[] startKey, byte[] endKey, Call<T, R> callable,
+            Callback<R> callback) throws ServiceException, Throwable {
+        transactionAwareHTable.coprocessorService(service, startKey, endKey, callable, callback);
+    }
+
+    @Override
+    public <R extends Message> Map<byte[], R> batchCoprocessorService(
+            MethodDescriptor methodDescriptor, Message request,
+            byte[] startKey, byte[] endKey, R responsePrototype)
+            throws ServiceException, Throwable {
+        return transactionAwareHTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype);
+    }
+
+    @Override
+    public <R extends Message> void batchCoprocessorService(
+            MethodDescriptor methodDescriptor, Message request,
+            byte[] startKey, byte[] endKey, R responsePrototype,
+            Callback<R> callback) throws ServiceException, Throwable {
+        transactionAwareHTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, callback);
+    }
+
+    @Override
+    public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
+            CompareOp compareOp, byte[] value, RowMutations mutation)
+            throws IOException {
+        return transactionAwareHTable.checkAndMutate(row, family, qualifier, compareOp, value, mutation);
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
new file mode 100644
index 0000000..8b3fc1d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.transaction;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+
+/**
+ * Process-wide factory for transaction contexts and transactional tables.
+ * <p>
+ * The factory is a lazily created singleton bound to one
+ * {@link TransactionProcessor}. The first call to
+ * {@link #createTransactionFactory(TransactionProcessor)} decides the
+ * processor; if {@link #getTransactionFactory()} is called first, Tephra is
+ * used. Later calls to {@code createTransactionFactory} have no effect.
+ * <p>
+ * Only the no-argument {@link #getTransactionContext()} is implemented for
+ * Omid; every other factory method returns {@code null} for Omid.
+ */
+public class TransactionFactory {
+
+    // volatile so that a factory created by one thread is safely visible to
+    // threads that read the field without holding the class lock.
+    private static volatile TransactionFactory transactionFactory = null;
+
+    private final TransactionProcessor tp;
+
+    enum TransactionProcessor {
+        Tephra,
+        Omid
+    }
+
+    private TransactionFactory(TransactionProcessor tp) {
+        this.tp = tp;
+    }
+
+    /**
+     * Creates the singleton factory for the given processor unless one
+     * already exists.
+     *
+     * @param tp the transaction processor the factory should produce objects for
+     */
+    public static void createTransactionFactory(TransactionProcessor tp) {
+        if (transactionFactory == null) {
+            synchronized (TransactionFactory.class) {
+                // Re-check under the lock so that only one instance is created.
+                if (transactionFactory == null) {
+                    transactionFactory = new TransactionFactory(tp);
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns the singleton factory, creating a Tephra factory first if none
+     * exists yet.
+     */
+    public static TransactionFactory getTransactionFactory() {
+        if (transactionFactory == null) {
+            createTransactionFactory(TransactionProcessor.Tephra);
+        }
+
+        return transactionFactory;
+    }
+
+    /**
+     * Creates an empty transaction context for the configured processor.
+     */
+    public PhoenixTransactionContext getTransactionContext()  {
+        switch (tp) {
+        case Tephra:
+            return new TephraTransactionContext();
+        case Omid:
+            return new OmidTransactionContext();
+        default:
+            return null;
+        }
+    }
+
+    /**
+     * Creates a transaction context from an encoded transaction.
+     *
+     * @param txnBytes the encoded transaction
+     * @return the context, or {@code null} for Omid (not implemented yet)
+     * @throws IOException declared for the Tephra constructor that takes the
+     *             encoded bytes
+     */
+    public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException {
+        switch (tp) {
+        case Tephra:
+            return new TephraTransactionContext(txnBytes);
+        case Omid:
+            // TODO: create an Omid context from txnBytes once supported.
+            return null;
+        default:
+            return null;
+        }
+    }
+
+    /**
+     * Creates a transaction context for a connection.
+     *
+     * @param connection the Phoenix connection
+     * @return the context, or {@code null} for Omid (not implemented yet)
+     */
+    public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) {
+        switch (tp) {
+        case Tephra:
+            return new TephraTransactionContext(connection);
+        case Omid:
+            // TODO: create an Omid context from the connection once supported.
+            return null;
+        default:
+            return null;
+        }
+    }
+
+    /**
+     * Creates a transaction context derived from an existing context.
+     *
+     * @param contex     the existing context
+     * @param connection the Phoenix connection
+     * @param subTask    passed through to the context constructor
+     * @return the context, or {@code null} for Omid (not implemented yet)
+     */
+    public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask) {
+        switch (tp) {
+        case Tephra:
+            return new TephraTransactionContext(contex, connection, subTask);
+        case Omid:
+            // TODO: create an Omid context from (contex, connection, subTask)
+            // once supported.
+            return null;
+        default:
+            return null;
+        }
+    }
+
+    /**
+     * Wraps an HBase table in a transactional table bound to the context.
+     *
+     * @param ctx    the transaction context
+     * @param htable the HBase table to wrap
+     * @return the transactional table, or {@code null} for Omid (not
+     *         implemented yet)
+     */
+    public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable) {
+        switch (tp) {
+        case Tephra:
+            return new TephraTransactionTable(ctx, htable);
+        case Omid:
+            // TODO: create an Omid transactional table from (ctx, htable)
+            // once supported.
+            return null;
+        default:
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 2f65647..b81b904 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -111,7 +111,7 @@ import org.apache.phoenix.schema.types.PDecimal;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
-import org.apache.tephra.TxConstants;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
 
 import com.google.common.collect.Lists;
 import com.google.protobuf.ServiceException;
@@ -274,7 +274,7 @@ public class IndexUtil {
                     regionEndkey = tableRegionLocation.getRegionInfo().getEndKey();
                 }
                 Delete delete = maintainer.buildDeleteMutation(kvBuilder, null, ptr, Collections.<KeyValue>emptyList(), ts, regionStartKey, regionEndkey);
-                delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, dataMutation.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY));
+                delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, dataMutation.getAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY));
                 indexMutations.add(delete);
             }
             return indexMutations;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 0e3a9e9..30361c8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -86,7 +86,7 @@ import org.apache.phoenix.schema.RowKeyValueAccessor;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.types.PDataType;
-import org.apache.tephra.util.TxUtils;
+import org.apache.phoenix.transaction.TransactionFactory;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
@@ -1462,7 +1462,7 @@ public class PhoenixRuntime {
      * @return wall clock time in milliseconds (i.e. Epoch time) of a given Cell time stamp.
      */
     public static long getWallClockTimeFromCellTimeStamp(long tsOfCell) {
-        return TxUtils.isPreExistingVersion(tsOfCell) ? tsOfCell : TransactionUtil.convertToMilliseconds(tsOfCell);
+        return TransactionFactory.getTransactionFactory().getTransactionContext().isPreExistingVersion(tsOfCell) ? tsOfCell : TransactionUtil.convertToMilliseconds(tsOfCell);
     }
 
     public static long getCurrentScn(ReadOnlyProps props) {


Mime
View raw message