phoenix-dev mailing list archives

From JamesRTaylor <...@git.apache.org>
Subject [GitHub] phoenix pull request: PHOENIX-1674 Snapshot isolation transaction ...
Date Thu, 19 Nov 2015 06:51:12 GMT
Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/129#discussion_r45307398
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---
    @@ -410,149 +642,293 @@ private static long calculateMutationSize(List<Mutation> mutations) {
             return byteSize;
         }
         
    +    private boolean hasKeyValueColumn(PTable table, PTable index) {
    +        IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
    +        return !maintainer.getAllColumns().isEmpty();
    +    }
    +    
    +    private void divideImmutableIndexes(Iterator<PTable> enabledImmutableIndexes, PTable table, List<PTable> rowKeyIndexes, List<PTable> keyValueIndexes) {
    +        while (enabledImmutableIndexes.hasNext()) {
    +            PTable index = enabledImmutableIndexes.next();
    +            if (index.getIndexType() != IndexType.LOCAL) {
    +                if (hasKeyValueColumn(table, index)) {
    +                    keyValueIndexes.add(index);
    +                } else {
    +                    rowKeyIndexes.add(index);
    +                }
    +            }
    +        }
    +    }
    +    private class MetaDataAwareHTable extends DelegateHTableInterface {
    +        private final TableRef tableRef;
    +        
    +        private MetaDataAwareHTable(HTableInterface delegate, TableRef tableRef) {
    +            super(delegate);
    +            this.tableRef = tableRef;
    +        }
    +        
    +        /**
    +         * Called by Tephra when a transaction is aborted. We have this wrapper so that we get an
    +         * opportunity to attach our index meta data to the mutations such that we can also undo
    +         * the index mutations.
    +         */
    +        @Override
    +        public void delete(List<Delete> deletes) throws IOException {
    +            try {
    +                PTable table = tableRef.getTable();
    +                List<PTable> indexes = table.getIndexes();
    +                Iterator<PTable> enabledIndexes = IndexMaintainer.nonDisabledIndexIterator(indexes.iterator());
    +                if (enabledIndexes.hasNext()) {
    +                    List<PTable> keyValueIndexes = Collections.emptyList();
    +                    ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
    +                    boolean attachMetaData = table.getIndexMaintainers(indexMetaDataPtr, connection);
    +                    if (table.isImmutableRows()) {
    +                        List<PTable> rowKeyIndexes = Lists.newArrayListWithExpectedSize(indexes.size());
    +                        keyValueIndexes = Lists.newArrayListWithExpectedSize(indexes.size());
    +                        divideImmutableIndexes(enabledIndexes, table, rowKeyIndexes, keyValueIndexes);
    +                        // Generate index deletes for immutable indexes that only reference row key
    +                        // columns and submit directly here.
    +                        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
    +                        for (PTable index : rowKeyIndexes) {
    +                            List<Delete> indexDeletes = IndexUtil.generateDeleteIndexData(table, index, deletes, ptr, connection.getKeyValueBuilder(), connection);
    +                            HTableInterface hindex = connection.getQueryServices().getTable(index.getPhysicalName().getBytes());
    +                            hindex.delete(indexDeletes);
    +                        }
    +                    }
    +                    
    +                    // If we have mutable indexes, local immutable indexes, or global immutable indexes
    +                    // that reference key value columns, setup index meta data and attach here. In this
    +                    // case updates to the indexes will be generated on the server side.
    +                    // An alternative would be to let Tephra track the row keys for the immutable index
    +                    // by adding it as a transaction participant (soon we can prevent any conflict
    +                    // detection from occurring) with the downside being the additional memory required.
    +                    if (!keyValueIndexes.isEmpty()) {
    +                        attachMetaData = true;
    +                        IndexMaintainer.serializeAdditional(table, indexMetaDataPtr, keyValueIndexes, connection);
    +                    }
    +                    if (attachMetaData) {
    +                        setMetaDataOnMutations(tableRef, deletes, indexMetaDataPtr);
    +                    }
    +                }
    +                delegate.delete(deletes);
    +            } catch (SQLException e) {
    +                throw new IOException(e);
    +            }
    +        }
    +    }
    +    
         @SuppressWarnings("deprecation")
    -    public void commit() throws SQLException {
    +    private void send(Iterator<TableRef> tableRefIterator) throws SQLException {
             int i = 0;
    -        PName tenantId = connection.getTenantId();
    -        long[] serverTimeStamps = validate();
    -        Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator();
    +        long[] serverTimeStamps = null;
    +        boolean sendAll = false;
    +        // Validate up front if not transactional so that we fail fast, since without a
    +        // transaction we cannot undo what has already been sent.
    +        if (tableRefIterator == null) {
    +            serverTimeStamps = validateAll();
    +            tableRefIterator = mutations.keySet().iterator();
    +            sendAll = true;
    +        }
    +
             // add tracing for this operation
             try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) {
                 Span span = trace.getSpan();
    -            while (iterator.hasNext()) {
    -                Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry = iterator.next();
    -                // at this point we are going through mutations for each table
    -
    -                Map<ImmutableBytesPtr,RowMutationState> valuesMap = entry.getValue();
    -                // above is mutations for a table where the first part is the row key and the second part is column values.
    -
    -                TableRef tableRef = entry.getKey();
    -                PTable table = tableRef.getTable();
    -                table.getIndexMaintainers(tempPtr, connection);
    -                boolean hasIndexMaintainers = tempPtr.getLength() > 0;
    -                boolean isDataTable = true;
    -                long serverTimestamp = serverTimeStamps[i++];
    -                Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false);
    -                // above returns an iterator of pair where the first  
    -                while (mutationsIterator.hasNext()) {
    -                    Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
    -                    byte[] htableName = pair.getFirst();
    -                    List<Mutation> mutations = pair.getSecond();
    -
    -                    //create a span per target table
    -                    //TODO maybe we can be smarter about the table name to string here?
    -                    Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName));
    -
    -                    int retryCount = 0;
    -                    boolean shouldRetry = false;
    -                    do {
    -                        ServerCache cache = null;
    -                        if (hasIndexMaintainers && isDataTable) {
    -                            byte[] attribValue = null;
    -                            byte[] uuidValue;
    -                            if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength())) {
    -                                IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
    -                                cache = client.addIndexMetadataCache(mutations, tempPtr);
    -                                child.addTimelineAnnotation("Updated index metadata cache");
    -                                uuidValue = cache.getId();
    -                                // If we haven't retried yet, retry for this case only, as it's possible that
    -                                // a split will occur after we send the index metadata cache to all known
    -                                // region servers.
    -                                shouldRetry = true;
    -                            } else {
    -                                attribValue = ByteUtil.copyKeyBytesIfNecessary(tempPtr);
    -                                uuidValue = ServerCacheClient.generateId();
    -                            }
    -                            // Either set the UUID to be able to access the index metadata from the cache
    -                            // or set the index metadata directly on the Mutation
    -                            for (Mutation mutation : mutations) {
    -                                if (tenantId != null) {
    -                                    byte[] tenantIdBytes = ScanUtil.getTenantIdBytes(
    -                                        table.getRowKeySchema(),
    -                                        table.getBucketNum()!=null,
    -                                        tenantId);
    -                                    mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantIdBytes);
    -                                }
    -                                mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
    -                                if (attribValue != null) {
    -                                    mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
    -                                }
    -                            }
    -                        }
    -
    -                        SQLException sqlE = null;
    -                        HTableInterface hTable = connection.getQueryServices().getTable(htableName);
    -                        try {
    -                            long numMutations = mutations.size();
    +            ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
    +            while (tableRefIterator.hasNext()) {
    +                // at this point we are going through mutations for each table
    +                TableRef tableRef = tableRefIterator.next();
    +                Map<ImmutableBytesPtr, RowMutationState> valuesMap = mutations.get(tableRef);
    +                if (valuesMap == null || valuesMap.isEmpty()) {
    +                    continue;
    +                }
    +                PTable table = tableRef.getTable();
    +                // Track tables to which we've sent uncommitted data
    +                if (table.isTransactional()) {
    +                    uncommittedPhysicalNames.add(table.getPhysicalName().getString());
    +                }
    +                table.getIndexMaintainers(indexMetaDataPtr, connection);
    +                boolean isDataTable = true;
    +                // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
    +                long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++];
    +                Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll);
    +                while (mutationsIterator.hasNext()) {
    +                    Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
    +                    byte[] htableName = pair.getFirst();
    +                    List<Mutation> mutationList = pair.getSecond();
    +
    +                    //create a span per target table
    +                    //TODO maybe we can be smarter about the table name to string here?
    +                    Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName));
    +
    +                    int retryCount = 0;
    +                    boolean shouldRetry = false;
    +                    do {
    +                        ServerCache cache = null;
    +                        if (isDataTable) {
    +                            cache = setMetaDataOnMutations(tableRef, mutationList, indexMetaDataPtr);
    +                        }
    +
    +                        // If we haven't retried yet, retry for this case only, as it's possible that
    +                        // a split will occur after we send the index metadata cache to all known
    +                        // region servers.
    +                        shouldRetry = cache != null;
    +                        SQLException sqlE = null;
    +                        HTableInterface hTable = connection.getQueryServices().getTable(htableName);
    +                        try {
    +                            if (table.isTransactional()) {
    +                                // If we have indexes, wrap the HTable in a delegate HTable that
    +                                // will attach the necessary index meta data in the event of a
    +                                // rollback
    +                                if (!table.getIndexes().isEmpty()) {
    +                                    hTable = new MetaDataAwareHTable(hTable, tableRef);
    +                                }
    +                                TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(hTable, table);
    +                                // Don't add immutable indexes (those are the only ones that would participate
    +                                // during a commit), as we don't need conflict detection for these.
    +                                if (isDataTable) {
    +                                    // Even for immutable, we need to do this so that an abort has the state
    +                                    // necessary to generate the rows to delete.
    +                                    addTransactionParticipant(txnAware);
    +                                } else {
    +                                    txnAware.startTx(getTransaction());
    +                                }
    +                                hTable = txnAware;
    +                            }
    +                            long numMutations = mutationList.size();
                                 GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
                                 
                                 long startTime = System.currentTimeMillis();
    -                            child.addTimelineAnnotation("Attempt " + retryCount);
    -                            hTable.batch(mutations);
    -                            child.stop();
    +                            child.addTimelineAnnotation("Attempt " + retryCount);
    +                            hTable.batch(mutationList);
    +                            child.stop();
                                 shouldRetry = false;
                                 long mutationCommitTime = System.currentTimeMillis() - startTime;
                                 GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
                                 
    -                            long mutationSizeBytes = calculateMutationSize(mutations);
    +                            long mutationSizeBytes = calculateMutationSize(mutationList);
                                 MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime);
                                 mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric);
    -                        } catch (Exception e) {
    -                            SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
    -                            if (inferredE != null) {
    -                                if (shouldRetry && retryCount == 0 &&
inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
    -                                    // Swallow this exception once, as it's possible
that we split after sending the index metadata
    -                                    // and one of the region servers doesn't have it.
This will cause it to have it the next go around.
    -                                    // If it fails again, we don't retry.
    -                                    String msg = "Swallowing exception and retrying after
clearing meta cache on connection. " + inferredE;
    -                                    logger.warn(LogUtil.addCustomAnnotations(msg, connection));
    -                                    connection.getQueryServices().clearTableRegionCache(htableName);
    +                        } catch (Exception e) {
    +                            SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
    +                            if (inferredE != null) {
    +                                if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
    +                                    // Swallow this exception once, as it's possible that we split after sending the index metadata
    +                                    // and one of the region servers doesn't have it. This will cause it to have it the next go around.
    +                                    // If it fails again, we don't retry.
    +                                    String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE;
    +                                    logger.warn(LogUtil.addCustomAnnotations(msg, connection));
    +                                    connection.getQueryServices().clearTableRegionCache(htableName);
    +
    +                                    // add a new child span as this one failed
    +                                    child.addTimelineAnnotation(msg);
    +                                    child.stop();
    +                                    child = Tracing.child(span,"Failed batch, attempting retry");
    +
    +                                    continue;
    +                                }
    +                                e = inferredE;
    +                            }
    +                            // Throw to client with both what was committed so far and what is left to be committed.
    +                            // That way, client can either undo what was done or try again with what was not done.
    +                            sqlE = new CommitException(e, getUncommittedStatementIndexes());
    +                        } finally {
    +                            try {
    +                                if (cache != null) {
    +                                    cache.close();
    +                                }
    +                            } finally {
    +                                try {
    +                                    hTable.close();
    +                                } catch (IOException e) {
    +                                    if (sqlE != null) {
    +                                        sqlE.setNextException(ServerUtil.parseServerException(e));
    +                                    } else {
    +                                        sqlE = ServerUtil.parseServerException(e);
    +                                    }
    +                                }
    +                                if (sqlE != null) {
    +                                    // clear pending mutations
    +                                    mutations.clear();
    +                                    throw sqlE;
    +                                }
    +                            }
    +                        }
    +                    } while (shouldRetry && retryCount++ < 1);
    +                    isDataTable = false;
    +                }
    +                if (tableRef.getTable().getType() != PTableType.INDEX) {
    +                    numRows -= valuesMap.size();
    +                }
    +                // Remove batches as we process them
    +                if (sendAll) {
    +                    tableRefIterator.remove(); // Iterating through actual map in this case
    +                } else {
    +                    mutations.remove(tableRef);
    +                }
    +            }
    +        }
    +        // Note that we cannot assume that *all* mutations have been sent, since we've optimized this
    +        // now to only send the mutations for the tables we're querying, hence we've removed the
    +        // assertions that were here before.
    +    }
     
    -                                    // add a new child span as this one failed
    -                                    child.addTimelineAnnotation(msg);
    -                                    child.stop();
    -                                    child = Tracing.child(span,"Failed batch, attempting retry");
    +    public byte[] encodeTransaction() throws SQLException {
    +        try {
    +            return CODEC.encode(getTransaction());
    +        } catch (IOException e) {
    +            throw new SQLException(e);
    +        }
    +    }
    +    
    +    public static Transaction decodeTransaction(byte[] txnBytes) throws IOException {
    +        return (txnBytes == null || txnBytes.length==0) ? null : CODEC.decode(txnBytes);
    +    }
     
    -                                    continue;
    -                                }
    -                                e = inferredE;
    -                            }
    -                            sqlE = new CommitException(e, getUncommittedStatementIndexes());
    -                        } finally {
    -                            try {
    -                                hTable.close();
    -                            } catch (IOException e) {
    -                                if (sqlE != null) {
    -                                    sqlE.setNextException(ServerUtil.parseServerException(e));
    -                                } else {
    -                                    sqlE = ServerUtil.parseServerException(e);
    -                                }
    -                            } finally {
    -                                try {
    -                                    if (cache != null) {
    -                                        cache.close();
    -                                    }
    -                                } finally {
    -                                    if (sqlE != null) {
    -                                        throw sqlE;
    -                                    }
    -                                }
    -                            }
    -                        }
    -                    } while (shouldRetry && retryCount++ < 1);
    -                    isDataTable = false;
    -                }
    -                if (tableRef.getTable().getType() != PTableType.INDEX) {
    -                    numRows -= entry.getValue().size();
    +    private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? extends Mutation> mutations,
    +            ImmutableBytesWritable indexMetaDataPtr) throws SQLException {
    +        PTable table = tableRef.getTable();
    +        byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
    +        ServerCache cache = null;
    +        byte[] attribValue = null;
    +        byte[] uuidValue = null;
    +        byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY;
    +        if (table.isTransactional()) {
    +            txState = encodeTransaction();
    +        }
    +        boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0;
    +        if (hasIndexMetaData) {
    +            if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength() + txState.length)) {
    +                IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
    +                cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState);
    +                uuidValue = cache.getId();
    +            } else {
    +                attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
    +                uuidValue = ServerCacheClient.generateId();
    +            }
    +        } else if (txState.length == 0) {
    +            return null;
    +        }
    +        // Either set the UUID to be able to access the index metadata from the cache
    +        // or set the index metadata directly on the Mutation
    +        for (Mutation mutation : mutations) {
    +            if (tenantId != null) {
    +                mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
    +            }
    +            mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
    +            if (attribValue != null) {
    +                mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
    +                if (txState.length > 0) {
    +                    mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
                     }
    -                iterator.remove(); // Remove batches as we process them
    +            } else if (!hasIndexMetaData && txState.length > 0) {
    +                mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
                 }
             }
    -        assert(numRows==0);
    -        assert(this.mutations.isEmpty());
    +        return cache;
         }
         
    -    public void rollback(PhoenixConnection connection) throws SQLException {
    +    public void clear() throws SQLException {
    --- End diff ---
    
    Make this clear() method private if possible.
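
    For reference, a minimal sketch of what the suggested change might look like,
    assuming clear() is only ever called from within MutationState itself. The
    method body here is illustrative only; the diff above is cut off before it,
    so the real implementation may differ:

        // Hypothetical sketch of the reviewer's suggestion, not the actual patch:
        // narrow the visibility to private, assuming no callers outside this class.
        private void clear() throws SQLException {
            this.mutations.clear(); // drop any batched-but-unsent row mutations
            numRows = 0;            // reset the row count tracked by this state
        }

    If an external caller does exist (for example, from the connection's rollback
    path), package-private would be the next narrowest option.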


