phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject phoenix git commit: PHOENIX-4773 Move HTable rollback wrapper into Tephra TAL method
Date Wed, 06 Jun 2018 05:03:28 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.3 a91f34dc1 -> 59237384e


PHOENIX-4773 Move HTable rollback wrapper into Tephra TAL method


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/59237384
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/59237384
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/59237384

Branch: refs/heads/4.x-HBase-1.3
Commit: 59237384eed97d08201d027fb4060de9440fd45a
Parents: a91f34d
Author: James Taylor <jamestaylor@apache.org>
Authored: Mon Jun 4 20:27:36 2018 -0700
Committer: James Taylor <jamestaylor@apache.org>
Committed: Tue Jun 5 22:03:11 2018 -0700

----------------------------------------------------------------------
 .../apache/phoenix/cache/ServerCacheClient.java |  21 +-
 .../apache/phoenix/execute/HashJoinPlan.java    |   7 +-
 .../apache/phoenix/execute/MutationState.java   | 190 ++-----------------
 .../PhoenixTxIndexMutationGenerator.java        |  42 ++++
 .../phoenix/index/IndexMetaDataCacheClient.java |  67 ++++++-
 .../apache/phoenix/join/HashCacheClient.java    |   5 +-
 .../transaction/OmidTransactionContext.java     |   3 +-
 .../transaction/PhoenixTransactionContext.java  |   5 +-
 .../transaction/TephraTransactionContext.java   |  91 ++++++++-
 .../java/org/apache/phoenix/util/IndexUtil.java |   8 +
 10 files changed, 230 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/59237384/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index 68de747..5e284bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -70,7 +70,6 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
-import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
@@ -90,7 +89,7 @@ public class ServerCacheClient {
     private static final Random RANDOM = new Random();
 	public static final String HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER = "hash.join.server.cache.resend.per.server";
     private final PhoenixConnection connection;
-    private final Map<Integer, TableRef> cacheUsingTableRefMap = new ConcurrentHashMap<Integer, TableRef>();
+    private final Map<Integer, PTable> cacheUsingTableMap = new ConcurrentHashMap<Integer, PTable>();
 
     /**
      * Construct client used to create a serialized cached snapshot of a table and send it
to each region server
@@ -220,12 +219,12 @@ public class ServerCacheClient {
     }
     
     public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable
cachePtr, final byte[] txState,
-            final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef) throws SQLException {
-        return addServerCache(keyRanges, cachePtr, txState, cacheFactory, cacheUsingTableRef, false);
+            final ServerCacheFactory cacheFactory, final PTable cacheUsingTable) throws SQLException {
+        return addServerCache(keyRanges, cachePtr, txState, cacheFactory, cacheUsingTable, false);
     }
     
     public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable
cachePtr, final byte[] txState,
-            final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef, boolean storeCacheOnClient)
+            final ServerCacheFactory cacheFactory, final PTable cacheUsingTable, boolean storeCacheOnClient)
             throws SQLException {
         ConnectionQueryServices services = connection.getQueryServices();
         List<Closeable> closeables = new ArrayList<Closeable>();
@@ -241,7 +240,6 @@ public class ServerCacheClient {
         ExecutorService executor = services.getExecutor();
         List<Future<Boolean>> futures = Collections.emptyList();
         try {
-            final PTable cacheUsingTable = cacheUsingTableRef.getTable();
             List<HRegionLocation> locations = services.getAllTableRegions(cacheUsingTable.getPhysicalName().getBytes());
             int nRegions = locations.size();
             // Size these based on worst case
@@ -258,7 +256,7 @@ public class ServerCacheClient {
                     servers.add(entry);
                     if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("Adding cache entry to be sent for " + entry, connection));}
                     final byte[] key = getKeyInRegion(entry.getRegionInfo().getStartKey());
-                    final HTableInterface htable = services.getTable(cacheUsingTableRef.getTable().getPhysicalName().getBytes());
+                    final HTableInterface htable = services.getTable(cacheUsingTable.getPhysicalName().getBytes());
                     closeables.add(htable);
                     futures.add(executor.submit(new JobCallable<Boolean>() {
                         
@@ -294,7 +292,7 @@ public class ServerCacheClient {
                 future.get(timeoutMs, TimeUnit.MILLISECONDS);
             }
             
-            cacheUsingTableRefMap.put(Bytes.mapKey(cacheId), cacheUsingTableRef);
+            cacheUsingTableMap.put(Bytes.mapKey(cacheId), cacheUsingTable);
             success = true;
         } catch (SQLException e) {
             firstException = e;
@@ -337,9 +335,8 @@ public class ServerCacheClient {
         try {
             ConnectionQueryServices services = connection.getQueryServices();
             Throwable lastThrowable = null;
-            TableRef cacheUsingTableRef = cacheUsingTableRefMap.get(Bytes.mapKey(cacheId));
-            final PTable cacheUsingTable = cacheUsingTableRef.getTable();
-            byte[] tableName = cacheUsingTableRef.getTable().getPhysicalName().getBytes();
+            final PTable cacheUsingTable = cacheUsingTableMap.get(Bytes.mapKey(cacheId));
+            byte[] tableName = cacheUsingTable.getPhysicalName().getBytes();
             iterateOverTable = services.getTable(tableName);
 
             List<HRegionLocation> locations = services.getAllTableRegions(tableName);
@@ -403,7 +400,7 @@ public class ServerCacheClient {
                         lastThrowable);
             }
         } finally {
-            cacheUsingTableRefMap.remove(cacheId);
+            cacheUsingTableMap.remove(cacheId);
             Closeables.closeQuietly(iterateOverTable);
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/59237384/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 5b433b3..bfe089d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -66,7 +66,10 @@ import org.apache.phoenix.join.HashCacheClient;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
 import org.apache.phoenix.optimize.Cost;
-import org.apache.phoenix.parse.*;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.SQLParser;
+import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -494,7 +497,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
                     cache =
                             parent.hashClient.addHashCache(ranges, iterator,
                                 plan.getEstimatedSize(), hashExpressions, singleValueOnly,
-                                parent.delegate.getTableRef(), keyRangeRhsExpression,
+                                parent.delegate.getTableRef().getTable(), keyRangeRhsExpression,
                                 keyRangeRhsValues);
                     long endTime = System.currentTimeMillis();
                     boolean isSet = parent.firstJobEndTime.compareAndSet(0, endTime);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/59237384/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 2e795b1..c29d6b5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -38,7 +38,6 @@ import javax.annotation.Nonnull;
 import javax.annotation.concurrent.Immutable;
 
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -51,7 +50,6 @@ import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.MutationPlan;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -60,10 +58,8 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.IndexMetaDataCacheClient;
 import org.apache.phoenix.index.PhoenixIndexBuilder;
-import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy.MutateCommand;
-import org.apache.phoenix.index.PhoenixIndexMetaData;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
 import org.apache.phoenix.monitoring.GlobalClientMetrics;
@@ -94,14 +90,10 @@ import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.transaction.TransactionFactory.Provider;
-import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.LogUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SQLCloseable;
-import org.apache.phoenix.util.SQLCloseables;
-import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.SizedUtil;
 import org.apache.phoenix.util.TransactionUtil;
@@ -499,19 +491,13 @@ public class MutationState implements SQLCloseable {
         return ptr;
     }
 
-    private static List<PTable> getClientMaintainedIndexes(PTable table) {
-        Iterator<PTable> indexIterator = // Only maintain tables with immutable rows
through this client-side mechanism
-        (table.isImmutableRows() || table.isTransactional()) ? IndexMaintainer.maintainedGlobalIndexes(table
-                .getIndexes().iterator()) : Collections.<PTable> emptyIterator();
-        return Lists.newArrayList(indexIterator);
-    }
-
     private Iterator<Pair<PName, List<Mutation>>> addRowMutations(final
TableRef tableRef,
             final MultiRowMutationState values, final long mutationTimestamp, final long
serverTimestamp,
             boolean includeAllIndexes, final boolean sendAll) {
         final PTable table = tableRef.getTable();
-        final List<PTable> indexList = includeAllIndexes ? Lists.newArrayList(IndexMaintainer.maintainedIndexes(table
-                .getIndexes().iterator())) : getClientMaintainedIndexes(table);
+        final List<PTable> indexList = includeAllIndexes ? 
+                Lists.newArrayList(IndexMaintainer.maintainedIndexes(table.getIndexes().iterator()))
: 
+                    IndexUtil.getClientMaintainedIndexes(table);
         final Iterator<PTable> indexes = indexList.iterator();
         final List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(values.size());
         final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists
@@ -541,7 +527,7 @@ public class MutationState implements SQLCloseable {
                     if (!mutationsPertainingToIndex.isEmpty()) {
                         if (table.isTransactional()) {
                             if (indexMutationsMap == null) {
-                                PhoenixTxIndexMutationGenerator generator = newTxIndexMutationGenerator(table,
+                                PhoenixTxIndexMutationGenerator generator = PhoenixTxIndexMutationGenerator.newGenerator(connection,
table,
                                         indexList, mutationsPertainingToIndex.get(0).getAttributesMap());
                                 try (HTableInterface htable = connection.getQueryServices().getTable(
                                         table.getPhysicalName().getBytes())) {
@@ -596,43 +582,6 @@ public class MutationState implements SQLCloseable {
         };
     }
 
-    private PhoenixTxIndexMutationGenerator newTxIndexMutationGenerator(PTable table, List<PTable>
indexes,
-            Map<String, byte[]> attributes) {
-        final List<IndexMaintainer> indexMaintainers = Lists.newArrayListWithExpectedSize(indexes.size());
-        for (PTable index : indexes) {
-            IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
-            indexMaintainers.add(maintainer);
-        }
-        IndexMetaDataCache indexMetaDataCache = new IndexMetaDataCache() {
-
-            @Override
-            public void close() throws IOException {}
-
-            @Override
-            public List<IndexMaintainer> getIndexMaintainers() {
-                return indexMaintainers;
-            }
-
-            @Override
-            public PhoenixTransactionContext getTransactionContext() {
-                return phoenixTransactionContext.newTransactionContext(phoenixTransactionContext,
true);
-            }
-
-            @Override
-            public int getClientVersion() {
-                return MetaDataProtocol.PHOENIX_VERSION;
-            }
-
-        };
-        try {
-            PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(indexMetaDataCache,
attributes);
-            return new PhoenixTxIndexMutationGenerator(connection.getQueryServices().getConfiguration(),
indexMetaData,
-                    table.getPhysicalName().getBytes());
-        } catch (IOException e) {
-            throw new RuntimeException(e); // Impossible
-        }
-    }
-
     private void generateMutations(final TableRef tableRef, final long mutationTimestamp,
final long serverTimestamp,
             final MultiRowMutationState values, final List<Mutation> mutationList,
             final List<Mutation> mutationsPertainingToIndex) {
@@ -793,7 +742,6 @@ public class MutationState implements SQLCloseable {
 
     private long validateAndGetServerTimestamp(TableRef tableRef, MultiRowMutationState rowKeyToColumnMap)
             throws SQLException {
-        Long scn = connection.getSCN();
         MetaDataClient client = new MetaDataClient(connection);
         long serverTimeStamp = tableRef.getTimeStamp();
         // If we're auto committing, we've already validated the schema when we got the ColumnResolver,
@@ -862,65 +810,6 @@ public class MutationState implements SQLCloseable {
         return batchCount;
     }
 
-    private class MetaDataAwareHTable extends DelegateHTable {
-        private final TableRef tableRef;
-
-        private MetaDataAwareHTable(HTableInterface delegate, TableRef tableRef) {
-            super(delegate);
-            this.tableRef = tableRef;
-        }
-
-        /**
-         * Called by Tephra when a transaction is aborted. We have this wrapper so that we
get an opportunity to attach
-         * our index meta data to the mutations such that we can also undo the index mutations.
-         */
-        @Override
-        public void delete(List<Delete> deletes) throws IOException {
-            ServerCache cache = null;
-            try {
-                if (deletes.isEmpty()) { return; }
-                // Attach meta data for server maintained indexes
-                PTable table = tableRef.getTable();
-                ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
-                if (table.getIndexMaintainers(indexMetaDataPtr, connection)) {
-                    cache = setMetaDataOnMutations(tableRef, deletes, indexMetaDataPtr);
-                }
-
-                // Send deletes for client maintained indexes
-                List<PTable> indexes = getClientMaintainedIndexes(table);
-                if (!indexes.isEmpty()) {
-                PhoenixTxIndexMutationGenerator generator = newTxIndexMutationGenerator(table,
indexes, deletes.get(0)
-                            .getAttributesMap());
-                    Collection<Pair<Mutation, byte[]>> indexUpdates = generator.getIndexUpdates(delegate,
-                            deletes.iterator());
-                    for (PTable index : indexes) {
-                        byte[] physicalName = index.getPhysicalName().getBytes();
-                        try (HTableInterface hindex = connection.getQueryServices().getTable(physicalName))
{
-                            List<Mutation> indexDeletes = Lists.newArrayListWithExpectedSize(deletes.size());
-                            for (Pair<Mutation, byte[]> mutationPair : indexUpdates)
{
-                                if (Bytes.equals(mutationPair.getSecond(), physicalName))
{
-                                    indexDeletes.add(mutationPair.getFirst());
-                                }
-                                hindex.batch(indexDeletes);
-                            }
-                        } catch (InterruptedException e) {
-                            Thread.currentThread().interrupt();
-                            throw new IOException(e);
-                        }
-                    }
-                }
-
-                delegate.delete(deletes);
-            } catch (SQLException e) {
-                throw new IOException(e);
-            } finally {
-                if (cache != null) {
-                    SQLCloseables.closeAllQuietly(Collections.singletonList(cache));
-                }
-            }
-        }
-    }
-
     private static class TableInfo {
 
         private final boolean isDataTable;
@@ -1059,8 +948,9 @@ public class MutationState implements SQLCloseable {
                     TableRef origTableRef = tableInfo.getOrigTableRef();
                     PTable table = origTableRef.getTable();
                     table.getIndexMaintainers(indexMetaDataPtr, connection);
-                    final ServerCache cache = tableInfo.isDataTable() ? setMetaDataOnMutations(origTableRef,
-                            mutationList, indexMetaDataPtr) : null;
+                    final ServerCache cache = tableInfo.isDataTable() ? 
+                            IndexMetaDataCacheClient.setMetaDataOnMutations(connection, table,
+                                    mutationList, indexMetaDataPtr) : null;
                     // If we haven't retried yet, retry for this case only, as it's possible
that
                     // a split will occur after we send the index metadata cache to all known
                     // region servers.
@@ -1070,19 +960,12 @@ public class MutationState implements SQLCloseable {
                     try {
                         if (table.isTransactional()) {
                             // Track tables to which we've sent uncommitted data
-                            uncommittedPhysicalNames.add(table.getPhysicalName().getString());
-                            phoenixTransactionContext.markDMLFence(table);
-
-                            // If we have indexes, wrap the HTable in a delegate HTable that
-                            // will attach the necessary index meta data in the event of
a
-                            // rollback
-                            if (!table.getIndexes().isEmpty()) {
-                                hTable = new MetaDataAwareHTable(hTable, origTableRef);
+                            if (tableInfo.isDataTable()) {
+                                uncommittedPhysicalNames.add(table.getPhysicalName().getString());
+                                phoenixTransactionContext.markDMLFence(table);
                             }
-
-                            hTable = phoenixTransactionContext.getTransactionalTableWriter(hTable, table);
+                            hTable = phoenixTransactionContext.getTransactionalTableWriter(connection, table, hTable, !tableInfo.isDataTable());
                         }
-
                         numMutations = mutationList.size();
                         GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
                         mutationSizeBytes = calculateMutationSize(mutationList);
@@ -1236,57 +1119,6 @@ public class MutationState implements SQLCloseable {
         return phoenixTransactionContext.encodeTransaction();
     }
 
-    private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? extends Mutation>
mutations,
-            ImmutableBytesWritable indexMetaDataPtr) throws SQLException {
-        PTable table = tableRef.getTable();
-        final byte[] tenantIdBytes;
-        if (table.isMultiTenant()) {
-            tenantIdBytes = connection.getTenantId() == null ? null : ScanUtil.getTenantIdBytes(
-                    table.getRowKeySchema(), table.getBucketNum() != null, connection.getTenantId(),
-                    table.getViewIndexId() != null);
-        } else {
-            tenantIdBytes = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
-        }
-        ServerCache cache = null;
-        byte[] attribValue = null;
-        byte[] uuidValue = null;
-        byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY;
-        if (table.isTransactional()) {
-            txState = encodeTransaction();
-        }
-        boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0;
-        if (hasIndexMetaData) {
-            if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength()
-                    + txState.length)) {
-                IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection,
tableRef);
-                cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState);
-                uuidValue = cache.getId();
-            } else {
-                attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
-                uuidValue = ServerCacheClient.generateId();
-            }
-        } else if (txState.length == 0) { return null; }
-        // Either set the UUID to be able to access the index metadata from the cache
-        // or set the index metadata directly on the Mutation
-        for (Mutation mutation : mutations) {
-            if (connection.getTenantId() != null) {
-                mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantIdBytes);
-            }
-            mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-            if (attribValue != null) {
-                mutation.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
-                mutation.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION,
-                        Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
-                if (txState.length > 0) {
-                    mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
-                }
-            } else if (!hasIndexMetaData && txState.length > 0) {
-                mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
-            }
-        }
-        return cache;
-    }
-
     private void addUncommittedStatementIndexes(Collection<RowMutationState> rowMutations)
{
         for (RowMutationState rowMutationState : rowMutations) {
             uncommittedStatementIndexes = joinSortedIntArrays(uncommittedStatementIndexes,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/59237384/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
index 0016fa9..a7b5687 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
@@ -45,7 +45,9 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.cache.IndexMetaDataCache;
 import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.MultiMutation;
 import org.apache.phoenix.hbase.index.ValueGetter;
@@ -59,7 +61,9 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.index.PhoenixIndexMetaData;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
@@ -440,4 +444,42 @@ public class PhoenixTxIndexMutationGenerator {
             return pair;
         }
     }
+    
+    public static PhoenixTxIndexMutationGenerator newGenerator(final PhoenixConnection connection, PTable table, List<PTable> indexes,
+            Map<String, byte[]> attributes) {
+        final List<IndexMaintainer> indexMaintainers = Lists.newArrayListWithExpectedSize(indexes.size());
+        for (PTable index : indexes) {
+            IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
+            indexMaintainers.add(maintainer);
+        }
+        IndexMetaDataCache indexMetaDataCache = new IndexMetaDataCache() {
+
+            @Override
+            public void close() throws IOException {}
+
+            @Override
+            public List<IndexMaintainer> getIndexMaintainers() {
+                return indexMaintainers;
+            }
+
+            @Override
+            public PhoenixTransactionContext getTransactionContext() {
+                PhoenixTransactionContext context = connection.getMutationState().getPhoenixTransactionContext();
+                return context.newTransactionContext(context, true);
+            }
+
+            @Override
+            public int getClientVersion() {
+                return MetaDataProtocol.PHOENIX_VERSION;
+            }
+
+        };
+        try {
+            PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(indexMetaDataCache,
attributes);
+            return new PhoenixTxIndexMutationGenerator(connection.getQueryServices().getConfiguration(),
indexMetaData,
+                    table.getPhysicalName().getBytes());
+        } catch (IOException e) {
+            throw new RuntimeException(e); // Impossible
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/59237384/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
index fcabdfd..bd308c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
@@ -24,20 +24,25 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.join.MaxServerCacheSizeExceededException;
 import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.ScanUtil;
 
 public class IndexMetaDataCacheClient {
 
     private final ServerCacheClient serverCache;
-    private TableRef cacheUsingTableRef;
+    private PTable cacheUsingTable;
     
     /**
      * Construct client used to send index metadata to each region server
@@ -45,9 +50,9 @@ public class IndexMetaDataCacheClient {
      * @param connection the client connection
      * @param cacheUsingTableRef table ref to table that will use the cache during its scan
      */
-    public IndexMetaDataCacheClient(PhoenixConnection connection, TableRef cacheUsingTableRef) {
+    public IndexMetaDataCacheClient(PhoenixConnection connection, PTable cacheUsingTable) {
         serverCache = new ServerCacheClient(connection);
-        this.cacheUsingTableRef = cacheUsingTableRef;
+        this.cacheUsingTable = cacheUsingTable;
     }
 
     /**
@@ -75,7 +80,7 @@ public class IndexMetaDataCacheClient {
         /**
          * Serialize and compress hashCacheTable
          */
-        return serverCache.addServerCache(ScanUtil.newScanRanges(mutations), ptr, txState,
new IndexMetaDataCacheFactory(), cacheUsingTableRef);
+        return serverCache.addServerCache(ScanUtil.newScanRanges(mutations), ptr, txState,
new IndexMetaDataCacheFactory(), cacheUsingTable);
     }
     
     
@@ -91,7 +96,55 @@ public class IndexMetaDataCacheClient {
         /**
          * Serialize and compress hashCacheTable
          */
-        return serverCache.addServerCache(ranges, ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTableRef);
+        return serverCache.addServerCache(ranges, ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTable);
+    }
+
+    public static ServerCache setMetaDataOnMutations(PhoenixConnection connection, PTable table, List<? extends Mutation> mutations,
+            ImmutableBytesWritable indexMetaDataPtr) throws SQLException {
+        final byte[] tenantIdBytes;
+        if (table.isMultiTenant()) {
+            tenantIdBytes = connection.getTenantId() == null ? null : ScanUtil.getTenantIdBytes(
+                    table.getRowKeySchema(), table.getBucketNum() != null, connection.getTenantId(),
+                    table.getViewIndexId() != null);
+        } else {
+            tenantIdBytes = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
+        }
+        ServerCache cache = null;
+        byte[] attribValue = null;
+        byte[] uuidValue = null;
+        byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY;
+        if (table.isTransactional()) {
+            txState = connection.getMutationState().encodeTransaction();
+        }
+        boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0;
+        if (hasIndexMetaData) {
+            if (useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength() + txState.length)) {
+                IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, table);
+                cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState);
+                uuidValue = cache.getId();
+            } else {
+                attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
+                uuidValue = ServerCacheClient.generateId();
+            }
+        } else if (txState.length == 0) { return null; }
+        // Either set the UUID to be able to access the index metadata from the cache
+        // or set the index metadata directly on the Mutation
+        for (Mutation mutation : mutations) {
+            if (connection.getTenantId() != null) {
+                mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantIdBytes);
+            }
+            mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+            if (attribValue != null) {
+                mutation.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
+                mutation.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION,
+                        Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION));
+                if (txState.length > 0) {
+                    mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+                }
+            } else if (!hasIndexMetaData && txState.length > 0) {
+                mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+            }
+        }
+        return cache;
     }
-    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/59237384/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
index 2ec509c..83ac32d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
@@ -37,7 +37,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.ByteUtil;
@@ -77,13 +76,13 @@ public class HashCacheClient  {
      * @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed
      * size
      */
-    public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, TableRef cacheUsingTableRef, Expression keyRangeRhsExpression, List<Expression> keyRangeRhsValues) throws SQLException {
+    public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, PTable cacheUsingTable, Expression keyRangeRhsExpression, List<Expression> keyRangeRhsValues) throws SQLException {
         /**
          * Serialize and compress hashCacheTable
          */
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         serialize(ptr, iterator, estimatedSize, onExpressions, singleValueOnly, keyRangeRhsExpression, keyRangeRhsValues);
-        ServerCache cache = serverCache.addServerCache(keyRanges, ptr, ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTableRef, true);
+        ServerCache cache = serverCache.addServerCache(keyRanges, ptr, ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTable, true);
         return cache;
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/59237384/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
index ab9e8a6..d235d4b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.transaction;
 import java.sql.SQLException;
 
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.transaction.TransactionFactory.Provider;
 
@@ -132,7 +133,7 @@ public class OmidTransactionContext implements PhoenixTransactionContext {
     }
 
     @Override
-    public HTableInterface getTransactionalTableWriter(HTableInterface htable, PTable table) {
+    public HTableInterface getTransactionalTableWriter(PhoenixConnection connection, PTable table, HTableInterface htable, boolean isIndex) {
         // TODO Auto-generated method stub
         return null;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/59237384/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
index 751945a..dfa35be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
@@ -21,6 +21,7 @@ import java.sql.SQLException;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.transaction.TransactionFactory.Provider;
 
@@ -110,7 +111,7 @@ public interface PhoenixTransactionContext {
         }
 
         @Override
-        public HTableInterface getTransactionalTableWriter(HTableInterface htable, PTable table) {
+        public HTableInterface getTransactionalTableWriter(PhoenixConnection connection, PTable table, HTableInterface htable, boolean isIndex) {
             return null;
         }
     };
@@ -230,5 +231,5 @@ public interface PhoenixTransactionContext {
     public PhoenixTransactionContext newTransactionContext(PhoenixTransactionContext contex, boolean subTask);
 
     public HTableInterface getTransactionalTable(HTableInterface htable, boolean isImmutable);
-    public HTableInterface getTransactionalTableWriter(HTableInterface htable, PTable table);
+    public HTableInterface getTransactionalTableWriter(PhoenixConnection connection, PTable table, HTableInterface htable, boolean isIndex) throws SQLException;
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/59237384/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
index bc33cff..5b3c9b8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
@@ -20,20 +20,31 @@ package org.apache.phoenix.transaction;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.DelegateHTable;
+import org.apache.phoenix.execute.PhoenixTxIndexMutationGenerator;
+import org.apache.phoenix.index.IndexMetaDataCacheClient;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.transaction.TephraTransactionProvider.TephraTransactionClient;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.SQLCloseables;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.Transaction.VisibilityLevel;
 import org.apache.tephra.TransactionAware;
@@ -399,14 +410,19 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
     }
 
     @Override
-    public HTableInterface getTransactionalTableWriter(HTableInterface htable, PTable table) {
-        boolean isIndex = table.getType() == PTableType.INDEX;
-        TransactionAwareHTable transactionAwareHTable = new TransactionAwareHTable(htable, table.isImmutableRows() || isIndex ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW);
+    public HTableInterface getTransactionalTableWriter(PhoenixConnection connection, PTable table, HTableInterface htable, boolean isIndex) throws SQLException {
+        // If we have indexes, wrap the HTable in a delegate HTable that
+        // will attach the necessary index meta data in the event of a
+        // rollback
+        TransactionAwareHTable transactionAwareHTable;
         // Don't add immutable indexes (those are the only ones that would participate
         // during a commit), as we don't need conflict detection for these.
         if (isIndex) {
+            transactionAwareHTable = new TransactionAwareHTable(htable, TxConstants.ConflictDetection.NONE);
             transactionAwareHTable.startTx(getTransaction());
         } else {
+            htable = new RollbackHookHTableWrapper(htable, table, connection);
+            transactionAwareHTable = new TransactionAwareHTable(htable, table.isImmutableRows() ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW);
             // Even for immutable, we need to do this so that an abort has the state
             // necessary to generate the rows to delete.
             this.addTransactionAware(transactionAwareHTable);
@@ -414,4 +430,73 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
         return transactionAwareHTable;
     }
     
+    /**
+     * 
+     * Wraps Tephra data table HTables to catch when a rollback occurs so
+     * that index Delete mutations can be generated and applied (as
+     * opposed to storing them in the Tephra change set). This technique
+     * allows the Tephra API to be used directly with HBase APIs and
+     * Phoenix APIs since we can detect the rollback as a callback
+     * when the Tephra rollback is called.
+     *
+     */
+    private static class RollbackHookHTableWrapper extends DelegateHTable {
+        private final PTable table;
+        private final PhoenixConnection connection;
+
+        private RollbackHookHTableWrapper(HTableInterface delegate, PTable table, PhoenixConnection connection) {
+            super(delegate);
+            this.table = table;
+            this.connection = connection;
+        }
+
+        /**
+         * Called by Tephra when a transaction is aborted. We have this wrapper so that we get an opportunity to attach
+         * our index meta data to the mutations such that we can also undo the index mutations.
+         */
+        @Override
+        public void delete(List<Delete> deletes) throws IOException {
+            ServerCache cache = null;
+            try {
+                if (deletes.isEmpty()) { return; }
+                // Attach meta data for server maintained indexes
+                ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
+                if (table.getIndexMaintainers(indexMetaDataPtr, connection)) {
+                    cache = IndexMetaDataCacheClient.setMetaDataOnMutations(connection, table, deletes, indexMetaDataPtr);
+                }
+
+                // Send deletes for client maintained indexes
+                List<PTable> indexes = IndexUtil.getClientMaintainedIndexes(table);
+                if (!indexes.isEmpty()) {
+                    PhoenixTxIndexMutationGenerator generator = PhoenixTxIndexMutationGenerator.newGenerator(connection, table, indexes, deletes.get(0)
+                            .getAttributesMap());
+                    Collection<Pair<Mutation, byte[]>> indexUpdates = generator.getIndexUpdates(delegate,
+                            deletes.iterator());
+                    for (PTable index : indexes) {
+                        byte[] physicalName = index.getPhysicalName().getBytes();
+                        try (HTableInterface hindex = connection.getQueryServices().getTable(physicalName)) {
+                            List<Mutation> indexDeletes = Lists.newArrayListWithExpectedSize(deletes.size());
+                            for (Pair<Mutation, byte[]> mutationPair : indexUpdates) {
+                                if (Bytes.equals(mutationPair.getSecond(), physicalName)) {
+                                    indexDeletes.add(mutationPair.getFirst());
+                                }
+                                hindex.batch(indexDeletes);
+                            }
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            throw new IOException(e);
+                        }
+                    }
+                }
+
+                delegate.delete(deletes);
+            } catch (SQLException e) {
+                throw new IOException(e);
+            } finally {
+                if (cache != null) {
+                    SQLCloseables.closeAllQuietly(Collections.singletonList(cache));
+                }
+            }
+        }
+    }    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/59237384/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 78a68d2..3fe5438 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -32,6 +32,7 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
@@ -816,5 +817,12 @@ public class IndexUtil {
     				.setTableName(indexName).build().buildException();
     	}
     }
+
+    public static List<PTable> getClientMaintainedIndexes(PTable table) {
+        Iterator<PTable> indexIterator = // Only maintain tables with immutable rows through this client-side mechanism
+        (table.isImmutableRows() || table.isTransactional()) ? IndexMaintainer.maintainedGlobalIndexes(table
+                .getIndexes().iterator()) : Collections.<PTable> emptyIterator();
+        return Lists.newArrayList(indexIterator);
+    }
     
 }


Mime
View raw message