From commits-return-21615-archive-asf-public=cust-asf.ponee.io@phoenix.apache.org Fri Jun 1 19:31:55 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 626EF180676 for ; Fri, 1 Jun 2018 19:31:53 +0200 (CEST) Received: (qmail 7882 invoked by uid 500); 1 Jun 2018 17:31:52 -0000 Mailing-List: contact commits-help@phoenix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@phoenix.apache.org Delivered-To: mailing list commits@phoenix.apache.org Received: (qmail 7873 invoked by uid 99); 1 Jun 2018 17:31:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Jun 2018 17:31:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BCF41DFF7E; Fri, 1 Jun 2018 17:31:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jamestaylor@apache.org To: commits@phoenix.apache.org Date: Fri, 01 Jun 2018 17:31:52 -0000 Message-Id: <7fd28243f2b74fb883314248400c9c5c@git.apache.org> In-Reply-To: <27632baae0b643e6bb5e2a5fa6ca49ac@git.apache.org> References: <27632baae0b643e6bb5e2a5fa6ca49ac@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] phoenix git commit: PHOENIX-4762 Performance regression with transactional immutable indexes PHOENIX-4762 Performance regression with transactional immutable indexes Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/edb6dcf5 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/edb6dcf5 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/edb6dcf5 Branch: refs/heads/4.x-cdh5.11 Commit: edb6dcf5f976d91971941166746191a1b49be211 Parents: 5cc7575 Author: James Taylor Authored: Fri Jun 1 09:03:21 2018 -0700 Committer: James Taylor Committed: Fri Jun 1 10:30:40 2018 -0700 ---------------------------------------------------------------------- .../apache/phoenix/execute/MutationState.java | 701 ++++++++++--------- .../PhoenixTxIndexMutationGenerator.java | 2 +- .../transaction/OmidTransactionContext.java | 6 + .../transaction/PhoenixTransactionContext.java | 6 + .../transaction/TephraTransactionContext.java | 16 + .../java/org/apache/phoenix/util/IndexUtil.java | 28 - 6 files changed, 385 insertions(+), 374 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/edb6dcf5/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index b9a6c9e..2e795b1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -38,10 +38,9 @@ import javax.annotation.Nonnull; import javax.annotation.concurrent.Immutable; import org.apache.hadoop.hbase.HConstants; -import org.apache.phoenix.util.KeyValueUtil; import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -83,7 +82,6 @@ import org.apache.phoenix.schema.PMetaData; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PRow; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTableRef; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.RowKeySchema; @@ -98,6 +96,7 @@ import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.transaction.TransactionFactory.Provider; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.util.KeyValueUtil; import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.SQLCloseable; @@ -116,9 +115,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; /** - * * Tracks the uncommitted state - * */ public class MutationState implements SQLCloseable { private static final Logger logger = LoggerFactory.getLogger(MutationState.class); @@ -150,30 +147,34 @@ public class MutationState implements SQLCloseable { this(maxSize, maxSizeBytes, connection, false, null); } - public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, PhoenixTransactionContext txContext) { + public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, + PhoenixTransactionContext txContext) { this(maxSize, maxSizeBytes, connection, false, txContext); } public MutationState(MutationState mutationState) { - this(mutationState.maxSize, mutationState.maxSizeBytes, mutationState.connection, true, mutationState.getPhoenixTransactionContext()); + this(mutationState.maxSize, mutationState.maxSizeBytes, mutationState.connection, true, mutationState + .getPhoenixTransactionContext()); } public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, long sizeOffset) { this(maxSize, maxSizeBytes, connection, false, null, sizeOffset); } - private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, boolean subTask, PhoenixTransactionContext txContext) { + private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, boolean subTask, + PhoenixTransactionContext txContext) { this(maxSize, maxSizeBytes, connection, subTask, txContext, 0); } - private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, boolean subTask, PhoenixTransactionContext txContext, long sizeOffset) { - this(maxSize, maxSizeBytes, connection, Maps.newHashMapWithExpectedSize(5), subTask, txContext); + private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, boolean subTask, + PhoenixTransactionContext txContext, long sizeOffset) { + this(maxSize, maxSizeBytes, connection, Maps. newHashMapWithExpectedSize(5), + subTask, txContext); this.sizeOffset = sizeOffset; } MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, - Map mutations, - boolean subTask, PhoenixTransactionContext txContext) { + Map mutations, boolean subTask, PhoenixTransactionContext txContext) { this.maxSize = maxSize; this.maxSizeBytes = maxSizeBytes; this.connection = connection; @@ -193,7 +194,8 @@ public class MutationState implements SQLCloseable { } } - public MutationState(TableRef table, MultiRowMutationState mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException { + public MutationState(TableRef table, MultiRowMutationState mutations, long sizeOffset, long maxSize, + long maxSizeBytes, PhoenixConnection connection) throws SQLException { this(maxSize, maxSizeBytes, connection, false, null, sizeOffset); if (!mutations.isEmpty()) { this.mutations.put(table, mutations); @@ -202,7 +204,7 @@ public class MutationState implements SQLCloseable { this.estimatedSize = KeyValueUtil.getEstimatedRowMutationSize(this.mutations); throwIfTooBig(); } - + public long getEstimatedSize() { return estimatedSize; } @@ -220,13 +222,12 @@ public class MutationState implements SQLCloseable { } /** - * Commit a write fence when creating an index so that we can detect - * when a data table transaction is started before the create index - * but completes after it. In this case, we need to rerun the data - * table transaction after the index creation so that the index rows - * are generated. See TEPHRA-157 - * for more information. - * @param dataTable the data table upon which an index is being added + * Commit a write fence when creating an index so that we can detect when a data table transaction is started before + * the create index but completes after it. In this case, we need to rerun the data table transaction after the + * index creation so that the index rows are generated. See TEPHRA-157 for more information. + * + * @param dataTable + * the data table upon which an index is being added * @throws SQLException */ public void commitDDLFence(PTable dataTable) throws SQLException { @@ -242,19 +243,17 @@ public class MutationState implements SQLCloseable { } } } - + public boolean checkpointIfNeccessary(MutationPlan plan) throws SQLException { - if (! phoenixTransactionContext.isTransactionRunning() || plan.getTargetRef() == null || plan.getTargetRef().getTable() == null || !plan.getTargetRef().getTable().isTransactional()) { - return false; - } + if (!phoenixTransactionContext.isTransactionRunning() || plan.getTargetRef() == null + || plan.getTargetRef().getTable() == null || !plan.getTargetRef().getTable().isTransactional()) { return false; } Set sources = plan.getSourceRefs(); - if (sources.isEmpty()) { - return false; - } + if (sources.isEmpty()) { return false; } // For a DELETE statement, we're always querying the table being deleted from. This isn't // a problem, but it potentially could be if there are other references to the same table // nested in the DELETE statement (as a sub query or join, for example). - TableRef ignoreForExcludeCurrent = plan.getOperation() == Operation.DELETE && sources.size() == 1 ? plan.getTargetRef() : null; + TableRef ignoreForExcludeCurrent = plan.getOperation() == Operation.DELETE && sources.size() == 1 ? plan + .getTargetRef() : null; boolean excludeCurrent = false; String targetPhysicalName = plan.getTargetRef().getTable().getPhysicalName().getString(); for (TableRef source : sources) { @@ -279,7 +278,8 @@ public class MutationState implements SQLCloseable { // external transaction context, it's possible that data was already written at the // current transaction timestamp, so we always checkpoint in that case is we're // reading and writing to the same table. - if (source.getTable().isTransactional() && (isExternalTxContext || uncommittedPhysicalNames.contains(sourcePhysicalName))) { + if (source.getTable().isTransactional() + && (isExternalTxContext || uncommittedPhysicalNames.contains(sourcePhysicalName))) { hasUncommittedData = true; break; } @@ -331,30 +331,20 @@ public class MutationState implements SQLCloseable { } public boolean startTransaction(Provider provider) throws SQLException { - if (provider == null) { - return false; - } - if (!connection.getQueryServices().getProps().getBoolean( - QueryServices.TRANSACTIONS_ENABLED, - QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) { - throw new SQLExceptionInfo.Builder( - SQLExceptionCode.CANNOT_START_TXN_IF_TXN_DISABLED) - .build().buildException(); - } - if (connection.getSCN() != null) { - throw new SQLExceptionInfo.Builder( - SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET) - .build().buildException(); - } + if (provider == null) { return false; } + if (!connection.getQueryServices().getProps() + .getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) { throw new SQLExceptionInfo.Builder( + SQLExceptionCode.CANNOT_START_TXN_IF_TXN_DISABLED).build().buildException(); } + if (connection.getSCN() != null) { throw new SQLExceptionInfo.Builder( + SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET).build().buildException(); } if (phoenixTransactionContext == PhoenixTransactionContext.NULL_CONTEXT) { phoenixTransactionContext = provider.getTransactionProvider().getTransactionContext(connection); } else { - if (provider != phoenixTransactionContext.getProvider()) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_MIX_TXN_PROVIDERS) - .setMessage(phoenixTransactionContext.getProvider().name() + " and " + provider.name()) - .build().buildException(); - } + if (provider != phoenixTransactionContext.getProvider()) { throw new SQLExceptionInfo.Builder( + SQLExceptionCode.CANNOT_MIX_TXN_PROVIDERS) + .setMessage(phoenixTransactionContext.getProvider().name() + " and " + provider.name()).build() + .buildException(); } } if (!isTransactionStarted()) { // Clear any transactional state in case transaction was ended outside @@ -371,28 +361,28 @@ public class MutationState implements SQLCloseable { } public static MutationState emptyMutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection) { - MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.emptyMap(), false, null); + MutationState state = new MutationState(maxSize, maxSizeBytes, connection, + Collections. emptyMap(), false, null); state.sizeOffset = 0; return state; } - + private void throwIfTooBig() throws SQLException { if (numRows > maxSize) { resetState(); - throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED).build() - .buildException(); + throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED).build().buildException(); } if (estimatedSize > maxSizeBytes) { resetState(); - throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED) - .build().buildException(); + throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED).build() + .buildException(); } } - + public long getUpdateCount() { return sizeOffset + numRows; } - + private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows, Map dstMutations) { PTable table = tableRef.getTable(); @@ -401,21 +391,22 @@ public class MutationState implements SQLCloseable { MultiRowMutationState existingRows = dstMutations.put(tableRef, srcRows); if (existingRows != null) { // Rows for that table already exist // Loop through new rows and replace existing with new - for (Map.Entry rowEntry : srcRows.entrySet()) { + for (Map.Entry rowEntry : srcRows.entrySet()) { // Replace existing row with new row RowMutationState existingRowMutationState = existingRows.put(rowEntry.getKey(), rowEntry.getValue()); if (existingRowMutationState != null) { - Map existingValues = existingRowMutationState.getColumnValues(); + Map existingValues = existingRowMutationState.getColumnValues(); if (existingValues != PRow.DELETE_MARKER) { - Map newRow = rowEntry.getValue().getColumnValues(); - // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row. + Map newRow = rowEntry.getValue().getColumnValues(); + // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with + // existing row. if (newRow != PRow.DELETE_MARKER) { // decrement estimated size by the size of the old row - estimatedSize-=existingRowMutationState.calculateEstimatedSize(); + estimatedSize -= existingRowMutationState.calculateEstimatedSize(); // Merge existing column values with new column values existingRowMutationState.join(rowEntry.getValue()); // increment estimated size by the size of the new row - estimatedSize+=existingRowMutationState.calculateEstimatedSize(); + estimatedSize += existingRowMutationState.calculateEstimatedSize(); // Now that the existing row has been merged with the new row, replace it back // again (since it was merged with the new one above). existingRows.put(rowEntry.getKey(), existingRowMutationState); @@ -440,12 +431,12 @@ public class MutationState implements SQLCloseable { numRows += srcRows.size(); // if we added all the rows from newMutationState we can just increment the // estimatedSize by newMutationState.estimatedSize - estimatedSize += srcRows.estimatedSize; + estimatedSize += srcRows.estimatedSize; } } } - - private void joinMutationState(Map srcMutations, + + private void joinMutationState(Map srcMutations, Map dstMutations) { // Merge newMutation with this one, keeping state from newMutation for any overlaps for (Map.Entry entry : srcMutations.entrySet()) { @@ -455,11 +446,13 @@ public class MutationState implements SQLCloseable { joinMutationState(tableRef, srcRows, dstMutations); } } + /** * Combine a newer mutation with this one, where in the event of overlaps, the newer one will take precedence. * Combine any metrics collected for the newer mutation. * - * @param newMutationState the newer mutation state + * @param newMutationState + * the newer mutation state */ public void join(MutationState newMutationState) throws SQLException { if (this == newMutationState) { // Doesn't make sense @@ -485,17 +478,16 @@ public class MutationState implements SQLCloseable { throwIfTooBig(); } - private static ImmutableBytesPtr getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, long rowTimestamp, PTable table) { RowKeySchema schema = table.getRowKeySchema(); int rowTimestampColPos = table.getRowTimestampColPos(); - Field rowTimestampField = schema.getField(rowTimestampColPos); + Field rowTimestampField = schema.getField(rowTimestampColPos); byte[] rowTimestampBytes = PLong.INSTANCE.toBytes(rowTimestamp, rowTimestampField.getSortOrder()); int oldOffset = ptr.getOffset(); int oldLength = ptr.getLength(); // Move the pointer to the start byte of the row timestamp pk schema.position(ptr, 0, rowTimestampColPos); - byte[] b = ptr.get(); + byte[] b = ptr.get(); int newOffset = ptr.getOffset(); int length = ptr.getLength(); for (int i = newOffset; i < newOffset + length; i++) { @@ -506,24 +498,29 @@ public class MutationState implements SQLCloseable { ptr.set(ptr.get(), oldOffset, oldLength); return ptr; } - - private Iterator>> addRowMutations(final TableRef tableRef, final MultiRowMutationState values, - final long mutationTimestamp, final long serverTimestamp, boolean includeAllIndexes, final boolean sendAll) { + + private static List getClientMaintainedIndexes(PTable table) { + Iterator indexIterator = // Only maintain tables with immutable rows through this client-side mechanism + (table.isImmutableRows() || table.isTransactional()) ? IndexMaintainer.maintainedGlobalIndexes(table + .getIndexes().iterator()) : Collections. emptyIterator(); + return Lists.newArrayList(indexIterator); + } + + private Iterator>> addRowMutations(final TableRef tableRef, + final MultiRowMutationState values, final long mutationTimestamp, final long serverTimestamp, + boolean includeAllIndexes, final boolean sendAll) { final PTable table = tableRef.getTable(); - final Iterator indexIterator = // Only maintain tables with immutable rows through this client-side mechanism - includeAllIndexes ? - IndexMaintainer.maintainedIndexes(table.getIndexes().iterator()) : - (table.isImmutableRows() || table.isTransactional()) ? - IndexMaintainer.maintainedGlobalIndexes(table.getIndexes().iterator()) : - Collections.emptyIterator(); - final List indexList = Lists.newArrayList(indexIterator); + final List indexList = includeAllIndexes ? Lists.newArrayList(IndexMaintainer.maintainedIndexes(table + .getIndexes().iterator())) : getClientMaintainedIndexes(table); final Iterator indexes = indexList.iterator(); final List mutationList = Lists.newArrayListWithExpectedSize(values.size()); - final List mutationsPertainingToIndex = indexes.hasNext() ? Lists.newArrayListWithExpectedSize(values.size()) : null; - generateMutations(tableRef, mutationTimestamp, serverTimestamp, values, mutationList, mutationsPertainingToIndex); - return new Iterator>>() { + final List mutationsPertainingToIndex = indexes.hasNext() ? Lists + . newArrayListWithExpectedSize(values.size()) : null; + generateMutations(tableRef, mutationTimestamp, serverTimestamp, values, mutationList, + mutationsPertainingToIndex); + return new Iterator>>() { boolean isFirst = true; - Map> indexMutationsMap = null; + Map> indexMutationsMap = null; @Override public boolean hasNext() { @@ -534,19 +531,22 @@ public class MutationState implements SQLCloseable { public Pair> next() { if (isFirst) { isFirst = false; - return new Pair>(table.getPhysicalName(), mutationList); + return new Pair>(table.getPhysicalName(), mutationList); } PTable index = indexes.next(); - + List indexMutations = null; try { if (!mutationsPertainingToIndex.isEmpty()) { if (table.isTransactional()) { if (indexMutationsMap == null) { - PhoenixTxIndexMutationGenerator generator = newTxIndexMutationGenerator(table, indexList, mutationsPertainingToIndex.get(0).getAttributesMap()); - try (HTableInterface htable = connection.getQueryServices().getTable(table.getPhysicalName().getBytes())) { - Collection> allMutations = generator.getIndexUpdates(htable, mutationsPertainingToIndex.iterator()); + PhoenixTxIndexMutationGenerator generator = newTxIndexMutationGenerator(table, + indexList, mutationsPertainingToIndex.get(0).getAttributesMap()); + try (HTableInterface htable = connection.getQueryServices().getTable( + table.getPhysicalName().getBytes())) { + Collection> allMutations = generator.getIndexUpdates(htable, + mutationsPertainingToIndex.iterator()); indexMutationsMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); for (Pair mutation : allMutations) { List mutations = indexMutationsMap.get(mutation.getSecond()); @@ -559,20 +559,21 @@ public class MutationState implements SQLCloseable { } } indexMutations = indexMutationsMap.get(index.getPhysicalName().getBytes()); - } else { - indexMutations = - IndexUtil.generateIndexData(table, index, values, mutationsPertainingToIndex, - connection.getKeyValueBuilder(), connection); + } else { + indexMutations = IndexUtil.generateIndexData(table, index, values, + mutationsPertainingToIndex, connection.getKeyValueBuilder(), connection); } } - // we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map + // we may also have to include delete mutations for immutable tables if we are not processing all + // the tables in the mutations map if (!sendAll) { TableRef key = new TableRef(index); MultiRowMutationState multiRowMutationState = mutations.remove(key); - if (multiRowMutationState!=null) { + if (multiRowMutationState != null) { final List deleteMutations = Lists.newArrayList(); - generateMutations(tableRef, mutationTimestamp, serverTimestamp, multiRowMutationState, deleteMutations, null); + generateMutations(tableRef, mutationTimestamp, serverTimestamp, multiRowMutationState, + deleteMutations, null); if (indexMutations == null) { indexMutations = deleteMutations; } else { @@ -583,18 +584,20 @@ public class MutationState implements SQLCloseable { } catch (SQLException | IOException e) { throw new IllegalDataException(e); } - return new Pair>(index.getPhysicalName(),indexMutations == null ? Collections.emptyList() : indexMutations); + return new Pair>(index.getPhysicalName(), + indexMutations == null ? Collections. emptyList() : indexMutations); } @Override public void remove() { throw new UnsupportedOperationException(); } - + }; } - private PhoenixTxIndexMutationGenerator newTxIndexMutationGenerator(PTable table, List indexes, Map attributes) { + private PhoenixTxIndexMutationGenerator newTxIndexMutationGenerator(PTable table, List indexes, + Map attributes) { final List indexMaintainers = Lists.newArrayListWithExpectedSize(indexes.size()); for (PTable index : indexes) { IndexMaintainer maintainer = index.getIndexMaintainer(table, connection); @@ -603,8 +606,7 @@ public class MutationState implements SQLCloseable { IndexMetaDataCache indexMetaDataCache = new IndexMetaDataCache() { @Override - public void close() throws IOException { - } + public void close() throws IOException {} @Override public List getIndexMaintainers() { @@ -613,30 +615,30 @@ public class MutationState implements SQLCloseable { @Override public PhoenixTransactionContext getTransactionContext() { - return phoenixTransactionContext; + return phoenixTransactionContext.newTransactionContext(phoenixTransactionContext, true); } @Override public int getClientVersion() { return MetaDataProtocol.PHOENIX_VERSION; } - + }; try { PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(indexMetaDataCache, attributes); - return new PhoenixTxIndexMutationGenerator(connection.getQueryServices().getConfiguration(), indexMetaData, table.getPhysicalName().getBytes()); + return new PhoenixTxIndexMutationGenerator(connection.getQueryServices().getConfiguration(), indexMetaData, + table.getPhysicalName().getBytes()); } catch (IOException e) { throw new RuntimeException(e); // Impossible } } - - private void generateMutations(final TableRef tableRef, final long mutationTimestamp, - final long serverTimestamp, final MultiRowMutationState values, - final List mutationList, final List mutationsPertainingToIndex) { + + private void generateMutations(final TableRef tableRef, final long mutationTimestamp, final long serverTimestamp, + final MultiRowMutationState values, final List mutationList, + final List mutationsPertainingToIndex) { final PTable table = tableRef.getTable(); boolean tableWithRowTimestampCol = table.getRowTimestampColPos() != -1; - Iterator> iterator = - values.entrySet().iterator(); + Iterator> iterator = values.entrySet().iterator(); long timestampToUse = mutationTimestamp; MultiRowMutationState modifiedValues = new MultiRowMutationState(16); while (iterator.hasNext()) { @@ -650,11 +652,11 @@ public class MutationState implements SQLCloseable { if (rowTsColInfo.useServerTimestamp()) { // regenerate the key with this timestamp. key = getNewRowKeyWithRowTimestamp(key, serverTimestamp, table); - // since we are about to modify the byte[] stored in key (which changes its hashcode) - // we need to remove the entry from the values map and add a new entry with the modified byte[] - modifiedValues.put(key, state); - iterator.remove(); - timestampToUse = serverTimestamp; + // since we are about to modify the byte[] stored in key (which changes its hashcode) + // we need to remove the entry from the values map and add a new entry with the modified byte[] + modifiedValues.put(key, state); + iterator.remove(); + timestampToUse = serverTimestamp; } else { if (rowTsColInfo.getTimestamp() != null) { timestampToUse = rowTsColInfo.getTimestamp(); @@ -669,15 +671,14 @@ public class MutationState implements SQLCloseable { // The DeleteCompiler already generates the deletes for indexes, so no need to do it again rowMutationsPertainingToIndex = Collections.emptyList(); } else { - for (Map.Entry valueEntry : rowEntry.getValue().getColumnValues() - .entrySet()) { + for (Map.Entry valueEntry : rowEntry.getValue().getColumnValues().entrySet()) { row.setValue(valueEntry.getKey(), valueEntry.getValue()); } rowMutations = row.toRowMutations(); // Pass through ON DUPLICATE KEY info through mutations // In the case of the same clause being used on many statements, this will be // inefficient because we're transmitting the same information for each mutation. - // TODO: use our ServerCache + // TODO: use our ServerCache for (Mutation mutation : rowMutations) { if (onDupKeyBytes != null) { mutation.setAttribute(PhoenixIndexBuilder.ATOMIC_OP_ATTRIB, onDupKeyBytes); @@ -686,61 +687,61 @@ public class MutationState implements SQLCloseable { rowMutationsPertainingToIndex = rowMutations; } mutationList.addAll(rowMutations); - if (mutationsPertainingToIndex != null) mutationsPertainingToIndex - .addAll(rowMutationsPertainingToIndex); + if (mutationsPertainingToIndex != null) mutationsPertainingToIndex.addAll(rowMutationsPertainingToIndex); } values.putAll(modifiedValues); } - + /** * Get the unsorted list of HBase mutations for the tables with uncommitted data. + * * @return list of HBase mutations for uncommitted data. */ - public Iterator>> toMutations(Long timestamp) { + public Iterator>> toMutations(Long timestamp) { return toMutations(false, timestamp); } - - public Iterator>> toMutations() { + + public Iterator>> toMutations() { return toMutations(false, null); } - - public Iterator>> toMutations(final boolean includeMutableIndexes) { + + public Iterator>> toMutations(final boolean includeMutableIndexes) { return toMutations(includeMutableIndexes, null); } - - public Iterator>> toMutations(final boolean includeMutableIndexes, final Long tableTimestamp) { + + public Iterator>> toMutations(final boolean includeMutableIndexes, + final Long tableTimestamp) { final Iterator> iterator = this.mutations.entrySet().iterator(); - if (!iterator.hasNext()) { - return Collections.emptyIterator(); - } + if (!iterator.hasNext()) { return Collections.emptyIterator(); } Long scn = connection.getSCN(); final long serverTimestamp = getTableTimestamp(tableTimestamp, scn); final long mutationTimestamp = getMutationTimestamp(scn); - return new Iterator>>() { + return new Iterator>>() { private Map.Entry current = iterator.next(); - private Iterator>> innerIterator = init(); - - private Iterator>> init() { - final Iterator>> mutationIterator = addRowMutations(current.getKey(), current.getValue(), mutationTimestamp, serverTimestamp, includeMutableIndexes, true); - return new Iterator>>() { + private Iterator>> innerIterator = init(); + + private Iterator>> init() { + final Iterator>> mutationIterator = addRowMutations(current.getKey(), + current.getValue(), mutationTimestamp, serverTimestamp, includeMutableIndexes, true); + return new Iterator>>() { @Override public boolean hasNext() { return mutationIterator.hasNext(); } @Override - public Pair> next() { + public Pair> next() { Pair> pair = mutationIterator.next(); return new Pair>(pair.getFirst().getBytes(), pair.getSecond()); } - + @Override public void remove() { mutationIterator.remove(); } }; } - + @Override public boolean hasNext() { return innerIterator.hasNext() || iterator.hasNext(); @@ -750,7 +751,7 @@ public class MutationState implements SQLCloseable { public Pair> next() { if (!innerIterator.hasNext()) { current = iterator.next(); - innerIterator=init(); + innerIterator = init(); } return innerIterator.next(); } @@ -759,24 +760,26 @@ public class MutationState implements SQLCloseable { public void remove() { throw new UnsupportedOperationException(); } - + }; } public static long getTableTimestamp(final Long tableTimestamp, Long scn) { - return (tableTimestamp!=null && tableTimestamp!=QueryConstants.UNSET_TIMESTAMP) ? tableTimestamp : (scn == null ? HConstants.LATEST_TIMESTAMP : scn); + return (tableTimestamp != null && tableTimestamp != QueryConstants.UNSET_TIMESTAMP) ? tableTimestamp + : (scn == null ? HConstants.LATEST_TIMESTAMP : scn); } - + public static long getMutationTimestamp(final Long scn) { return scn == null ? HConstants.LATEST_TIMESTAMP : scn; } /** - * Validates that the meta data is valid against the server meta data if we haven't yet done so. - * Otherwise, for every UPSERT VALUES call, we'd need to hit the server to see if the meta data - * has changed. + * Validates that the meta data is valid against the server meta data if we haven't yet done so. Otherwise, for + * every UPSERT VALUES call, we'd need to hit the server to see if the meta data has changed. + * * @return the server time to use for the upsert - * @throws SQLException if the table or any columns no longer exist + * @throws SQLException + * if the table or any columns no longer exist */ private long[] validateAll() throws SQLException { int i = 0; @@ -787,19 +790,20 @@ public class MutationState implements SQLCloseable { } return timeStamps; } - - private long validateAndGetServerTimestamp(TableRef tableRef, MultiRowMutationState rowKeyToColumnMap) throws SQLException { + + private long validateAndGetServerTimestamp(TableRef tableRef, MultiRowMutationState rowKeyToColumnMap) + throws SQLException { Long scn = connection.getSCN(); MetaDataClient client = new MetaDataClient(connection); long serverTimeStamp = tableRef.getTimeStamp(); // If we're auto committing, we've already validated the schema when we got the ColumnResolver, // so no need to do it again here. PTable table = tableRef.getTable(); - MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString()); + MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName() + .getString()); PTable resolvedTable = result.getTable(); - if (resolvedTable == null) { - throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString()); - } + if (resolvedTable == null) { throw new TableNotFoundException(table.getSchemaName().getString(), table + .getTableName().getString()); } // Always update tableRef table as the one we've cached may be out of date since when we executed // the UPSERT VALUES call and updated in the cache before this. tableRef.setTable(resolvedTable); @@ -807,39 +811,38 @@ public class MutationState implements SQLCloseable { for (PTable idxTtable : indexes) { // If index is still active, but has a non zero INDEX_DISABLE_TIMESTAMP value, then infer that // our failure mode is block writes on index failure. - if ((idxTtable.getIndexState() == PIndexState.ACTIVE || idxTtable.getIndexState() == PIndexState.PENDING_ACTIVE) && idxTtable.getIndexDisableTimestamp() > 0) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE) - .setSchemaName(table.getSchemaName().getString()) - .setTableName(table.getTableName().getString()).build().buildException(); - } - } + if ((idxTtable.getIndexState() == PIndexState.ACTIVE || idxTtable.getIndexState() == PIndexState.PENDING_ACTIVE) + && idxTtable.getIndexDisableTimestamp() > 0) { throw new SQLExceptionInfo.Builder( + SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE).setSchemaName(table.getSchemaName().getString()) + .setTableName(table.getTableName().getString()).build().buildException(); } + } long timestamp = result.getMutationTime(); if (timestamp != QueryConstants.UNSET_TIMESTAMP) { serverTimeStamp = timestamp; if (result.wasUpdated()) { List columns = Lists.newArrayListWithExpectedSize(table.getColumns().size()); - for (Map.Entry rowEntry : rowKeyToColumnMap.entrySet()) { + for (Map.Entry rowEntry : rowKeyToColumnMap.entrySet()) { RowMutationState valueEntry = rowEntry.getValue(); if (valueEntry != null) { Map colValues = valueEntry.getColumnValues(); if (colValues != PRow.DELETE_MARKER) { for (PColumn column : colValues.keySet()) { - if (!column.isDynamic()) - columns.add(column); + if (!column.isDynamic()) columns.add(column); } } } } for (PColumn column : columns) { if (column != null) { - resolvedTable.getColumnFamily(column.getFamilyName().getString()).getPColumnForColumnName(column.getName().getString()); + resolvedTable.getColumnFamily(column.getFamilyName().getString()).getPColumnForColumnName( + column.getName().getString()); } } } } return serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp; } - + private static long calculateMutationSize(List mutations) { long byteSize = 0; if (GlobalClientMetrics.isMetricsEnabled()) { @@ -850,24 +853,6 @@ public class MutationState implements SQLCloseable { GLOBAL_MUTATION_BYTES.update(byteSize); return byteSize; } - - private boolean hasKeyValueColumn(PTable table, PTable index) { - IndexMaintainer maintainer = index.getIndexMaintainer(table, connection); - return !maintainer.getAllColumns().isEmpty(); - } - - private void divideImmutableIndexes(Iterator enabledImmutableIndexes, PTable table, List rowKeyIndexes, List keyValueIndexes) { - while (enabledImmutableIndexes.hasNext()) { - PTable index = enabledImmutableIndexes.next(); - if (index.getIndexType() != IndexType.LOCAL) { - if (hasKeyValueColumn(table, index)) { - keyValueIndexes.add(index); - } else { - rowKeyIndexes.add(index); - } - } - } - } public long getBatchSizeBytes() { return batchSizeBytes; @@ -879,56 +864,52 @@ public class MutationState implements SQLCloseable { private class MetaDataAwareHTable extends DelegateHTable { private final TableRef tableRef; - + private MetaDataAwareHTable(HTableInterface delegate, TableRef tableRef) { super(delegate); this.tableRef = tableRef; } - + /** - * Called by Tephra when a transaction is aborted. We have this wrapper so that we get an - * opportunity to attach our index meta data to the mutations such that we can also undo - * the index mutations. + * Called by Tephra when a transaction is aborted. We have this wrapper so that we get an opportunity to attach + * our index meta data to the mutations such that we can also undo the index mutations. */ @Override public void delete(List deletes) throws IOException { ServerCache cache = null; try { + if (deletes.isEmpty()) { return; } + // Attach meta data for server maintained indexes PTable table = tableRef.getTable(); - List indexes = table.getIndexes(); - Iterator enabledIndexes = IndexMaintainer.maintainedIndexes(indexes.iterator()); - if (enabledIndexes.hasNext()) { - List keyValueIndexes = Collections.emptyList(); - ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(); - boolean attachMetaData = table.getIndexMaintainers(indexMetaDataPtr, connection); - if (table.isImmutableRows()) { - List rowKeyIndexes = Lists.newArrayListWithExpectedSize(indexes.size()); - keyValueIndexes = Lists.newArrayListWithExpectedSize(indexes.size()); - divideImmutableIndexes(enabledIndexes, table, rowKeyIndexes, keyValueIndexes); - // Generate index deletes for immutable indexes that only reference row key - // columns and submit directly here. - ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - for (PTable index : rowKeyIndexes) { - List indexDeletes = IndexUtil.generateDeleteIndexData(table, index, deletes, ptr, connection.getKeyValueBuilder(), connection); - HTableInterface hindex = connection.getQueryServices().getTable(index.getPhysicalName().getBytes()); - hindex.delete(indexDeletes); + ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(); + if (table.getIndexMaintainers(indexMetaDataPtr, connection)) { + cache = setMetaDataOnMutations(tableRef, deletes, indexMetaDataPtr); + } + + // Send deletes for client maintained indexes + List indexes = getClientMaintainedIndexes(table); + if (!indexes.isEmpty()) { + PhoenixTxIndexMutationGenerator generator = newTxIndexMutationGenerator(table, indexes, deletes.get(0) + .getAttributesMap()); + Collection> indexUpdates = generator.getIndexUpdates(delegate, + deletes.iterator()); + for (PTable index : indexes) { + byte[] physicalName = index.getPhysicalName().getBytes(); + try (HTableInterface hindex = connection.getQueryServices().getTable(physicalName)) { + List indexDeletes = Lists.newArrayListWithExpectedSize(deletes.size()); + for (Pair mutationPair : indexUpdates) { + if (Bytes.equals(mutationPair.getSecond(), physicalName)) { + indexDeletes.add(mutationPair.getFirst()); + } + hindex.batch(indexDeletes); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); } } - - // If we have mutable indexes, local immutable indexes, or global immutable indexes - // that reference key value columns, setup index meta data and attach here. In this - // case updates to the indexes will be generated on the server side. - // An alternative would be to let Tephra track the row keys for the immutable index - // by adding it as a transaction participant (soon we can prevent any conflict - // detection from occurring) with the downside being the additional memory required. - if (!keyValueIndexes.isEmpty()) { - attachMetaData = true; - IndexMaintainer.serializeAdditional(table, indexMetaDataPtr, keyValueIndexes, connection); - } - if (attachMetaData) { - cache = setMetaDataOnMutations(tableRef, deletes, indexMetaDataPtr); - } } + delegate.delete(deletes); } catch (SQLException e) { throw new IOException(e); @@ -939,13 +920,15 @@ public class MutationState implements SQLCloseable { } } } - + private static class TableInfo { - + private final boolean isDataTable; - @Nonnull private final PName hTableName; - @Nonnull private final TableRef origTableRef; - + @Nonnull + private final PName hTableName; + @Nonnull + private final TableRef origTableRef; + public TableInfo(boolean isDataTable, PName hTableName, TableRef origTableRef) { super(); checkNotNull(hTableName); @@ -981,14 +964,14 @@ public class MutationState implements SQLCloseable { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; - TableInfo other = (TableInfo) obj; + TableInfo other = (TableInfo)obj; if (!hTableName.equals(other.hTableName)) return false; if (isDataTable != other.isDataTable) return false; return true; } } - + @SuppressWarnings("deprecation") private void send(Iterator tableRefIterator) throws SQLException { int i = 0; @@ -1001,7 +984,7 @@ public class MutationState implements SQLCloseable { } MultiRowMutationState multiRowMutationState; - Map> physicalTableMutationMap = Maps.newLinkedHashMap(); + Map> physicalTableMutationMap = Maps.newLinkedHashMap(); // add tracing for this operation try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) { Span span = trace.getSpan(); @@ -1014,21 +997,22 @@ public class MutationState implements SQLCloseable { continue; } // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely) - long serverTimestamp = serverTimeStamps == null ? validateAndGetServerTimestamp(tableRef, multiRowMutationState) : serverTimeStamps[i++]; + long serverTimestamp = serverTimeStamps == null ? validateAndGetServerTimestamp(tableRef, + multiRowMutationState) : serverTimeStamps[i++]; Long scn = connection.getSCN(); long mutationTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn; final PTable table = tableRef.getTable(); - Iterator>> mutationsIterator = addRowMutations(tableRef, multiRowMutationState, mutationTimestamp, serverTimestamp, false, sendAll); + Iterator>> mutationsIterator = addRowMutations(tableRef, + multiRowMutationState, mutationTimestamp, serverTimestamp, false, sendAll); // build map from physical table to mutation list boolean isDataTable = true; while (mutationsIterator.hasNext()) { - Pair> pair = mutationsIterator.next(); + Pair> pair = mutationsIterator.next(); PName hTableName = pair.getFirst(); List mutationList = pair.getSecond(); TableInfo tableInfo = new TableInfo(isDataTable, hTableName, tableRef); List oldMutationList = physicalTableMutationMap.put(tableInfo, mutationList); - if (oldMutationList!=null) - mutationList.addAll(0, oldMutationList); + if (oldMutationList != null) mutationList.addAll(0, oldMutationList); isDataTable = false; } // For transactions, track the statement indexes as we send data @@ -1049,23 +1033,25 @@ public class MutationState implements SQLCloseable { } } long serverTimestamp = HConstants.LATEST_TIMESTAMP; - Iterator>> mutationsIterator = physicalTableMutationMap.entrySet().iterator(); + Iterator>> mutationsIterator = physicalTableMutationMap.entrySet() + .iterator(); while (mutationsIterator.hasNext()) { Entry> pair = mutationsIterator.next(); TableInfo tableInfo = pair.getKey(); byte[] htableName = tableInfo.getHTableName().getBytes(); List mutationList = pair.getValue(); - - //create a span per target table - //TODO maybe we can be smarter about the table name to string here? - Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName)); + + // create a span per target table + // TODO maybe we can be smarter about the table name to string here? + Span child = Tracing.child(span, "Writing mutation batch for table: " + Bytes.toString(htableName)); int retryCount = 0; boolean shouldRetry = false; long numMutations = 0; long mutationSizeBytes = 0; long mutationCommitTime = 0; - long numFailedMutations = 0;; + long numFailedMutations = 0; + ; long startTime = 0; boolean shouldRetryIndexedMutation = false; IndexWriteException iwe = null; @@ -1073,11 +1059,12 @@ public class MutationState implements SQLCloseable { TableRef origTableRef = tableInfo.getOrigTableRef(); PTable table = origTableRef.getTable(); table.getIndexMaintainers(indexMetaDataPtr, connection); - final ServerCache cache = tableInfo.isDataTable() ? setMetaDataOnMutations(origTableRef, mutationList, indexMetaDataPtr) : null; + final ServerCache cache = tableInfo.isDataTable() ? setMetaDataOnMutations(origTableRef, + mutationList, indexMetaDataPtr) : null; // If we haven't retried yet, retry for this case only, as it's possible that // a split will occur after we send the index metadata cache to all known // region servers. - shouldRetry = cache!=null; + shouldRetry = cache != null; SQLException sqlE = null; HTableInterface hTable = connection.getQueryServices().getTable(htableName); try { @@ -1093,16 +1080,17 @@ public class MutationState implements SQLCloseable { hTable = new MetaDataAwareHTable(hTable, origTableRef); } - hTable = phoenixTransactionContext.getTransactionalTable(hTable, table.isImmutableRows()); + hTable = phoenixTransactionContext.getTransactionalTableWriter(hTable, table); } - + numMutations = mutationList.size(); GLOBAL_MUTATION_BATCH_SIZE.update(numMutations); mutationSizeBytes = calculateMutationSize(mutationList); - + startTime = System.currentTimeMillis(); child.addTimelineAnnotation("Attempt " + retryCount); - List> mutationBatchList = getMutationBatchList(batchSize, batchSizeBytes, mutationList); + List> mutationBatchList = getMutationBatchList(batchSize, batchSizeBytes, + mutationList); for (final List mutationBatch : mutationBatchList) { if (shouldRetryIndexedMutation) { // if there was an index write failure, retry the mutation in a loop @@ -1116,14 +1104,16 @@ public class MutationState implements SQLCloseable { Thread.currentThread().interrupt(); throw new IOException(e); } - }}, iwe, - connection, connection.getQueryServices().getProps()); + } + }, iwe, connection, connection.getQueryServices().getProps()); } else { hTable.batch(mutationBatch); } batchCount++; - if (logger.isDebugEnabled()) logger.debug("Sent batch of " + mutationBatch.size() + " for " + Bytes.toString(htableName)); + if (logger.isDebugEnabled()) + logger.debug("Sent batch of " + mutationBatch.size() + " for " + + Bytes.toString(htableName)); } child.stop(); child.stop(); @@ -1131,7 +1121,7 @@ public class MutationState implements SQLCloseable { mutationCommitTime = System.currentTimeMillis() - startTime; GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime); numFailedMutations = 0; - + // Remove batches as we process them mutations.remove(origTableRef); if (tableInfo.isDataTable()) { @@ -1140,22 +1130,28 @@ public class MutationState implements SQLCloseable { estimatedSize = KeyValueUtil.getEstimatedRowMutationSize(mutations); } } catch (Exception e) { - mutationCommitTime = System.currentTimeMillis() - startTime; + mutationCommitTime = System.currentTimeMillis() - startTime; serverTimestamp = ServerUtil.parseServerTimestamp(e); SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e); if (inferredE != null) { - if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) { - // Swallow this exception once, as it's possible that we split after sending the index metadata - // and one of the region servers doesn't have it. This will cause it to have it the next go around. + if (shouldRetry + && retryCount == 0 + && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND + .getErrorCode()) { + // Swallow this exception once, as it's possible that we split after sending the index + // metadata + // and one of the region servers doesn't have it. This will cause it to have it the next + // go around. // If it fails again, we don't retry. - String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE; + String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + + inferredE; logger.warn(LogUtil.addCustomAnnotations(msg, connection)); connection.getQueryServices().clearTableRegionCache(htableName); // add a new child span as this one failed child.addTimelineAnnotation(msg); child.stop(); - child = Tracing.child(span,"Failed batch, attempting retry"); + child = Tracing.child(span, "Failed batch, attempting retry"); continue; } else if (inferredE.getErrorCode() == SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) { @@ -1164,7 +1160,8 @@ public class MutationState implements SQLCloseable { // For an index write failure, the data table write succeeded, // so when we retry we need to set REPLAY_WRITES for (Mutation m : mutationList) { - m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES); + m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, + BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES); KeyValueUtil.setTimestamp(m, serverTimestamp); } shouldRetry = true; @@ -1177,29 +1174,26 @@ public class MutationState implements SQLCloseable { // Throw to client an exception that indicates the statements that // were not committed successfully. int[] uncommittedStatementIndexes = getUncommittedStatementIndexes(); - sqlE = new CommitException(e, uncommittedStatementIndexes, serverTimestamp); - numFailedMutations = uncommittedStatementIndexes.length; - GLOBAL_MUTATION_BATCH_FAILED_COUNT.update(numFailedMutations); + sqlE = new CommitException(e, uncommittedStatementIndexes, serverTimestamp); + numFailedMutations = uncommittedStatementIndexes.length; + GLOBAL_MUTATION_BATCH_FAILED_COUNT.update(numFailedMutations); } finally { - MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime, numFailedMutations); + MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, + mutationCommitTime, numFailedMutations); mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric); try { - if (cache!=null) - cache.close(); + if (cache != null) cache.close(); } finally { try { hTable.close(); - } - catch (IOException e) { + } catch (IOException e) { if (sqlE != null) { sqlE.setNextException(ServerUtil.parseServerException(e)); } else { sqlE = ServerUtil.parseServerException(e); } - } - if (sqlE != null) { - throw sqlE; } + if (sqlE != null) { throw sqlE; } } } } while (shouldRetry && retryCount++ < 1); @@ -1209,10 +1203,13 @@ public class MutationState implements SQLCloseable { /** * Split the list of mutations into multiple lists that don't exceed row and byte thresholds - * @param allMutationList List of HBase mutations + * + * @param allMutationList + * List of HBase mutations * @return List of lists of mutations */ - public static List> getMutationBatchList(long batchSize, long batchSizeBytes, List allMutationList) { + public static List> getMutationBatchList(long batchSize, long batchSizeBytes, + List allMutationList) { List> mutationBatchList = Lists.newArrayList(); List currentList = Lists.newArrayList(); long currentBatchSizeBytes = 0L; @@ -1243,12 +1240,10 @@ public class MutationState implements SQLCloseable { ImmutableBytesWritable indexMetaDataPtr) throws SQLException { PTable table = tableRef.getTable(); final byte[] tenantIdBytes; - if(table.isMultiTenant()) { - tenantIdBytes = connection.getTenantId() == null ? null : - ScanUtil.getTenantIdBytes( - table.getRowKeySchema(), - table.getBucketNum() != null, - connection.getTenantId(), table.getViewIndexId() != null); + if (table.isMultiTenant()) { + tenantIdBytes = connection.getTenantId() == null ? null : ScanUtil.getTenantIdBytes( + table.getRowKeySchema(), table.getBucketNum() != null, connection.getTenantId(), + table.getViewIndexId() != null); } else { tenantIdBytes = connection.getTenantId() == null ? null : connection.getTenantId().getBytes(); } @@ -1261,7 +1256,8 @@ public class MutationState implements SQLCloseable { } boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0; if (hasIndexMetaData) { - if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength() + txState.length)) { + if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, indexMetaDataPtr.getLength() + + txState.length)) { IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef); cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, txState); uuidValue = cache.getId(); @@ -1269,9 +1265,7 @@ public class MutationState implements SQLCloseable { attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr); uuidValue = ServerCacheClient.generateId(); } - } else if (txState.length == 0) { - return null; - } + } else if (txState.length == 0) { return null; } // Either set the UUID to be able to access the index metadata from the cache // or set the index metadata directly on the Mutation for (Mutation mutation : mutations) { @@ -1281,7 +1275,8 @@ public class MutationState implements SQLCloseable { mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); if (attribValue != null) { mutation.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue); - mutation.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION)); + mutation.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, + Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION)); if (txState.length > 0) { mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); } @@ -1291,23 +1286,23 @@ public class MutationState implements SQLCloseable { } return cache; } - + private void addUncommittedStatementIndexes(Collection rowMutations) { for (RowMutationState rowMutationState : rowMutations) { - uncommittedStatementIndexes = joinSortedIntArrays(uncommittedStatementIndexes, rowMutationState.getStatementIndexes()); + uncommittedStatementIndexes = joinSortedIntArrays(uncommittedStatementIndexes, + rowMutationState.getStatementIndexes()); } } - + private int[] getUncommittedStatementIndexes() { for (MultiRowMutationState rowMutationMap : mutations.values()) { addUncommittedStatementIndexes(rowMutationMap.values()); } return uncommittedStatementIndexes; } - + @Override - public void close() throws SQLException { - } + public void close() throws SQLException {} private void resetState() { numRows = 0; @@ -1335,26 +1330,29 @@ public class MutationState implements SQLCloseable { Map txMutations = Collections.emptyMap(); int retryCount = 0; do { - boolean sendSuccessful=false; + boolean sendSuccessful = false; boolean retryCommit = false; SQLException sqlE = null; try { send(); txMutations = this.txMutations; - sendSuccessful=true; + sendSuccessful = true; } catch (SQLException e) { sqlE = e; } finally { try { - boolean finishSuccessful=false; + boolean finishSuccessful = false; try { if (sendSuccessful) { phoenixTransactionContext.commit(); finishSuccessful = true; } } catch (SQLException e) { - if (logger.isInfoEnabled()) logger.info(e.getClass().getName() + " at timestamp " + getInitialWritePointer() + " with retry count of " + retryCount); - retryCommit = (e.getErrorCode() == SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode() && retryCount < MAX_COMMIT_RETRIES); + if (logger.isInfoEnabled()) + logger.info(e.getClass().getName() + " at timestamp " + getInitialWritePointer() + + " with retry count of " + retryCount); + retryCommit = (e.getErrorCode() == SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION + .getErrorCode() && retryCount < MAX_COMMIT_RETRIES); if (sqlE == null) { sqlE = e; } else { @@ -1401,9 +1399,7 @@ public class MutationState implements SQLCloseable { } } } - if (sqlE != null && !retryCommit) { - throw sqlE; - } + if (sqlE != null && !retryCommit) { throw sqlE; } } } } @@ -1418,11 +1414,12 @@ public class MutationState implements SQLCloseable { /** * Determines whether indexes were added to mutated tables while the transaction was in progress. + * * @return true if indexes were added and false otherwise. - * @throws SQLException + * @throws SQLException */ private boolean shouldResubmitTransaction(Set txTableRefs) throws SQLException { - if (logger.isInfoEnabled()) logger.info("Checking for index updates as of " + getInitialWritePointer()); + if (logger.isInfoEnabled()) logger.info("Checking for index updates as of " + getInitialWritePointer()); MetaDataClient client = new MetaDataClient(connection); PMetaData cache = connection.getMetaDataCache(); boolean addedAnyIndexes = false; @@ -1432,15 +1429,16 @@ public class MutationState implements SQLCloseable { List oldIndexes; PTableRef ptableRef = cache.getTableRef(dataTable.getKey()); oldIndexes = ptableRef.getTable().getIndexes(); - // Always check at server for metadata change, as it's possible that the table is configured to not check for metadata changes + // Always check at server for metadata change, as it's possible that the table is configured to not check + // for metadata changes // but in this case, the tx manager is telling us it's likely that there has been a change. - MetaDataMutationResult result = client.updateCache(dataTable.getTenantId(), dataTable.getSchemaName().getString(), dataTable.getTableName().getString(), true); + MetaDataMutationResult result = client.updateCache(dataTable.getTenantId(), dataTable.getSchemaName() + .getString(), dataTable.getTableName().getString(), true); long timestamp = TransactionUtil.getResolvedTime(connection, result); tableRef.setTimeStamp(timestamp); PTable updatedDataTable = result.getTable(); - if (updatedDataTable == null) { - throw new TableNotFoundException(dataTable.getSchemaName().getString(), dataTable.getTableName().getString()); - } + if (updatedDataTable == null) { throw new TableNotFoundException(dataTable.getSchemaName().getString(), + dataTable.getTableName().getString()); } allImmutableTables &= updatedDataTable.isImmutableRows(); tableRef.setTable(updatedDataTable); if (!addedAnyIndexes) { @@ -1448,10 +1446,14 @@ public class MutationState implements SQLCloseable { // that an index was dropped and recreated with the same name but different // indexed/covered columns. addedAnyIndexes = (!oldIndexes.equals(updatedDataTable.getIndexes())); - if (logger.isInfoEnabled()) logger.info((addedAnyIndexes ? "Updates " : "No updates ") + "as of " + timestamp + " to " + updatedDataTable.getName().getString() + " with indexes " + updatedDataTable.getIndexes()); + if (logger.isInfoEnabled()) + logger.info((addedAnyIndexes ? "Updates " : "No updates ") + "as of " + timestamp + " to " + + updatedDataTable.getName().getString() + " with indexes " + updatedDataTable.getIndexes()); } } - if (logger.isInfoEnabled()) logger.info((addedAnyIndexes ? "Updates " : "No updates ") + "to indexes as of " + getInitialWritePointer() + " over " + (allImmutableTables ? " all immutable tables" : " some mutable tables")); + if (logger.isInfoEnabled()) + logger.info((addedAnyIndexes ? "Updates " : "No updates ") + "to indexes as of " + getInitialWritePointer() + + " over " + (allImmutableTables ? " all immutable tables" : " some mutable tables")); // If all tables are immutable, we know the conflict we got was due to our DDL/DML fence. // If any indexes were added, then the conflict might be due to DDL/DML fence. return allImmutableTables || addedAnyIndexes; @@ -1459,16 +1461,18 @@ public class MutationState implements SQLCloseable { /** * Send to HBase any uncommitted data for transactional tables. + * * @return true if any data was sent and false otherwise. * @throws SQLException */ public boolean sendUncommitted() throws SQLException { return sendUncommitted(mutations.keySet().iterator()); } + /** - * Support read-your-own-write semantics by sending uncommitted data to HBase prior to running a - * query. In this way, they are visible to subsequent reads but are not actually committed until - * commit is called. + * Support read-your-own-write semantics by sending uncommitted data to HBase prior to running a query. In this way, + * they are visible to subsequent reads but are not actually committed until commit is called. + * * @param tableRefs * @return true if any data was sent and false otherwise. * @throws SQLException @@ -1481,7 +1485,7 @@ public class MutationState implements SQLCloseable { phoenixTransactionContext.setVisibilityLevel(PhoenixVisibilityLevel.SNAPSHOT); } - Iterator filteredTableRefs = Iterators.filter(tableRefs, new Predicate(){ + Iterator filteredTableRefs = Iterators.filter(tableRefs, new Predicate() { @Override public boolean apply(TableRef tableRef) { return tableRef.getTable().isTransactional(); @@ -1497,113 +1501,120 @@ public class MutationState implements SQLCloseable { if (tableRef.getTable().isTransactional()) { startTransaction(tableRef.getTable().getTransactionProvider()); } - strippedAliases.add(new TableRef(null, tableRef.getTable(), tableRef.getTimeStamp(), tableRef.getLowerBoundTimeStamp(), tableRef.hasDynamicCols())); + strippedAliases.add(new TableRef(null, tableRef.getTable(), tableRef.getTimeStamp(), tableRef + .getLowerBoundTimeStamp(), tableRef.hasDynamicCols())); } send(strippedAliases.iterator()); return true; } return false; } - + public void send() throws SQLException { send(null); } - + public static int[] joinSortedIntArrays(int[] a, int[] b) { int[] result = new int[a.length + b.length]; int i = 0, j = 0, k = 0, current; while (i < a.length && j < b.length) { current = a[i] < b[j] ? a[i++] : b[j++]; - for ( ; i < a.length && a[i] == current; i++); - for ( ; j < b.length && b[j] == current; j++); + for (; i < a.length && a[i] == current; i++) + ; + for (; j < b.length && b[j] == current; j++) + ; result[k++] = current; } while (i < a.length) { - for (current = a[i++] ; i < a.length && a[i] == current; i++); + for (current = a[i++]; i < a.length && a[i] == current; i++) + ; result[k++] = current; } while (j < b.length) { - for (current = b[j++] ; j < b.length && b[j] == current; j++); + for (current = b[j++]; j < b.length && b[j] == current; j++) + ; result[k++] = current; } return Arrays.copyOf(result, k); } - + @Immutable public static class RowTimestampColInfo { private final boolean useServerTimestamp; - private final Long rowTimestamp; - + private final Long rowTimestamp; + public static final RowTimestampColInfo NULL_ROWTIMESTAMP_INFO = new RowTimestampColInfo(false, null); public RowTimestampColInfo(boolean autoGenerate, Long value) { this.useServerTimestamp = autoGenerate; this.rowTimestamp = value; } - + public boolean useServerTimestamp() { return useServerTimestamp; } - + public Long getTimestamp() { return rowTimestamp; } } - + public static class MultiRowMutationState { - private Map rowKeyToRowMutationState; + private Map rowKeyToRowMutationState; private long estimatedSize; - + public MultiRowMutationState(int size) { this.rowKeyToRowMutationState = Maps.newHashMapWithExpectedSize(size); this.estimatedSize = 0; } - - public RowMutationState put(ImmutableBytesPtr ptr, RowMutationState rowMutationState) { + + public RowMutationState put(ImmutableBytesPtr ptr, RowMutationState rowMutationState) { estimatedSize += rowMutationState.calculateEstimatedSize(); return rowKeyToRowMutationState.put(ptr, rowMutationState); } - + public void putAll(MultiRowMutationState other) { estimatedSize += other.estimatedSize; rowKeyToRowMutationState.putAll(other.rowKeyToRowMutationState); } - + public boolean isEmpty() { return rowKeyToRowMutationState.isEmpty(); } - + public int size() { return rowKeyToRowMutationState.size(); } - + public Set> entrySet() { return rowKeyToRowMutationState.entrySet(); } - - public void clear(){ + + public void clear() { rowKeyToRowMutationState.clear(); estimatedSize = 0; } - + public Collection values() { return rowKeyToRowMutationState.values(); } } - + public static class RowMutationState { - @Nonnull private Map columnValues; + @Nonnull + private Map columnValues; private int[] statementIndexes; - @Nonnull private final RowTimestampColInfo rowTsColInfo; + @Nonnull + private final RowTimestampColInfo rowTsColInfo; private byte[] onDupKeyBytes; private long colValuesSize; - - public RowMutationState(@Nonnull Map columnValues, long colValuesSize, int statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo, - byte[] onDupKeyBytes) { + + public RowMutationState(@Nonnull Map columnValues, long colValuesSize, int statementIndex, + @Nonnull RowTimestampColInfo rowTsColInfo, byte[] onDupKeyBytes) { checkNotNull(columnValues); checkNotNull(rowTsColInfo); this.columnValues = columnValues; - this.statementIndexes = new int[] {statementIndex}; + this.statementIndexes = new int[] { statementIndex }; this.rowTsColInfo = rowTsColInfo; this.onDupKeyBytes = onDupKeyBytes; this.colValuesSize = colValuesSize; @@ -1631,13 +1642,13 @@ public class MutationState implements SQLCloseable { // ignore the new values (as that's what the server will do). if (newRow.onDupKeyBytes == null) { // increment the column value size by the new row column value size - colValuesSize+=newRow.colValuesSize; - for (Map.Entry entry : newRow.columnValues.entrySet()) { + colValuesSize += newRow.colValuesSize; + for (Map.Entry entry : newRow.columnValues.entrySet()) { PColumn col = entry.getKey(); byte[] oldValue = columnValues.put(col, entry.getValue()); - if (oldValue!=null) { + if (oldValue != null) { // decrement column value size by the size of all column values that were replaced - colValuesSize-=(col.getEstimatedSize() + oldValue.length); + colValuesSize -= (col.getEstimatedSize() + oldValue.length); } } } @@ -1646,14 +1657,14 @@ public class MutationState implements SQLCloseable { this.onDupKeyBytes = PhoenixIndexBuilder.combineOnDupKey(this.onDupKeyBytes, newRow.onDupKeyBytes); statementIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes()); } - + @Nonnull RowTimestampColInfo getRowTimestampColInfo() { return rowTsColInfo; } } - + public ReadMetricQueue getReadMetricQueue() { return readMetricQueue; } @@ -1665,5 +1676,5 @@ public class MutationState implements SQLCloseable { public MutationMetricQueue getMutationMetricQueue() { return mutationMetricQueue; } - + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/edb6dcf5/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java index 6348d6d..0016fa9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java @@ -95,7 +95,7 @@ public class PhoenixTxIndexMutationGenerator { stored.addAll(m); } - public Collection> getIndexUpdates(HTableInterface htable, Iterator mutationIterator) throws IOException, SQLException { + public Collection> getIndexUpdates(HTableInterface htable, Iterator mutationIterator) throws IOException, SQLException { if (!mutationIterator.hasNext()) { return Collections.emptyList(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/edb6dcf5/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java index 543eda1..ab9e8a6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java @@ -130,4 +130,10 @@ public class OmidTransactionContext implements PhoenixTransactionContext { // TODO Auto-generated method stub return null; } + + @Override + public HTableInterface getTransactionalTableWriter(HTableInterface htable, PTable table) { + // TODO Auto-generated method stub + return null; + } }