phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [2/2] phoenix git commit: PHOENIX-4762 Performance regression with transactional immutable indexes
Date Fri, 01 Jun 2018 17:18:19 GMT
PHOENIX-4762 Performance regression with transactional immutable indexes


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f3119320
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f3119320
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f3119320

Branch: refs/heads/4.x-HBase-1.3
Commit: f3119320b6d263252a790c6a5cd96dce5a714ed6
Parents: 24af104
Author: James Taylor <jtaylor@salesforce.com>
Authored: Fri Jun 1 09:03:21 2018 -0700
Committer: James Taylor <jtaylor@salesforce.com>
Committed: Fri Jun 1 09:03:21 2018 -0700

----------------------------------------------------------------------
 .../apache/phoenix/execute/MutationState.java   | 701 ++++++++++---------
 .../PhoenixTxIndexMutationGenerator.java        |   2 +-
 .../transaction/OmidTransactionContext.java     |   6 +
 .../transaction/PhoenixTransactionContext.java  |   6 +
 .../transaction/TephraTransactionContext.java   |  16 +
 .../java/org/apache/phoenix/util/IndexUtil.java |  28 -
 6 files changed, 385 insertions(+), 374 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/f3119320/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 52e490e..2e795b1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -38,10 +38,9 @@ import javax.annotation.Nonnull;
 import javax.annotation.concurrent.Immutable;
 
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -83,7 +82,6 @@ import org.apache.phoenix.schema.PMetaData;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTableRef;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeySchema;
@@ -98,6 +96,7 @@ import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.transaction.TransactionFactory.Provider;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SQLCloseable;
@@ -116,9 +115,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 /**
- * 
  * Tracks the uncommitted state
- *
  */
 public class MutationState implements SQLCloseable {
     private static final Logger logger = LoggerFactory.getLogger(MutationState.class);
@@ -150,30 +147,34 @@ public class MutationState implements SQLCloseable {
         this(maxSize, maxSizeBytes, connection, false, null);
     }
 
-    public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, PhoenixTransactionContext txContext) {
+    public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection,
+            PhoenixTransactionContext txContext) {
         this(maxSize, maxSizeBytes, connection, false, txContext);
     }
 
     public MutationState(MutationState mutationState) {
-        this(mutationState.maxSize,  mutationState.maxSizeBytes, mutationState.connection, true, mutationState.getPhoenixTransactionContext());
+        this(mutationState.maxSize, mutationState.maxSizeBytes, mutationState.connection, true, mutationState
+                .getPhoenixTransactionContext());
     }
 
     public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, long sizeOffset) {
         this(maxSize, maxSizeBytes, connection, false, null, sizeOffset);
     }
 
-    private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, boolean subTask, PhoenixTransactionContext txContext) {
+    private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, boolean subTask,
+            PhoenixTransactionContext txContext) {
         this(maxSize, maxSizeBytes, connection, subTask, txContext, 0);
     }
 
-    private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, boolean subTask, PhoenixTransactionContext txContext, long sizeOffset) {
-        this(maxSize, maxSizeBytes, connection, Maps.<TableRef, MultiRowMutationState>newHashMapWithExpectedSize(5), subTask, txContext);
+    private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, boolean subTask,
+            PhoenixTransactionContext txContext, long sizeOffset) {
+        this(maxSize, maxSizeBytes, connection, Maps.<TableRef, MultiRowMutationState> newHashMapWithExpectedSize(5),
+                subTask, txContext);
         this.sizeOffset = sizeOffset;
     }
 
     MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection,
-            Map<TableRef, MultiRowMutationState> mutations,
-            boolean subTask, PhoenixTransactionContext txContext) {
+            Map<TableRef, MultiRowMutationState> mutations, boolean subTask, PhoenixTransactionContext txContext) {
         this.maxSize = maxSize;
         this.maxSizeBytes = maxSizeBytes;
         this.connection = connection;
@@ -193,7 +194,8 @@ public class MutationState implements SQLCloseable {
         }
     }
 
-    public MutationState(TableRef table, MultiRowMutationState mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection)  throws SQLException {
+    public MutationState(TableRef table, MultiRowMutationState mutations, long sizeOffset, long maxSize,
+            long maxSizeBytes, PhoenixConnection connection) throws SQLException {
         this(maxSize, maxSizeBytes, connection, false, null, sizeOffset);
         if (!mutations.isEmpty()) {
             this.mutations.put(table, mutations);
@@ -202,7 +204,7 @@ public class MutationState implements SQLCloseable {
         this.estimatedSize = KeyValueUtil.getEstimatedRowMutationSize(this.mutations);
         throwIfTooBig();
     }
-    
+
     public long getEstimatedSize() {
         return estimatedSize;
     }
@@ -220,13 +222,12 @@ public class MutationState implements SQLCloseable {
     }
 
     /**
-     * Commit a write fence when creating an index so that we can detect
-     * when a data table transaction is started before the create index
-     * but completes after it. In this case, we need to rerun the data
-     * table transaction after the index creation so that the index rows
-     * are generated. See TEPHRA-157
-     * for more information.
-     * @param dataTable the data table upon which an index is being added
+     * Commit a write fence when creating an index so that we can detect when a data table transaction is started before
+     * the create index but completes after it. In this case, we need to rerun the data table transaction after the
+     * index creation so that the index rows are generated. See TEPHRA-157 for more information.
+     * 
+     * @param dataTable
+     *            the data table upon which an index is being added
      * @throws SQLException
      */
     public void commitDDLFence(PTable dataTable) throws SQLException {
@@ -242,19 +243,17 @@ public class MutationState implements SQLCloseable {
             }
         }
     }
-    
+
     public boolean checkpointIfNeccessary(MutationPlan plan) throws SQLException {
-        if (! phoenixTransactionContext.isTransactionRunning()  || plan.getTargetRef() == null || plan.getTargetRef().getTable() == null || !plan.getTargetRef().getTable().isTransactional()) {
-            return false;
-        }
+        if (!phoenixTransactionContext.isTransactionRunning() || plan.getTargetRef() == null
+                || plan.getTargetRef().getTable() == null || !plan.getTargetRef().getTable().isTransactional()) { return false; }
         Set<TableRef> sources = plan.getSourceRefs();
-        if (sources.isEmpty()) {
-            return false;
-        }
+        if (sources.isEmpty()) { return false; }
         // For a DELETE statement, we're always querying the table being deleted from. This isn't
         // a problem, but it potentially could be if there are other references to the same table
         // nested in the DELETE statement (as a sub query or join, for example).
-        TableRef ignoreForExcludeCurrent = plan.getOperation() == Operation.DELETE && sources.size() == 1 ? plan.getTargetRef() : null;
+        TableRef ignoreForExcludeCurrent = plan.getOperation() == Operation.DELETE && sources.size() == 1 ? plan
+                .getTargetRef() : null;
         boolean excludeCurrent = false;
         String targetPhysicalName = plan.getTargetRef().getTable().getPhysicalName().getString();
         for (TableRef source : sources) {
@@ -279,7 +278,8 @@ public class MutationState implements SQLCloseable {
                 // external transaction context, it's possible that data was already written at the
                 // current transaction timestamp, so we always checkpoint in that case is we're
                 // reading and writing to the same table.
-                if (source.getTable().isTransactional() && (isExternalTxContext || uncommittedPhysicalNames.contains(sourcePhysicalName))) {
+                if (source.getTable().isTransactional()
+                        && (isExternalTxContext || uncommittedPhysicalNames.contains(sourcePhysicalName))) {
                     hasUncommittedData = true;
                     break;
                 }
@@ -331,30 +331,20 @@ public class MutationState implements SQLCloseable {
     }
 
     public boolean startTransaction(Provider provider) throws SQLException {
-        if (provider == null) {
-            return false;
-        }
-        if (!connection.getQueryServices().getProps().getBoolean(
-                QueryServices.TRANSACTIONS_ENABLED,
-                QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) {
-            throw new SQLExceptionInfo.Builder(
-                    SQLExceptionCode.CANNOT_START_TXN_IF_TXN_DISABLED)
-                    .build().buildException();
-        }
-        if (connection.getSCN() != null) {
-            throw new SQLExceptionInfo.Builder(
-                    SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET)
-                    .build().buildException();
-        }
+        if (provider == null) { return false; }
+        if (!connection.getQueryServices().getProps()
+                .getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) { throw new SQLExceptionInfo.Builder(
+                SQLExceptionCode.CANNOT_START_TXN_IF_TXN_DISABLED).build().buildException(); }
+        if (connection.getSCN() != null) { throw new SQLExceptionInfo.Builder(
+                SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET).build().buildException(); }
 
         if (phoenixTransactionContext == PhoenixTransactionContext.NULL_CONTEXT) {
             phoenixTransactionContext = provider.getTransactionProvider().getTransactionContext(connection);
         } else {
-            if (provider != phoenixTransactionContext.getProvider()) {
-                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_MIX_TXN_PROVIDERS)
-                        .setMessage(phoenixTransactionContext.getProvider().name() + " and " + provider.name())
-                        .build().buildException();
-            }
+            if (provider != phoenixTransactionContext.getProvider()) { throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.CANNOT_MIX_TXN_PROVIDERS)
+                    .setMessage(phoenixTransactionContext.getProvider().name() + " and " + provider.name()).build()
+                    .buildException(); }
         }
         if (!isTransactionStarted()) {
             // Clear any transactional state in case transaction was ended outside
@@ -371,28 +361,28 @@ public class MutationState implements SQLCloseable {
     }
 
     public static MutationState emptyMutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection) {
-        MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.<TableRef, MultiRowMutationState>emptyMap(), false, null);
+        MutationState state = new MutationState(maxSize, maxSizeBytes, connection,
+                Collections.<TableRef, MultiRowMutationState> emptyMap(), false, null);
         state.sizeOffset = 0;
         return state;
     }
-    
+
     private void throwIfTooBig() throws SQLException {
         if (numRows > maxSize) {
             resetState();
-            throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED).build()
-                    .buildException();
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED).build().buildException();
         }
         if (estimatedSize > maxSizeBytes) {
             resetState();
-            throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED)
-                    .build().buildException();
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED).build()
+                    .buildException();
         }
     }
-    
+
     public long getUpdateCount() {
         return sizeOffset + numRows;
     }
-    
+
     private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows,
             Map<TableRef, MultiRowMutationState> dstMutations) {
         PTable table = tableRef.getTable();
@@ -401,21 +391,22 @@ public class MutationState implements SQLCloseable {
         MultiRowMutationState existingRows = dstMutations.put(tableRef, srcRows);
         if (existingRows != null) { // Rows for that table already exist
             // Loop through new rows and replace existing with new
-            for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : srcRows.entrySet()) {
+            for (Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry : srcRows.entrySet()) {
                 // Replace existing row with new row
                 RowMutationState existingRowMutationState = existingRows.put(rowEntry.getKey(), rowEntry.getValue());
                 if (existingRowMutationState != null) {
-                    Map<PColumn,byte[]> existingValues = existingRowMutationState.getColumnValues();
+                    Map<PColumn, byte[]> existingValues = existingRowMutationState.getColumnValues();
                     if (existingValues != PRow.DELETE_MARKER) {
-                        Map<PColumn,byte[]> newRow = rowEntry.getValue().getColumnValues();
-                        // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row. 
+                        Map<PColumn, byte[]> newRow = rowEntry.getValue().getColumnValues();
+                        // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with
+                        // existing row.
                         if (newRow != PRow.DELETE_MARKER) {
                             // decrement estimated size by the size of the old row
-                            estimatedSize-=existingRowMutationState.calculateEstimatedSize();
+                            estimatedSize -= existingRowMutationState.calculateEstimatedSize();
                             // Merge existing column values with new column values
                             existingRowMutationState.join(rowEntry.getValue());
                             // increment estimated size by the size of the new row
-                            estimatedSize+=existingRowMutationState.calculateEstimatedSize();
+                            estimatedSize += existingRowMutationState.calculateEstimatedSize();
                             // Now that the existing row has been merged with the new row, replace it back
                             // again (since it was merged with the new one above).
                             existingRows.put(rowEntry.getKey(), existingRowMutationState);
@@ -440,12 +431,12 @@ public class MutationState implements SQLCloseable {
                 numRows += srcRows.size();
                 // if we added all the rows from newMutationState we can just increment the
                 // estimatedSize by newMutationState.estimatedSize
-                estimatedSize +=  srcRows.estimatedSize;
+                estimatedSize += srcRows.estimatedSize;
             }
         }
     }
-    
-    private void joinMutationState(Map<TableRef, MultiRowMutationState> srcMutations, 
+
+    private void joinMutationState(Map<TableRef, MultiRowMutationState> srcMutations,
             Map<TableRef, MultiRowMutationState> dstMutations) {
         // Merge newMutation with this one, keeping state from newMutation for any overlaps
         for (Map.Entry<TableRef, MultiRowMutationState> entry : srcMutations.entrySet()) {
@@ -455,11 +446,13 @@ public class MutationState implements SQLCloseable {
             joinMutationState(tableRef, srcRows, dstMutations);
         }
     }
+
     /**
      * Combine a newer mutation with this one, where in the event of overlaps, the newer one will take precedence.
      * Combine any metrics collected for the newer mutation.
      * 
-     * @param newMutationState the newer mutation state
+     * @param newMutationState
+     *            the newer mutation state
      */
     public void join(MutationState newMutationState) throws SQLException {
         if (this == newMutationState) { // Doesn't make sense
@@ -485,17 +478,16 @@ public class MutationState implements SQLCloseable {
         throwIfTooBig();
     }
 
-
     private static ImmutableBytesPtr getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, long rowTimestamp, PTable table) {
         RowKeySchema schema = table.getRowKeySchema();
         int rowTimestampColPos = table.getRowTimestampColPos();
-        Field rowTimestampField = schema.getField(rowTimestampColPos); 
+        Field rowTimestampField = schema.getField(rowTimestampColPos);
         byte[] rowTimestampBytes = PLong.INSTANCE.toBytes(rowTimestamp, rowTimestampField.getSortOrder());
         int oldOffset = ptr.getOffset();
         int oldLength = ptr.getLength();
         // Move the pointer to the start byte of the row timestamp pk
         schema.position(ptr, 0, rowTimestampColPos);
-        byte[] b  = ptr.get();
+        byte[] b = ptr.get();
         int newOffset = ptr.getOffset();
         int length = ptr.getLength();
         for (int i = newOffset; i < newOffset + length; i++) {
@@ -506,24 +498,29 @@ public class MutationState implements SQLCloseable {
         ptr.set(ptr.get(), oldOffset, oldLength);
         return ptr;
     }
-    
-    private Iterator<Pair<PName,List<Mutation>>> addRowMutations(final TableRef tableRef, final MultiRowMutationState values,
-            final long mutationTimestamp, final long serverTimestamp, boolean includeAllIndexes, final boolean sendAll) {
+
+    private static List<PTable> getClientMaintainedIndexes(PTable table) {
+        Iterator<PTable> indexIterator = // Only maintain tables with immutable rows through this client-side mechanism
+        (table.isImmutableRows() || table.isTransactional()) ? IndexMaintainer.maintainedGlobalIndexes(table
+                .getIndexes().iterator()) : Collections.<PTable> emptyIterator();
+        return Lists.newArrayList(indexIterator);
+    }
+
+    private Iterator<Pair<PName, List<Mutation>>> addRowMutations(final TableRef tableRef,
+            final MultiRowMutationState values, final long mutationTimestamp, final long serverTimestamp,
+            boolean includeAllIndexes, final boolean sendAll) {
         final PTable table = tableRef.getTable();
-        final Iterator<PTable> indexIterator = // Only maintain tables with immutable rows through this client-side mechanism
-                includeAllIndexes ?
-                         IndexMaintainer.maintainedIndexes(table.getIndexes().iterator()) :
-                             (table.isImmutableRows() || table.isTransactional()) ?
-                                IndexMaintainer.maintainedGlobalIndexes(table.getIndexes().iterator()) :
-                                    Collections.<PTable>emptyIterator();
-        final List<PTable> indexList = Lists.newArrayList(indexIterator);
+        final List<PTable> indexList = includeAllIndexes ? Lists.newArrayList(IndexMaintainer.maintainedIndexes(table
+                .getIndexes().iterator())) : getClientMaintainedIndexes(table);
         final Iterator<PTable> indexes = indexList.iterator();
         final List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(values.size());
-        final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null;
-        generateMutations(tableRef, mutationTimestamp, serverTimestamp, values, mutationList, mutationsPertainingToIndex);
-        return new Iterator<Pair<PName,List<Mutation>>>() {
+        final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists
+                .<Mutation> newArrayListWithExpectedSize(values.size()) : null;
+        generateMutations(tableRef, mutationTimestamp, serverTimestamp, values, mutationList,
+                mutationsPertainingToIndex);
+        return new Iterator<Pair<PName, List<Mutation>>>() {
             boolean isFirst = true;
-            Map<byte[],List<Mutation>> indexMutationsMap = null;
+            Map<byte[], List<Mutation>> indexMutationsMap = null;
 
             @Override
             public boolean hasNext() {
@@ -534,19 +531,22 @@ public class MutationState implements SQLCloseable {
             public Pair<PName, List<Mutation>> next() {
                 if (isFirst) {
                     isFirst = false;
-                    return new Pair<PName,List<Mutation>>(table.getPhysicalName(), mutationList);
+                    return new Pair<PName, List<Mutation>>(table.getPhysicalName(), mutationList);
                 }
 
                 PTable index = indexes.next();
-                
+
                 List<Mutation> indexMutations = null;
                 try {
                     if (!mutationsPertainingToIndex.isEmpty()) {
                         if (table.isTransactional()) {
                             if (indexMutationsMap == null) {
-                                PhoenixTxIndexMutationGenerator generator = newTxIndexMutationGenerator(table, indexList, mutationsPertainingToIndex.get(0).getAttributesMap());
-                                try (HTableInterface htable = connection.getQueryServices().getTable(table.getPhysicalName().getBytes())) {
-                                    Collection<Pair<Mutation, byte[]>> allMutations = generator.getIndexUpdates(htable, mutationsPertainingToIndex.iterator());
+                                PhoenixTxIndexMutationGenerator generator = newTxIndexMutationGenerator(table,
+                                        indexList, mutationsPertainingToIndex.get(0).getAttributesMap());
+                                try (HTableInterface htable = connection.getQueryServices().getTable(
+                                        table.getPhysicalName().getBytes())) {
+                                    Collection<Pair<Mutation, byte[]>> allMutations = generator.getIndexUpdates(htable,
+                                            mutationsPertainingToIndex.iterator());
                                     indexMutationsMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
                                     for (Pair<Mutation, byte[]> mutation : allMutations) {
                                         List<Mutation> mutations = indexMutationsMap.get(mutation.getSecond());
@@ -559,20 +559,21 @@ public class MutationState implements SQLCloseable {
                                 }
                             }
                             indexMutations = indexMutationsMap.get(index.getPhysicalName().getBytes());
-                         } else {
-                            indexMutations =
-                                    IndexUtil.generateIndexData(table, index, values, mutationsPertainingToIndex,
-                                        connection.getKeyValueBuilder(), connection);
+                        } else {
+                            indexMutations = IndexUtil.generateIndexData(table, index, values,
+                                    mutationsPertainingToIndex, connection.getKeyValueBuilder(), connection);
                         }
                     }
 
-                    // we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map
+                    // we may also have to include delete mutations for immutable tables if we are not processing all
+                    // the tables in the mutations map
                     if (!sendAll) {
                         TableRef key = new TableRef(index);
                         MultiRowMutationState multiRowMutationState = mutations.remove(key);
-                        if (multiRowMutationState!=null) {
+                        if (multiRowMutationState != null) {
                             final List<Mutation> deleteMutations = Lists.newArrayList();
-                            generateMutations(tableRef, mutationTimestamp, serverTimestamp, multiRowMutationState, deleteMutations, null);
+                            generateMutations(tableRef, mutationTimestamp, serverTimestamp, multiRowMutationState,
+                                    deleteMutations, null);
                             if (indexMutations == null) {
                                 indexMutations = deleteMutations;
                             } else {
@@ -583,18 +584,20 @@ public class MutationState implements SQLCloseable {
                 } catch (SQLException | IOException e) {
                     throw new IllegalDataException(e);
                 }
-                return new Pair<PName,List<Mutation>>(index.getPhysicalName(),indexMutations == null ? Collections.<Mutation>emptyList() : indexMutations);
+                return new Pair<PName, List<Mutation>>(index.getPhysicalName(),
+                        indexMutations == null ? Collections.<Mutation> emptyList() : indexMutations);
             }
 
             @Override
             public void remove() {
                 throw new UnsupportedOperationException();
             }
-            
+
         };
     }
 
-    private PhoenixTxIndexMutationGenerator newTxIndexMutationGenerator(PTable table, List<PTable> indexes, Map<String,byte[]> attributes) {
+    private PhoenixTxIndexMutationGenerator newTxIndexMutationGenerator(PTable table, List<PTable> indexes,
+            Map<String, byte[]> attributes) {
         final List<IndexMaintainer> indexMaintainers = Lists.newArrayListWithExpectedSize(indexes.size());
         for (PTable index : indexes) {
             IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
@@ -603,8 +606,7 @@ public class MutationState implements SQLCloseable {
         IndexMetaDataCache indexMetaDataCache = new IndexMetaDataCache() {
 
             @Override
-            public void close() throws IOException {
-            }
+            public void close() throws IOException {}
 
             @Override
             public List<IndexMaintainer> getIndexMaintainers() {
@@ -613,30 +615,30 @@ public class MutationState implements SQLCloseable {
 
             @Override
             public PhoenixTransactionContext getTransactionContext() {
-                return phoenixTransactionContext;
+                return phoenixTransactionContext.newTransactionContext(phoenixTransactionContext, true);
             }
 
             @Override
             public int getClientVersion() {
                 return MetaDataProtocol.PHOENIX_VERSION;
             }
-            
+
         };
         try {
             PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(indexMetaDataCache, attributes);
-            return new PhoenixTxIndexMutationGenerator(connection.getQueryServices().getConfiguration(), indexMetaData, table.getPhysicalName().getBytes());
+            return new PhoenixTxIndexMutationGenerator(connection.getQueryServices().getConfiguration(), indexMetaData,
+                    table.getPhysicalName().getBytes());
         } catch (IOException e) {
             throw new RuntimeException(e); // Impossible
         }
     }
-    
-    private void generateMutations(final TableRef tableRef, final long mutationTimestamp,
-            final long serverTimestamp, final MultiRowMutationState values,
-            final List<Mutation> mutationList, final List<Mutation> mutationsPertainingToIndex) {
+
+    private void generateMutations(final TableRef tableRef, final long mutationTimestamp, final long serverTimestamp,
+            final MultiRowMutationState values, final List<Mutation> mutationList,
+            final List<Mutation> mutationsPertainingToIndex) {
         final PTable table = tableRef.getTable();
         boolean tableWithRowTimestampCol = table.getRowTimestampColPos() != -1;
-        Iterator<Map.Entry<ImmutableBytesPtr, RowMutationState>> iterator =
-                values.entrySet().iterator();
+        Iterator<Map.Entry<ImmutableBytesPtr, RowMutationState>> iterator = values.entrySet().iterator();
         long timestampToUse = mutationTimestamp;
         MultiRowMutationState modifiedValues = new MultiRowMutationState(16);
         while (iterator.hasNext()) {
@@ -650,11 +652,11 @@ public class MutationState implements SQLCloseable {
                 if (rowTsColInfo.useServerTimestamp()) {
                     // regenerate the key with this timestamp.
                     key = getNewRowKeyWithRowTimestamp(key, serverTimestamp, table);
-                	// since we are about to modify the byte[] stored in key (which changes its hashcode)
-                	// we need to remove the entry from the values map and add a new entry with the modified byte[]
-                	modifiedValues.put(key, state);
-                	iterator.remove();
-                	timestampToUse = serverTimestamp;
+                    // since we are about to modify the byte[] stored in key (which changes its hashcode)
+                    // we need to remove the entry from the values map and add a new entry with the modified byte[]
+                    modifiedValues.put(key, state);
+                    iterator.remove();
+                    timestampToUse = serverTimestamp;
                 } else {
                     if (rowTsColInfo.getTimestamp() != null) {
                         timestampToUse = rowTsColInfo.getTimestamp();
@@ -669,15 +671,14 @@ public class MutationState implements SQLCloseable {
                 // The DeleteCompiler already generates the deletes for indexes, so no need to do it again
                 rowMutationsPertainingToIndex = Collections.emptyList();
             } else {
-                for (Map.Entry<PColumn, byte[]> valueEntry : rowEntry.getValue().getColumnValues()
-                        .entrySet()) {
+                for (Map.Entry<PColumn, byte[]> valueEntry : rowEntry.getValue().getColumnValues().entrySet()) {
                     row.setValue(valueEntry.getKey(), valueEntry.getValue());
                 }
                 rowMutations = row.toRowMutations();
                 // Pass through ON DUPLICATE KEY info through mutations
                 // In the case of the same clause being used on many statements, this will be
                 // inefficient because we're transmitting the same information for each mutation.
-                // TODO: use our ServerCache 
+                // TODO: use our ServerCache
                 for (Mutation mutation : rowMutations) {
                     if (onDupKeyBytes != null) {
                         mutation.setAttribute(PhoenixIndexBuilder.ATOMIC_OP_ATTRIB, onDupKeyBytes);
@@ -686,61 +687,61 @@ public class MutationState implements SQLCloseable {
                 rowMutationsPertainingToIndex = rowMutations;
             }
             mutationList.addAll(rowMutations);
-            if (mutationsPertainingToIndex != null) mutationsPertainingToIndex
-                    .addAll(rowMutationsPertainingToIndex);
+            if (mutationsPertainingToIndex != null) mutationsPertainingToIndex.addAll(rowMutationsPertainingToIndex);
         }
         values.putAll(modifiedValues);
     }
-    
+
     /**
      * Get the unsorted list of HBase mutations for the tables with uncommitted data.
+     * 
      * @return list of HBase mutations for uncommitted data.
      */
-    public Iterator<Pair<byte[],List<Mutation>>> toMutations(Long timestamp) {
+    public Iterator<Pair<byte[], List<Mutation>>> toMutations(Long timestamp) {
         return toMutations(false, timestamp);
     }
-    
-    public Iterator<Pair<byte[],List<Mutation>>> toMutations() {
+
+    public Iterator<Pair<byte[], List<Mutation>>> toMutations() {
         return toMutations(false, null);
     }
-    
-    public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes) {
+
+    public Iterator<Pair<byte[], List<Mutation>>> toMutations(final boolean includeMutableIndexes) {
         return toMutations(includeMutableIndexes, null);
     }
-    
-    public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes, final Long tableTimestamp) {
+
+    public Iterator<Pair<byte[], List<Mutation>>> toMutations(final boolean includeMutableIndexes,
+            final Long tableTimestamp) {
         final Iterator<Map.Entry<TableRef, MultiRowMutationState>> iterator = this.mutations.entrySet().iterator();
-        if (!iterator.hasNext()) {
-            return Collections.emptyIterator();
-        }
+        if (!iterator.hasNext()) { return Collections.emptyIterator(); }
         Long scn = connection.getSCN();
         final long serverTimestamp = getTableTimestamp(tableTimestamp, scn);
         final long mutationTimestamp = getMutationTimestamp(scn);
-        return new Iterator<Pair<byte[],List<Mutation>>>() {
+        return new Iterator<Pair<byte[], List<Mutation>>>() {
             private Map.Entry<TableRef, MultiRowMutationState> current = iterator.next();
-            private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init();
-                    
-            private Iterator<Pair<byte[],List<Mutation>>> init() {
-                final Iterator<Pair<PName, List<Mutation>>> mutationIterator = addRowMutations(current.getKey(), current.getValue(), mutationTimestamp, serverTimestamp, includeMutableIndexes, true);
-                return new Iterator<Pair<byte[],List<Mutation>>>() {
+            private Iterator<Pair<byte[], List<Mutation>>> innerIterator = init();
+
+            private Iterator<Pair<byte[], List<Mutation>>> init() {
+                final Iterator<Pair<PName, List<Mutation>>> mutationIterator = addRowMutations(current.getKey(),
+                        current.getValue(), mutationTimestamp, serverTimestamp, includeMutableIndexes, true);
+                return new Iterator<Pair<byte[], List<Mutation>>>() {
                     @Override
                     public boolean hasNext() {
                         return mutationIterator.hasNext();
                     }
 
                     @Override
-                     public Pair<byte[], List<Mutation>> next() {
+                    public Pair<byte[], List<Mutation>> next() {
                         Pair<PName, List<Mutation>> pair = mutationIterator.next();
                         return new Pair<byte[], List<Mutation>>(pair.getFirst().getBytes(), pair.getSecond());
                     }
-                    
+
                     @Override
                     public void remove() {
                         mutationIterator.remove();
                     }
                 };
             }
-            
+
             @Override
             public boolean hasNext() {
                 return innerIterator.hasNext() || iterator.hasNext();
@@ -750,7 +751,7 @@ public class MutationState implements SQLCloseable {
             public Pair<byte[], List<Mutation>> next() {
                 if (!innerIterator.hasNext()) {
                     current = iterator.next();
-                    innerIterator=init();
+                    innerIterator = init();
                 }
                 return innerIterator.next();
             }
@@ -759,24 +760,26 @@ public class MutationState implements SQLCloseable {
             public void remove() {
                 throw new UnsupportedOperationException();
             }
-            
+
         };
     }
 
     public static long getTableTimestamp(final Long tableTimestamp, Long scn) {
-        return (tableTimestamp!=null && tableTimestamp!=QueryConstants.UNSET_TIMESTAMP) ? tableTimestamp : (scn == null ? HConstants.LATEST_TIMESTAMP : scn);
+        return (tableTimestamp != null && tableTimestamp != QueryConstants.UNSET_TIMESTAMP) ? tableTimestamp
+                : (scn == null ? HConstants.LATEST_TIMESTAMP : scn);
     }
-    
+
     public static long getMutationTimestamp(final Long scn) {
         return scn == null ? HConstants.LATEST_TIMESTAMP : scn;
     }
 
     /**
-     * Validates that the meta data is valid against the server meta data if we haven't yet done so.
-     * Otherwise, for every UPSERT VALUES call, we'd need to hit the server to see if the meta data
-     * has changed.
+     * Validates that the meta data is valid against the server meta data if we haven't yet done so. Otherwise, for
+     * every UPSERT VALUES call, we'd need to hit the server to see if the meta data has changed.
+     * 
      * @return the server time to use for the upsert
-     * @throws SQLException if the table or any columns no longer exist
+     * @throws SQLException
+     *             if the table or any columns no longer exist
      */
     private long[] validateAll() throws SQLException {
         int i = 0;
@@ -787,19 +790,20 @@ public class MutationState implements SQLCloseable {
         }
         return timeStamps;
     }
-    
-    private long validateAndGetServerTimestamp(TableRef tableRef, MultiRowMutationState rowKeyToColumnMap) throws SQLException {
+
+    private long validateAndGetServerTimestamp(TableRef tableRef, MultiRowMutationState rowKeyToColumnMap)
+            throws SQLException {
         Long scn = connection.getSCN();
         MetaDataClient client = new MetaDataClient(connection);
         long serverTimeStamp = tableRef.getTimeStamp();
         // If we're auto committing, we've already validated the schema when we got the ColumnResolver,
         // so no need to do it again here.
         PTable table = tableRef.getTable();
-        MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString());
+        MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName()
+                .getString());
         PTable resolvedTable = result.getTable();
-        if (resolvedTable == null) {
-            throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString());
-        }
+        if (resolvedTable == null) { throw new TableNotFoundException(table.getSchemaName().getString(), table
+                .getTableName().getString()); }
         // Always update tableRef table as the one we've cached may be out of date since when we executed
         // the UPSERT VALUES call and updated in the cache before this.
         tableRef.setTable(resolvedTable);
@@ -807,39 +811,38 @@ public class MutationState implements SQLCloseable {
         for (PTable idxTtable : indexes) {
             // If index is still active, but has a non zero INDEX_DISABLE_TIMESTAMP value, then infer that
             // our failure mode is block writes on index failure.
-            if ((idxTtable.getIndexState() == PIndexState.ACTIVE || idxTtable.getIndexState() == PIndexState.PENDING_ACTIVE) && idxTtable.getIndexDisableTimestamp() > 0) {
-                throw new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE)
-                .setSchemaName(table.getSchemaName().getString())
-                .setTableName(table.getTableName().getString()).build().buildException();
-            }
-        } 
+            if ((idxTtable.getIndexState() == PIndexState.ACTIVE || idxTtable.getIndexState() == PIndexState.PENDING_ACTIVE)
+                    && idxTtable.getIndexDisableTimestamp() > 0) { throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE).setSchemaName(table.getSchemaName().getString())
+                    .setTableName(table.getTableName().getString()).build().buildException(); }
+        }
         long timestamp = result.getMutationTime();
         if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
             serverTimeStamp = timestamp;
             if (result.wasUpdated()) {
                 List<PColumn> columns = Lists.newArrayListWithExpectedSize(table.getColumns().size());
-                for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) {
+                for (Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) {
                     RowMutationState valueEntry = rowEntry.getValue();
                     if (valueEntry != null) {
                         Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
                         if (colValues != PRow.DELETE_MARKER) {
                             for (PColumn column : colValues.keySet()) {
-                                if (!column.isDynamic())
-                                    columns.add(column);
+                                if (!column.isDynamic()) columns.add(column);
                             }
                         }
                     }
                 }
                 for (PColumn column : columns) {
                     if (column != null) {
-                        resolvedTable.getColumnFamily(column.getFamilyName().getString()).getPColumnForColumnName(column.getName().getString());
+                        resolvedTable.getColumnFamily(column.getFamilyName().getString()).getPColumnForColumnName(
+                                column.getName().getString());
                     }
                 }
             }
         }
         return serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp;
     }
-    
+
     private static long calculateMutationSize(List<Mutation> mutations) {
         long byteSize = 0;
         if (GlobalClientMetrics.isMetricsEnabled()) {
@@ -850,24 +853,6 @@ public class MutationState implements SQLCloseable {
         GLOBAL_MUTATION_BYTES.update(byteSize);
         return byteSize;
     }
-    
-    private boolean hasKeyValueColumn(PTable table, PTable index) {
-        IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
-        return !maintainer.getAllColumns().isEmpty();
-    }
-    
-    private void divideImmutableIndexes(Iterator<PTable> enabledImmutableIndexes, PTable table, List<PTable> rowKeyIndexes, List<PTable> keyValueIndexes) {
-        while (enabledImmutableIndexes.hasNext()) {
-            PTable index = enabledImmutableIndexes.next();
-            if (index.getIndexType() != IndexType.LOCAL) {
-                if (hasKeyValueColumn(table, index)) {
-                    keyValueIndexes.add(index);
-                } else {
-                    rowKeyIndexes.add(index);
-                }
-            }
-        }
-    }
 
     public long getBatchSizeBytes() {
         return batchSizeBytes;
@@ -879,56 +864,52 @@ public class MutationState implements SQLCloseable {
 
     private class MetaDataAwareHTable extends DelegateHTable {
         private final TableRef tableRef;
-        
+
         private MetaDataAwareHTable(HTableInterface delegate, TableRef tableRef) {
             super(delegate);
             this.tableRef = tableRef;
         }
-        
+
         /**
-         * Called by Tephra when a transaction is aborted. We have this wrapper so that we get an
-         * opportunity to attach our index meta data to the mutations such that we can also undo
-         * the index mutations.
+         * Called by Tephra when a transaction is aborted. We have this wrapper so that we get an opportunity to attach
+         * our index meta data to the mutations such that we can also undo the index mutations.
          */
         @Override
         public void delete(List<Delete> deletes) throws IOException {
             ServerCache cache = null;
             try {
+                if (deletes.isEmpty()) { return; }
+                // Attach meta data for server maintained indexes
                 PTable table = tableRef.getTable();
-                List<PTable> indexes = table.getIndexes();
-                Iterator<PTable> enabledIndexes = IndexMaintainer.maintainedIndexes(indexes.iterator());
-                if (enabledIndexes.hasNext()) {
-                    List<PTable> keyValueIndexes = Collections.emptyList();
-                    ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
-                    boolean attachMetaData = table.getIndexMaintainers(indexMetaDataPtr, connection);
-                    if (table.isImmutableRows()) {
-                        List<PTable> rowKeyIndexes = Lists.newArrayListWithExpectedSize(indexes.size());
-                        keyValueIndexes = Lists.newArrayListWithExpectedSize(indexes.size());
-                        divideImmutableIndexes(enabledIndexes, table, rowKeyIndexes, keyValueIndexes);
-                        // Generate index deletes for immutable indexes that only reference row key
-                        // columns and submit directly here.
-                        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-                        for (PTable index : rowKeyIndexes) {
-                            List<Delete> indexDeletes = IndexUtil.generateDeleteIndexData(table, index, deletes, ptr, connection.getKeyValueBuilder(), connection);
-                            HTableInterface hindex = connection.getQueryServices().getTable(index.getPhysicalName().getBytes());
-                            hindex.delete(indexDeletes);
+                ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
+                if (table.getIndexMaintainers(indexMetaDataPtr, connection)) {
+                    cache = setMetaDataOnMutations(tableRef, deletes, indexMetaDataPtr);
+                }
+
+                // Send deletes for client maintained indexes
+                List<PTable> indexes = getClientMaintainedIndexes(table);
+                if (!indexes.isEmpty()) {
+                PhoenixTxIndexMutationGenerator generator = newTxIndexMutationGenerator(table, indexes, deletes.get(0)
+                            .getAttributesMap());
+                    Collection<Pair<Mutation, byte[]>> indexUpdates = generator.getIndexUpdates(delegate,
+                            deletes.iterator());
+                    for (PTable index : indexes) {
+                        byte[] physicalName = index.getPhysicalName().getBytes();
+                        try (HTableInterface hindex = connection.getQueryServices().getTable(physicalName)) {
+                            List<Mutation> indexDeletes = Lists.newArrayListWithExpectedSize(deletes.size());
+                            for (Pair<Mutation, byte[]> mutationPair : indexUpdates) {
+                                if (Bytes.equals(mutationPair.getSecond(), physicalName)) {
+                                    indexDeletes.add(mutationPair.getFirst());
+                                }
+                                hindex.batch(indexDeletes);
+                            }
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            throw new IOException(e);
                         }
                     }
-                    
-                    // If we have mutable indexes, local immutable indexes, or global immutable indexes
-                    // that reference key value columns, setup index meta data and attach here. In this
-                    // case updates to the indexes will be generated on the server side.
-                    // An alternative would be to let Tephra track the row keys for the immutable index
-                    // by adding it as a transaction participant (soon we can prevent any conflict
-                    // detection from occurring) with the downside being the additional memory required.
-                    if (!keyValueIndexes.isEmpty()) {
-                        attachMetaData = true;
-                        IndexMaintainer.serializeAdditional(table, indexMetaDataPtr, keyValueIndexes, connection);
-                    }
-                    if (attachMetaData) {
-                        cache = setMetaDataOnMutations(tableRef, deletes, indexMetaDataPtr);
-                    }
                 }
+
                 delegate.delete(deletes);
             } catch (SQLException e) {
                 throw new IOException(e);
@@ -939,13 +920,15 @@ public class MutationState implements SQLCloseable {
             }
         }
     }
-    
+
     private static class TableInfo {
-        
+
         private final boolean isDataTable;
-        @Nonnull private final PName hTableName;
-        @Nonnull private final TableRef origTableRef;
-        
+        @Nonnull
+        private final PName hTableName;
+        @Nonnull
+        private final TableRef origTableRef;
+
         public TableInfo(boolean isDataTable, PName hTableName, TableRef origTableRef) {
             super();
             checkNotNull(hTableName);
@@ -981,14 +964,14 @@ public class MutationState implements SQLCloseable {
             if (this == obj) return true;
             if (obj == null) return false;
             if (getClass() != obj.getClass()) return false;
-            TableInfo other = (TableInfo) obj;
+            TableInfo other = (TableInfo)obj;
             if (!hTableName.equals(other.hTableName)) return false;
             if (isDataTable != other.isDataTable) return false;
             return true;
         }
 
     }
-    
+
     @SuppressWarnings("deprecation")
     private void send(Iterator<TableRef> tableRefIterator) throws SQLException {
         int i = 0;
@@ -1001,7 +984,7 @@ public class MutationState implements SQLCloseable {
         }
 
         MultiRowMutationState multiRowMutationState;
-        Map<TableInfo,List<Mutation>> physicalTableMutationMap = Maps.newLinkedHashMap(); 
+        Map<TableInfo, List<Mutation>> physicalTableMutationMap = Maps.newLinkedHashMap();
         // add tracing for this operation
         try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) {
             Span span = trace.getSpan();
@@ -1014,21 +997,22 @@ public class MutationState implements SQLCloseable {
                     continue;
                 }
                 // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
-                long serverTimestamp = serverTimeStamps == null ? validateAndGetServerTimestamp(tableRef, multiRowMutationState) : serverTimeStamps[i++];
+                long serverTimestamp = serverTimeStamps == null ? validateAndGetServerTimestamp(tableRef,
+                        multiRowMutationState) : serverTimeStamps[i++];
                 Long scn = connection.getSCN();
                 long mutationTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
                 final PTable table = tableRef.getTable();
-                Iterator<Pair<PName,List<Mutation>>> mutationsIterator = addRowMutations(tableRef, multiRowMutationState, mutationTimestamp, serverTimestamp, false, sendAll);
+                Iterator<Pair<PName, List<Mutation>>> mutationsIterator = addRowMutations(tableRef,
+                        multiRowMutationState, mutationTimestamp, serverTimestamp, false, sendAll);
                 // build map from physical table to mutation list
                 boolean isDataTable = true;
                 while (mutationsIterator.hasNext()) {
-                    Pair<PName,List<Mutation>> pair = mutationsIterator.next();
+                    Pair<PName, List<Mutation>> pair = mutationsIterator.next();
                     PName hTableName = pair.getFirst();
                     List<Mutation> mutationList = pair.getSecond();
                     TableInfo tableInfo = new TableInfo(isDataTable, hTableName, tableRef);
                     List<Mutation> oldMutationList = physicalTableMutationMap.put(tableInfo, mutationList);
-                    if (oldMutationList!=null)
-                        mutationList.addAll(0, oldMutationList);
+                    if (oldMutationList != null) mutationList.addAll(0, oldMutationList);
                     isDataTable = false;
                 }
                 // For transactions, track the statement indexes as we send data
@@ -1049,23 +1033,25 @@ public class MutationState implements SQLCloseable {
                 }
             }
             long serverTimestamp = HConstants.LATEST_TIMESTAMP;
-            Iterator<Entry<TableInfo, List<Mutation>>> mutationsIterator = physicalTableMutationMap.entrySet().iterator();
+            Iterator<Entry<TableInfo, List<Mutation>>> mutationsIterator = physicalTableMutationMap.entrySet()
+                    .iterator();
             while (mutationsIterator.hasNext()) {
                 Entry<TableInfo, List<Mutation>> pair = mutationsIterator.next();
                 TableInfo tableInfo = pair.getKey();
                 byte[] htableName = tableInfo.getHTableName().getBytes();
                 List<Mutation> mutationList = pair.getValue();
-                
-                //create a span per target table
-                //TODO maybe we can be smarter about the table name to string here?
-                Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName));
+
+                // create a span per target table
+                // TODO maybe we can be smarter about the table name to string here?
+                Span child = Tracing.child(span, "Writing mutation batch for table: " + Bytes.toString(htableName));
 
                 int retryCount = 0;
                 boolean shouldRetry = false;
                 long numMutations = 0;
                 long mutationSizeBytes = 0;
                 long mutationCommitTime = 0;
-                long numFailedMutations = 0;;
+                long numFailedMutations = 0;
+                ;
                 long startTime = 0;
                 boolean shouldRetryIndexedMutation = false;
                 IndexWriteException iwe = null;
@@ -1073,11 +1059,12 @@ public class MutationState implements SQLCloseable {
                     TableRef origTableRef = tableInfo.getOrigTableRef();
                     PTable table = origTableRef.getTable();
                     table.getIndexMaintainers(indexMetaDataPtr, connection);
-                    final ServerCache cache = tableInfo.isDataTable() ? setMetaDataOnMutations(origTableRef, mutationList, indexMetaDataPtr) : null;
+                    final ServerCache cache = tableInfo.isDataTable() ? setMetaDataOnMutations(origTableRef,
+                            mutationList, indexMetaDataPtr) : null;
                     // If we haven't retried yet, retry for this case only, as it's possible that
                     // a split will occur after we send the index metadata cache to all known
                     // region servers.
-                    shouldRetry = cache!=null;
+                    shouldRetry = cache != null;
                     SQLException sqlE = null;
                     HTableInterface hTable = connection.getQueryServices().getTable(htableName);
                     try {
@@ -1093,16 +1080,17 @@ public class MutationState implements SQLCloseable {
                                 hTable = new MetaDataAwareHTable(hTable, origTableRef);
                             }
 
-                            hTable = phoenixTransactionContext.getTransactionalTable(hTable, table.isImmutableRows());
+                            hTable = phoenixTransactionContext.getTransactionalTableWriter(hTable, table);
                         }
-                        
+
                         numMutations = mutationList.size();
                         GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
                         mutationSizeBytes = calculateMutationSize(mutationList);
-                        
+
                         startTime = System.currentTimeMillis();
                         child.addTimelineAnnotation("Attempt " + retryCount);
-                        List<List<Mutation>> mutationBatchList = getMutationBatchList(batchSize, batchSizeBytes, mutationList);
+                        List<List<Mutation>> mutationBatchList = getMutationBatchList(batchSize, batchSizeBytes,
+                                mutationList);
                         for (final List<Mutation> mutationBatch : mutationBatchList) {
                             if (shouldRetryIndexedMutation) {
                                 // if there was an index write failure, retry the mutation in a loop
@@ -1116,14 +1104,16 @@ public class MutationState implements SQLCloseable {
                                             Thread.currentThread().interrupt();
                                             throw new IOException(e);
                                         }
-                                    }}, iwe,
-                                    connection, connection.getQueryServices().getProps());
+                                    }
+                                }, iwe, connection, connection.getQueryServices().getProps());
                             } else {
                                 hTable.batch(mutationBatch);
                             }
 
                             batchCount++;
-                            if (logger.isDebugEnabled()) logger.debug("Sent batch of " + mutationBatch.size() + " for " + Bytes.toString(htableName));
+                            if (logger.isDebugEnabled())
+                                logger.debug("Sent batch of " + mutationBatch.size() + " for "
+                                        + Bytes.toString(htableName));
                         }
                         child.stop();
                         child.stop();
@@ -1131,7 +1121,7 @@ public class MutationState implements SQLCloseable {
                         mutationCommitTime = System.currentTimeMillis() - startTime;
                         GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
                         numFailedMutations = 0;
-                        
+
                         // Remove batches as we process them
                         mutations.remove(origTableRef);
                         if (tableInfo.isDataTable()) {
@@ -1140,22 +1130,28 @@ public class MutationState implements SQLCloseable {
                             estimatedSize = KeyValueUtil.getEstimatedRowMutationSize(mutations);
                         }
                     } catch (Exception e) {
-                    	mutationCommitTime = System.currentTimeMillis() - startTime;
+                        mutationCommitTime = System.currentTimeMillis() - startTime;
                         serverTimestamp = ServerUtil.parseServerTimestamp(e);
                         SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
                         if (inferredE != null) {
-                            if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
-                                // Swallow this exception once, as it's possible that we split after sending the index metadata
-                                // and one of the region servers doesn't have it. This will cause it to have it the next go around.
+                            if (shouldRetry
+                                    && retryCount == 0
+                                    && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND
+                                            .getErrorCode()) {
+                                // Swallow this exception once, as it's possible that we split after sending the index
+                                // metadata
+                                // and one of the region servers doesn't have it. This will cause it to have it the next
+                                // go around.
                                 // If it fails again, we don't retry.
-                                String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE;
+                                String msg = "Swallowing exception and retrying after clearing meta cache on connection. "
+                                        + inferredE;
                                 logger.warn(LogUtil.addCustomAnnotations(msg, connection));
                                 connection.getQueryServices().clearTableRegionCache(htableName);
 
                                 // add a new child span as this one failed
                                 child.addTimelineAnnotation(msg);
                                 child.stop();
-                                child = Tracing.child(span,"Failed batch, attempting retry");
+                                child = Tracing.child(span, "Failed batch, attempting retry");
 
                                 continue;
                             } else if (inferredE.getErrorCode() == SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) {
@@ -1164,7 +1160,8 @@ public class MutationState implements SQLCloseable {
                                     // For an index write failure, the data table write succeeded,
                                     // so when we retry we need to set REPLAY_WRITES
                                     for (Mutation m : mutationList) {
-                                        m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES);
+                                        m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+                                                BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES);
                                         KeyValueUtil.setTimestamp(m, serverTimestamp);
                                     }
                                     shouldRetry = true;
@@ -1177,29 +1174,26 @@ public class MutationState implements SQLCloseable {
                         // Throw to client an exception that indicates the statements that
                         // were not committed successfully.
                         int[] uncommittedStatementIndexes = getUncommittedStatementIndexes();
-						sqlE = new CommitException(e, uncommittedStatementIndexes, serverTimestamp);
-						numFailedMutations = uncommittedStatementIndexes.length;
-						GLOBAL_MUTATION_BATCH_FAILED_COUNT.update(numFailedMutations);
+                        sqlE = new CommitException(e, uncommittedStatementIndexes, serverTimestamp);
+                        numFailedMutations = uncommittedStatementIndexes.length;
+                        GLOBAL_MUTATION_BATCH_FAILED_COUNT.update(numFailedMutations);
                     } finally {
-                    	MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime, numFailedMutations);
+                        MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes,
+                                mutationCommitTime, numFailedMutations);
                         mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric);
                         try {
-                            if (cache!=null) 
-                                cache.close();
+                            if (cache != null) cache.close();
                         } finally {
                             try {
                                 hTable.close();
-                            } 
-                            catch (IOException e) {
+                            } catch (IOException e) {
                                 if (sqlE != null) {
                                     sqlE.setNextException(ServerUtil.parseServerException(e));
                                 } else {
                                     sqlE = ServerUtil.parseServerException(e);
                                 }
-                            } 
-                            if (sqlE != null) {
-                                throw sqlE;
                             }
+                            if (sqlE != null) { throw sqlE; }
                         }
                     }
                 } while (shouldRetry && retryCount++ < 1);
@@ -1209,10 +1203,13 @@ public class MutationState implements SQLCloseable {
 
     /**
      * Split the list of mutations into multiple lists that don't exceed row and byte thresholds
-     * @param allMutationList List of HBase mutations
+     * 
+     * @param allMutationList
+     *            List of HBase mutations
      * @return List of lists of mutations
      */
-    public static List<List<Mutation>> getMutationBatchList(long batchSize, long batchSizeBytes, List<Mutation> allMutationList) {
+    public static List<List<Mutation>> getMutationBatchList(long batchSize, long batchSizeBytes,
+            List<Mutation> allMutationList) {
         List<List<Mutation>> mutationBatchList = Lists.newArrayList();
         List<Mutation> currentList = Lists.newArrayList();
         long currentBatchSizeBytes = 0L;
@@ -1243,12 +1240,10 @@ public class MutationState implements SQLCloseable {
             ImmutableBytesWritable indexMetaDataPtr) throws SQLException {
         PTable table = tableRef.getTable();
         final byte[] tenantIdBytes;
-        if(table.isMultiTenant()) {
-            tenantIdBytes = connection.getTenantId() == null ? null :
-                    ScanUtil.getTenantIdBytes(
-                            table.getRowKeySchema(),
-                            table.getBucketNum() != null,
-                            connection.getTenantId(), table.getViewIndexId() != null);
+        if (table.isMultiTenant()) {
+            tenantIdBytes = connection.getTenantId() == null ? null : ScanUtil.getTenantIdBytes(
+                    table.getRowKeySchema(), table.getBucketNum() != null, connection.getTenantId(),
+                    table.getViewIndexId() != null);
         } else {
             tenantIdBytes = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
         }
@@ -1261,7 +1256,8 @@ public class MutationState implements SQLCloseable {
         }
         boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0;
         if (hasIndexMetaData) {
-            if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength() + txState.length)) {
+            if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength()
+                    + txState.length)) {
                 IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
                 cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState);
                 uuidValue = cache.getId();
@@ -1269,9 +1265,7 @@ public class MutationState implements SQLCloseable {
                 attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
                 uuidValue = ServerCacheClient.generateId();
             }
-        } else if (txState.length == 0) {
-            return null;
-        }
+        } else if (txState.length == 0) { return null; }
         // Either set the UUID to be able to access the index metadata from the cache
         // or set the index metadata directly on the Mutation
         for (Mutation mutation : mutations) {
@@ -1281,7 +1275,8 @@ public class MutationState implements SQLCloseable {
             mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
             if (attribValue != null) {
                 mutation.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
-                mutation.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
+                mutation.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION,
+                        Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
                 if (txState.length > 0) {
                     mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
                 }
@@ -1291,23 +1286,23 @@ public class MutationState implements SQLCloseable {
         }
         return cache;
     }
-    
+
     private void addUncommittedStatementIndexes(Collection<RowMutationState> rowMutations) {
         for (RowMutationState rowMutationState : rowMutations) {
-            uncommittedStatementIndexes = joinSortedIntArrays(uncommittedStatementIndexes, rowMutationState.getStatementIndexes());
+            uncommittedStatementIndexes = joinSortedIntArrays(uncommittedStatementIndexes,
+                    rowMutationState.getStatementIndexes());
         }
     }
-    
+
     private int[] getUncommittedStatementIndexes() {
         for (MultiRowMutationState rowMutationMap : mutations.values()) {
             addUncommittedStatementIndexes(rowMutationMap.values());
         }
         return uncommittedStatementIndexes;
     }
-    
+
     @Override
-    public void close() throws SQLException {
-    }
+    public void close() throws SQLException {}
 
     private void resetState() {
         numRows = 0;
@@ -1335,26 +1330,29 @@ public class MutationState implements SQLCloseable {
         Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap();
         int retryCount = 0;
         do {
-            boolean sendSuccessful=false;
+            boolean sendSuccessful = false;
             boolean retryCommit = false;
             SQLException sqlE = null;
             try {
                 send();
                 txMutations = this.txMutations;
-                sendSuccessful=true;
+                sendSuccessful = true;
             } catch (SQLException e) {
                 sqlE = e;
             } finally {
                 try {
-                    boolean finishSuccessful=false;
+                    boolean finishSuccessful = false;
                     try {
                         if (sendSuccessful) {
                             phoenixTransactionContext.commit();
                             finishSuccessful = true;
                         }
                     } catch (SQLException e) {
-                        if (logger.isInfoEnabled()) logger.info(e.getClass().getName() + " at timestamp " + getInitialWritePointer() + " with retry count of " + retryCount);
-                        retryCommit = (e.getErrorCode() == SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode() && retryCount < MAX_COMMIT_RETRIES);
+                        if (logger.isInfoEnabled())
+                            logger.info(e.getClass().getName() + " at timestamp " + getInitialWritePointer()
+                                    + " with retry count of " + retryCount);
+                        retryCommit = (e.getErrorCode() == SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION
+                                .getErrorCode() && retryCount < MAX_COMMIT_RETRIES);
                         if (sqlE == null) {
                             sqlE = e;
                         } else {
@@ -1401,9 +1399,7 @@ public class MutationState implements SQLCloseable {
                                 }
                             }
                         }
-                        if (sqlE != null && !retryCommit) {
-                            throw sqlE;
-                        }
+                        if (sqlE != null && !retryCommit) { throw sqlE; }
                     }
                 }
             }
@@ -1418,11 +1414,12 @@ public class MutationState implements SQLCloseable {
 
     /**
      * Determines whether indexes were added to mutated tables while the transaction was in progress.
+     * 
      * @return true if indexes were added and false otherwise.
-     * @throws SQLException 
+     * @throws SQLException
      */
     private boolean shouldResubmitTransaction(Set<TableRef> txTableRefs) throws SQLException {
-        if (logger.isInfoEnabled()) logger.info("Checking for index updates as of "  + getInitialWritePointer());
+        if (logger.isInfoEnabled()) logger.info("Checking for index updates as of " + getInitialWritePointer());
         MetaDataClient client = new MetaDataClient(connection);
         PMetaData cache = connection.getMetaDataCache();
         boolean addedAnyIndexes = false;
@@ -1432,15 +1429,16 @@ public class MutationState implements SQLCloseable {
             List<PTable> oldIndexes;
             PTableRef ptableRef = cache.getTableRef(dataTable.getKey());
             oldIndexes = ptableRef.getTable().getIndexes();
-            // Always check at server for metadata change, as it's possible that the table is configured to not check for metadata changes
+            // Always check at server for metadata change, as it's possible that the table is configured to not check
+            // for metadata changes
             // but in this case, the tx manager is telling us it's likely that there has been a change.
-            MetaDataMutationResult result = client.updateCache(dataTable.getTenantId(), dataTable.getSchemaName().getString(), dataTable.getTableName().getString(), true);
+            MetaDataMutationResult result = client.updateCache(dataTable.getTenantId(), dataTable.getSchemaName()
+                    .getString(), dataTable.getTableName().getString(), true);
             long timestamp = TransactionUtil.getResolvedTime(connection, result);
             tableRef.setTimeStamp(timestamp);
             PTable updatedDataTable = result.getTable();
-            if (updatedDataTable == null) {
-                throw new TableNotFoundException(dataTable.getSchemaName().getString(), dataTable.getTableName().getString());
-            }
+            if (updatedDataTable == null) { throw new TableNotFoundException(dataTable.getSchemaName().getString(),
+                    dataTable.getTableName().getString()); }
             allImmutableTables &= updatedDataTable.isImmutableRows();
             tableRef.setTable(updatedDataTable);
             if (!addedAnyIndexes) {
@@ -1448,10 +1446,14 @@ public class MutationState implements SQLCloseable {
                 // that an index was dropped and recreated with the same name but different
                 // indexed/covered columns.
                 addedAnyIndexes = (!oldIndexes.equals(updatedDataTable.getIndexes()));
-                if (logger.isInfoEnabled()) logger.info((addedAnyIndexes ? "Updates " : "No updates ") + "as of "  + timestamp + " to " + updatedDataTable.getName().getString() + " with indexes " + updatedDataTable.getIndexes());
+                if (logger.isInfoEnabled())
+                    logger.info((addedAnyIndexes ? "Updates " : "No updates ") + "as of " + timestamp + " to "
+                            + updatedDataTable.getName().getString() + " with indexes " + updatedDataTable.getIndexes());
             }
         }
-        if (logger.isInfoEnabled()) logger.info((addedAnyIndexes ? "Updates " : "No updates ") + "to indexes as of "  + getInitialWritePointer() + " over " + (allImmutableTables ? " all immutable tables" : " some mutable tables"));
+        if (logger.isInfoEnabled())
+            logger.info((addedAnyIndexes ? "Updates " : "No updates ") + "to indexes as of " + getInitialWritePointer()
+                    + " over " + (allImmutableTables ? " all immutable tables" : " some mutable tables"));
         // If all tables are immutable, we know the conflict we got was due to our DDL/DML fence.
         // If any indexes were added, then the conflict might be due to DDL/DML fence.
         return allImmutableTables || addedAnyIndexes;
@@ -1459,16 +1461,18 @@ public class MutationState implements SQLCloseable {
 
     /**
      * Send to HBase any uncommitted data for transactional tables.
+     * 
      * @return true if any data was sent and false otherwise.
      * @throws SQLException
      */
     public boolean sendUncommitted() throws SQLException {
         return sendUncommitted(mutations.keySet().iterator());
     }
+
     /**
-     * Support read-your-own-write semantics by sending uncommitted data to HBase prior to running a
-     * query. In this way, they are visible to subsequent reads but are not actually committed until
-     * commit is called.
+     * Support read-your-own-write semantics by sending uncommitted data to HBase prior to running a query. In this way,
+     * they are visible to subsequent reads but are not actually committed until commit is called.
+     * 
      * @param tableRefs
      * @return true if any data was sent and false otherwise.
      * @throws SQLException
@@ -1481,7 +1485,7 @@ public class MutationState implements SQLCloseable {
             phoenixTransactionContext.setVisibilityLevel(PhoenixVisibilityLevel.SNAPSHOT);
         }
 
-        Iterator<TableRef> filteredTableRefs = Iterators.filter(tableRefs, new Predicate<TableRef>(){
+        Iterator<TableRef> filteredTableRefs = Iterators.filter(tableRefs, new Predicate<TableRef>() {
             @Override
             public boolean apply(TableRef tableRef) {
                 return tableRef.getTable().isTransactional();
@@ -1497,113 +1501,120 @@ public class MutationState implements SQLCloseable {
                 if (tableRef.getTable().isTransactional()) {
                     startTransaction(tableRef.getTable().getTransactionProvider());
                 }
-                strippedAliases.add(new TableRef(null, tableRef.getTable(), tableRef.getTimeStamp(), tableRef.getLowerBoundTimeStamp(), tableRef.hasDynamicCols()));
+                strippedAliases.add(new TableRef(null, tableRef.getTable(), tableRef.getTimeStamp(), tableRef
+                        .getLowerBoundTimeStamp(), tableRef.hasDynamicCols()));
             }
             send(strippedAliases.iterator());
             return true;
         }
         return false;
     }
-        
+
     public void send() throws SQLException {
         send(null);
     }
-    
+
     public static int[] joinSortedIntArrays(int[] a, int[] b) {
         int[] result = new int[a.length + b.length];
         int i = 0, j = 0, k = 0, current;
         while (i < a.length && j < b.length) {
             current = a[i] < b[j] ? a[i++] : b[j++];
-            for ( ; i < a.length && a[i] == current; i++);
-            for ( ; j < b.length && b[j] == current; j++);
+            for (; i < a.length && a[i] == current; i++)
+                ;
+            for (; j < b.length && b[j] == current; j++)
+                ;
             result[k++] = current;
         }
         while (i < a.length) {
-            for (current = a[i++] ; i < a.length && a[i] == current; i++);
+            for (current = a[i++]; i < a.length && a[i] == current; i++)
+                ;
             result[k++] = current;
         }
         while (j < b.length) {
-            for (current = b[j++] ; j < b.length && b[j] == current; j++);
+            for (current = b[j++]; j < b.length && b[j] == current; j++)
+                ;
             result[k++] = current;
         }
         return Arrays.copyOf(result, k);
     }
-    
+
     @Immutable
     public static class RowTimestampColInfo {
         private final boolean useServerTimestamp;
-        private final Long rowTimestamp; 
-        
+        private final Long rowTimestamp;
+
         public static final RowTimestampColInfo NULL_ROWTIMESTAMP_INFO = new RowTimestampColInfo(false, null);
 
         public RowTimestampColInfo(boolean autoGenerate, Long value) {
             this.useServerTimestamp = autoGenerate;
             this.rowTimestamp = value;
         }
-        
+
         public boolean useServerTimestamp() {
             return useServerTimestamp;
         }
-        
+
         public Long getTimestamp() {
             return rowTimestamp;
         }
     }
-    
+
     public static class MultiRowMutationState {
-        private Map<ImmutableBytesPtr,RowMutationState> rowKeyToRowMutationState;
+        private Map<ImmutableBytesPtr, RowMutationState> rowKeyToRowMutationState;
         private long estimatedSize;
-        
+
         public MultiRowMutationState(int size) {
             this.rowKeyToRowMutationState = Maps.newHashMapWithExpectedSize(size);
             this.estimatedSize = 0;
         }
-        
-        public RowMutationState put(ImmutableBytesPtr ptr, RowMutationState rowMutationState) { 
+
+        public RowMutationState put(ImmutableBytesPtr ptr, RowMutationState rowMutationState) {
             estimatedSize += rowMutationState.calculateEstimatedSize();
             return rowKeyToRowMutationState.put(ptr, rowMutationState);
         }
-        
+
         public void putAll(MultiRowMutationState other) {
             estimatedSize += other.estimatedSize;
             rowKeyToRowMutationState.putAll(other.rowKeyToRowMutationState);
         }
-        
+
         public boolean isEmpty() {
             return rowKeyToRowMutationState.isEmpty();
         }
-        
+
         public int size() {
             return rowKeyToRowMutationState.size();
         }
-        
+
         public Set<Entry<ImmutableBytesPtr, RowMutationState>> entrySet() {
             return rowKeyToRowMutationState.entrySet();
         }
-        
-        public void clear(){
+
+        public void clear() {
             rowKeyToRowMutationState.clear();
             estimatedSize = 0;
         }
-        
+
         public Collection<RowMutationState> values() {
             return rowKeyToRowMutationState.values();
         }
     }
-    
+
     public static class RowMutationState {
-        @Nonnull private Map<PColumn,byte[]> columnValues;
+        @Nonnull
+        private Map<PColumn, byte[]> columnValues;
         private int[] statementIndexes;
-        @Nonnull private final RowTimestampColInfo rowTsColInfo;
+        @Nonnull
+        private final RowTimestampColInfo rowTsColInfo;
         private byte[] onDupKeyBytes;
         private long colValuesSize;
-        
-        public RowMutationState(@Nonnull Map<PColumn,byte[]> columnValues, long colValuesSize, int statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo,
-                byte[] onDupKeyBytes) {
+
+        public RowMutationState(@Nonnull Map<PColumn, byte[]> columnValues, long colValuesSize, int statementIndex,
+                @Nonnull RowTimestampColInfo rowTsColInfo, byte[] onDupKeyBytes) {
             checkNotNull(columnValues);
             checkNotNull(rowTsColInfo);
             this.columnValues = columnValues;
-            this.statementIndexes = new int[] {statementIndex};
+            this.statementIndexes = new int[] { statementIndex };
             this.rowTsColInfo = rowTsColInfo;
             this.onDupKeyBytes = onDupKeyBytes;
             this.colValuesSize = colValuesSize;
@@ -1631,13 +1642,13 @@ public class MutationState implements SQLCloseable {
             // ignore the new values (as that's what the server will do).
             if (newRow.onDupKeyBytes == null) {
                 // increment the column value size by the new row column value size
-                colValuesSize+=newRow.colValuesSize;
-                for (Map.Entry<PColumn,byte[]> entry : newRow.columnValues.entrySet()) {
+                colValuesSize += newRow.colValuesSize;
+                for (Map.Entry<PColumn, byte[]> entry : newRow.columnValues.entrySet()) {
                     PColumn col = entry.getKey();
                     byte[] oldValue = columnValues.put(col, entry.getValue());
-                    if (oldValue!=null) {
+                    if (oldValue != null) {
                         // decrement column value size by the size of all column values that were replaced
-                        colValuesSize-=(col.getEstimatedSize() + oldValue.length);
+                        colValuesSize -= (col.getEstimatedSize() + oldValue.length);
                     }
                 }
             }
@@ -1646,14 +1657,14 @@ public class MutationState implements SQLCloseable {
             this.onDupKeyBytes = PhoenixIndexBuilder.combineOnDupKey(this.onDupKeyBytes, newRow.onDupKeyBytes);
             statementIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes());
         }
-        
+
         @Nonnull
         RowTimestampColInfo getRowTimestampColInfo() {
             return rowTsColInfo;
         }
 
     }
-    
+
     public ReadMetricQueue getReadMetricQueue() {
         return readMetricQueue;
     }
@@ -1665,5 +1676,5 @@ public class MutationState implements SQLCloseable {
     public MutationMetricQueue getMutationMetricQueue() {
         return mutationMetricQueue;
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f3119320/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
index 6348d6d..0016fa9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
@@ -95,7 +95,7 @@ public class PhoenixTxIndexMutationGenerator {
         stored.addAll(m);
     }
 
-    public Collection<Pair<Mutation, byte[]>> getIndexUpdates(HTableInterface htable, Iterator<Mutation> mutationIterator) throws IOException, SQLException {
+    public Collection<Pair<Mutation, byte[]>> getIndexUpdates(HTableInterface htable, Iterator<? extends Mutation> mutationIterator) throws IOException, SQLException {
 
         if (!mutationIterator.hasNext()) {
             return Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/f3119320/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
index 543eda1..ab9e8a6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
@@ -130,4 +130,10 @@ public class OmidTransactionContext implements PhoenixTransactionContext {
         // TODO Auto-generated method stub
         return null;
     }
+
+    @Override
+    public HTableInterface getTransactionalTableWriter(HTableInterface htable, PTable table) {
+        // TODO Auto-generated method stub
+        return null;
+    }
 }


Mime
View raw message