From: tdsilva@apache.org
To: commits@phoenix.apache.org
Date: Wed, 17 Aug 2016 18:15:40 -0000
Subject: [2/2] phoenix git commit: PHOENIX-2995 Write performance severely degrades with large number of views

PHOENIX-2995 Write performance severely degrades with large number of views

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c22020a4
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c22020a4
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c22020a4

Branch: refs/heads/4.x-HBase-1.0
Commit: c22020a479ac39208616d647fbae339cb3fdb1b4
Parents: 24e670d
Author: Thomas D'Silva
Authored: Fri Jul 22 14:24:38 2016 -0700
Committer: Thomas D'Silva
Committed: Wed Aug 17 11:14:04 2016 -0700

----------------------------------------------------------------------
 .../apache/phoenix/end2end/UpsertSelectIT.java  |   2 +-
 .../phoenix/compile/CreateTableCompiler.java    |   7 +-
 .../apache/phoenix/execute/MutationState.java   | 328 ++++++++++++-------
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  57 ++--
 .../query/ConnectionQueryServicesImpl.java      |  93 +++---
 .../query/ConnectionlessQueryServicesImpl.java  |  47 ++-
 .../query/DelegateConnectionQueryServices.java  |  38 +--
 .../apache/phoenix/query/MetaDataMutated.java   |  19 +-
 .../org/apache/phoenix/schema/PMetaData.java    |   4 +-
 .../apache/phoenix/schema/PMetaDataImpl.java    | 107 +++---
 .../phoenix/schema/PSynchronizedMetaData.java   | 249 ++++++++++++++
 .../apache/phoenix/util/TransactionUtil.java    |   4 +-
 .../phoenix/schema/PMetaDataImplTest.java       |  68 ++--
 13 files changed, 652 insertions(+), 371 deletions(-)
----------------------------------------------------------------------
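The substance of the change, before the per-file diffs: PMetaData and the MetaDataMutated contract were copy-on-write, so every mutator returned a fresh PMetaData and each cache update copied the whole client-side metadata cache. With a large number of views, that per-write copy is what degraded performance. The patch makes the mutators void and mutates the cache in place: ConnectionQueryServicesImpl.connect() now hands each connection its own clone, and the shared cache is wrapped in the new PSynchronizedMetaData for thread safety. A minimal sketch of the contract change, using hypothetical interfaces rather than the real Phoenix types:

    // Before: copy-on-write; every mutation copies the cache and the
    // caller must store the returned copy, O(cache size) per write.
    interface CopyOnWriteCache {
        CopyOnWriteCache addTable(String name);
    }

    // After: in-place mutation; a synchronized wrapper protects the one
    // shared instance (see PSynchronizedMetaData in the diffstat above).
    interface MutableCache {
        void addTable(String name);
    }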
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c22020a4/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index 30de4de..4d811a4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -1022,7 +1022,7 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT {
         // Upsert data with scn set on the connection. The timestamp of the put will be the value of the row_timestamp column.
         long rowTimestamp1 = 100;
         Date rowTimestampDate = new Date(rowTimestamp1);
-        try (Connection conn = getConnection(ts)) {
+        try (Connection conn = getConnection(ts+1)) {
             PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (PK1, PK2, PK3, KV1) VALUES(?, ?, ?, ?)");
             stmt.setInt(1, 1);
             stmt.setDate(2, rowTimestampDate);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c22020a4/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
index b545156..3928f66 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
@@ -54,7 +54,6 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PDatum;
-import org.apache.phoenix.schema.PMetaData;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.PTableType;
@@ -143,11 +142,11 @@ public class CreateTableCompiler {
                         // on our connection.
                         new DelegateConnectionQueryServices(connection.getQueryServices()) {
                             @Override
-                            public PMetaData addTable(PTable table, long resolvedTime) throws SQLException {
-                                return connection.addTable(table, resolvedTime);
+                            public void addTable(PTable table, long resolvedTime) throws SQLException {
+                                connection.addTable(table, resolvedTime);
                             }
                         },
-                        connection, tableRef.getTimeStamp());
+                        connection, tableRef.getTimeStamp()+1);
                 viewColumnConstantsToBe = new byte[nColumns][];
                 ViewWhereExpressionVisitor visitor = new ViewWhereExpressionVisitor(parentToBe, viewColumnConstantsToBe);
                 where.accept(visitor);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c22020a4/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index d44b679..38d24aa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -30,6 +30,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -70,6 +71,7 @@ import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PMetaData;
+import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
@@ -359,7 +361,7 @@ public class MutationState implements SQLCloseable {
         HTableInterface htable = this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes());
         Transaction currentTx;
         if (table.isTransactional() && (currentTx=getTransaction()) != null) {
-            TransactionAwareHTable txAware = TransactionUtil.getTransactionAwareHTable(htable, table);
+            TransactionAwareHTable txAware = TransactionUtil.getTransactionAwareHTable(htable, table.isImmutableRows());
             // Using cloned mutationState as we may have started a new transaction already
             // if auto commit is true and we need to use the original one here.
             txAware.startTx(currentTx);
@@ -553,7 +555,7 @@ public class MutationState implements SQLCloseable {
         return ptr;
     }

-    private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values,
+    private Iterator<Pair<PName,List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values,
             final long timestamp, boolean includeAllIndexes, final boolean sendAll) {
         final PTable table = tableRef.getTable();
         final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism
@@ -565,7 +567,7 @@ public class MutationState implements SQLCloseable {
         final List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(values.size());
         final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ?
                 Lists.newArrayListWithExpectedSize(values.size()) : null;
         generateMutations(tableRef, timestamp, values, mutationList, mutationsPertainingToIndex);
-        return new Iterator<Pair<byte[],List<Mutation>>>() {
+        return new Iterator<Pair<PName,List<Mutation>>>() {
             boolean isFirst = true;

             @Override
@@ -574,10 +576,10 @@ public class MutationState implements SQLCloseable {
             }

             @Override
-            public Pair<byte[], List<Mutation>> next() {
+            public Pair<PName, List<Mutation>> next() {
                 if (isFirst) {
                     isFirst = false;
-                    return new Pair<byte[],List<Mutation>>(table.getPhysicalName().getBytes(), mutationList);
+                    return new Pair<PName,List<Mutation>>(table.getPhysicalName(), mutationList);
                 }
                 PTable index = indexes.next();
                 List<Mutation> indexMutations;
@@ -598,7 +600,7 @@ public class MutationState implements SQLCloseable {
                 } catch (SQLException e) {
                     throw new IllegalDataException(e);
                 }
-                return new Pair<byte[],List<Mutation>>(index.getPhysicalName().getBytes(),indexMutations);
+                return new Pair<PName,List<Mutation>>(index.getPhysicalName(),indexMutations);
             }

             @Override
@@ -685,7 +687,24 @@ public class MutationState implements SQLCloseable {
             private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init();

             private Iterator<Pair<byte[],List<Mutation>>> init() {
-                return addRowMutations(current.getKey(), current.getValue(), timestamp, includeMutableIndexes, true);
+                final Iterator<Pair<PName,List<Mutation>>> mutationIterator = addRowMutations(current.getKey(), current.getValue(), timestamp, includeMutableIndexes, true);
+                return new Iterator<Pair<byte[],List<Mutation>>>() {
+                    @Override
+                    public boolean hasNext() {
+                        return mutationIterator.hasNext();
+                    }
+
+                    @Override
+                    public Pair<byte[], List<Mutation>> next() {
+                        Pair<PName, List<Mutation>> pair = mutationIterator.next();
+                        return new Pair<byte[], List<Mutation>>(pair.getFirst().getBytes(), pair.getSecond());
+                    }
+
+                    @Override
+                    public void remove() {
+                        mutationIterator.remove();
+                    }
+                };
             }

             @Override
@@ -870,6 +889,55 @@ public class MutationState implements SQLCloseable {
         }
     }

+    private static class TableInfo {
+
+        private final boolean isDataTable;
+        @Nonnull private final PName hTableName;
+        @Nonnull private final TableRef origTableRef;
+
+        public TableInfo(boolean isDataTable, PName hTableName, TableRef origTableRef) {
+            super();
+            checkNotNull(hTableName);
+            checkNotNull(origTableRef);
+            this.isDataTable = isDataTable;
+            this.hTableName = hTableName;
+            this.origTableRef = origTableRef;
+        }
+
+        public boolean isDataTable() {
+            return isDataTable;
+        }
+
+        public PName getHTableName() {
+            return hTableName;
+        }
+
+        public TableRef getOrigTableRef() {
+            return origTableRef;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + hTableName.hashCode();
+            result = prime * result + (isDataTable ? 1231 : 1237);
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) return true;
+            if (obj == null) return false;
+            if (getClass() != obj.getClass()) return false;
+            TableInfo other = (TableInfo) obj;
+            if (!hTableName.equals(other.hTableName)) return false;
+            if (isDataTable != other.isDataTable) return false;
+            return true;
+        }
+
+    }
+
     @SuppressWarnings("deprecation")
     private void send(Iterator<TableRef> tableRefIterator) throws SQLException {
         int i = 0;
@@ -883,6 +951,7 @@
         Map<ImmutableBytesPtr, RowMutationState> valuesMap;
         List<TableRef> txTableRefs = Lists.newArrayListWithExpectedSize(mutations.size());
+        Map<TableInfo, List<Mutation>> physicalTableMutationMap = Maps.newLinkedHashMap();
         // add tracing for this operation
         try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) {
             Span span = trace.getSpan();
@@ -898,126 +967,24 @@
                 // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
                 long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++];
                 final PTable table = tableRef.getTable();
-                // Track tables to which we've sent uncommitted data
-                if (isTransactional = table.isTransactional()) {
-                    txTableRefs.add(tableRef);
-                    addDMLFence(table);
-                    uncommittedPhysicalNames.add(table.getPhysicalName().getString());
-                }
+                Iterator<Pair<PName,List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll);
+                // build map from physical table to mutation list
                 boolean isDataTable = true;
-                table.getIndexMaintainers(indexMetaDataPtr, connection);
-                Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll);
                 while (mutationsIterator.hasNext()) {
-                    Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
-                    byte[] htableName = pair.getFirst();
+                    Pair<PName,List<Mutation>> pair = mutationsIterator.next();
+                    PName hTableName = pair.getFirst();
                     List<Mutation> mutationList = pair.getSecond();
-
-                    //create a span per target table
-                    //TODO maybe we can be smarter about the table name to string here?
-                    Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName));
-
-                    int retryCount = 0;
-                    boolean shouldRetry = false;
-                    do {
-                        final ServerCache cache = isDataTable ? setMetaDataOnMutations(tableRef, mutationList, indexMetaDataPtr) : null;
-
-                        // If we haven't retried yet, retry for this case only, as it's possible that
-                        // a split will occur after we send the index metadata cache to all known
-                        // region servers.
-                        shouldRetry = cache != null;
-                        SQLException sqlE = null;
-                        HTableInterface hTable = connection.getQueryServices().getTable(htableName);
-                        try {
-                            if (isTransactional) {
-                                // If we have indexes, wrap the HTable in a delegate HTable that
-                                // will attach the necessary index meta data in the event of a
-                                // rollback
-                                if (!table.getIndexes().isEmpty()) {
-                                    hTable = new MetaDataAwareHTable(hTable, tableRef);
-                                }
-                                TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(hTable, table);
-                                // Don't add immutable indexes (those are the only ones that would participate
-                                // during a commit), as we don't need conflict detection for these.
-                                if (isDataTable) {
-                                    // Even for immutable, we need to do this so that an abort has the state
-                                    // necessary to generate the rows to delete.
-                                    addTransactionParticipant(txnAware);
-                                } else {
-                                    txnAware.startTx(getTransaction());
-                                }
-                                hTable = txnAware;
-                            }
-                            long numMutations = mutationList.size();
-                            GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
-
-                            long startTime = System.currentTimeMillis();
-                            child.addTimelineAnnotation("Attempt " + retryCount);
-                            hTable.batch(mutationList);
-                            if (logger.isDebugEnabled()) logger.debug("Sent batch of " + numMutations + " for " + Bytes.toString(htableName));
-                            child.stop();
-                            child.stop();
-                            shouldRetry = false;
-                            long mutationCommitTime = System.currentTimeMillis() - startTime;
-                            GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
-
-                            long mutationSizeBytes = calculateMutationSize(mutationList);
-                            MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime);
-                            mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric);
-                        } catch (Exception e) {
-                            SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
-                            if (inferredE != null) {
-                                if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
-                                    // Swallow this exception once, as it's possible that we split after sending the index metadata
-                                    // and one of the region servers doesn't have it. This will cause it to have it the next go around.
-                                    // If it fails again, we don't retry.
-                                    String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE;
-                                    logger.warn(LogUtil.addCustomAnnotations(msg, connection));
-                                    connection.getQueryServices().clearTableRegionCache(htableName);
-
-                                    // add a new child span as this one failed
-                                    child.addTimelineAnnotation(msg);
-                                    child.stop();
-                                    child = Tracing.child(span,"Failed batch, attempting retry");
-
-                                    continue;
-                                }
-                                e = inferredE;
-                            }
-                            // Throw to client an exception that indicates the statements that
-                            // were not committed successfully.
-                            sqlE = new CommitException(e, getUncommittedStatementIndexes());
-                        } finally {
-                            try {
-                                if (cache != null) {
-                                    cache.close();
-                                }
-                            } finally {
-                                try {
-                                    hTable.close();
-                                }
-                                catch (IOException e) {
-                                    if (sqlE != null) {
-                                        sqlE.setNextException(ServerUtil.parseServerException(e));
-                                    } else {
-                                        sqlE = ServerUtil.parseServerException(e);
-                                    }
-                                }
-                                if (sqlE != null) {
-                                    throw sqlE;
-                                }
-                            }
-                        }
-                    } while (shouldRetry && retryCount++ < 1);
+                    TableInfo tableInfo = new TableInfo(isDataTable, hTableName, tableRef);
+                    List<Mutation> oldMutationList = physicalTableMutationMap.put(tableInfo, mutationList);
+                    if (oldMutationList!=null)
+                        mutationList.addAll(0, oldMutationList);
                     isDataTable = false;
                 }
-                if (tableRef.getTable().getType() != PTableType.INDEX) {
-                    numRows -= valuesMap.size();
-                }
                 // For transactions, track the statement indexes as we send data
                 // over because our CommitException should include all statements
                 // involved in the transaction since none of them would have been
                 // committed in the event of a failure.
-                if (isTransactional) {
+                if (table.isTransactional()) {
                     addUncommittedStatementIndexes(valuesMap.values());
                     if (txMutations.isEmpty()) {
                         txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
@@ -1029,15 +996,122 @@
                     // indexes have changed.
                     joinMutationState(new TableRef(tableRef), valuesMap, txMutations);
                 }
-                // Remove batches as we process them
-                if (sendAll) {
-                    // Iterating through map key set in this case, so we cannot use
-                    // the remove method without getting a concurrent modification
-                    // exception.
-                    tableRefIterator.remove();
-                } else {
-                    mutations.remove(tableRef);
-                }
+            }
+            Iterator<Entry<TableInfo, List<Mutation>>> mutationsIterator = physicalTableMutationMap.entrySet().iterator();
+            while (mutationsIterator.hasNext()) {
+                Entry<TableInfo, List<Mutation>> pair = mutationsIterator.next();
+                TableInfo tableInfo = pair.getKey();
+                byte[] htableName = tableInfo.getHTableName().getBytes();
+                List<Mutation> mutationList = pair.getValue();
+
+                //create a span per target table
+                //TODO maybe we can be smarter about the table name to string here?
+                Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName));
+
+                int retryCount = 0;
+                boolean shouldRetry = false;
+                do {
+                    TableRef origTableRef = tableInfo.getOrigTableRef();
+                    PTable table = origTableRef.getTable();
+                    table.getIndexMaintainers(indexMetaDataPtr, connection);
+                    final ServerCache cache = tableInfo.isDataTable() ? setMetaDataOnMutations(origTableRef, mutationList, indexMetaDataPtr) : null;
+                    // If we haven't retried yet, retry for this case only, as it's possible that
+                    // a split will occur after we send the index metadata cache to all known
+                    // region servers.
+                    shouldRetry = cache!=null;
+                    SQLException sqlE = null;
+                    HTableInterface hTable = connection.getQueryServices().getTable(htableName);
+                    try {
+                        if (table.isTransactional()) {
+                            // Track tables to which we've sent uncommitted data
+                            txTableRefs.add(origTableRef);
+                            addDMLFence(table);
+                            uncommittedPhysicalNames.add(table.getPhysicalName().getString());
+
+                            // If we have indexes, wrap the HTable in a delegate HTable that
+                            // will attach the necessary index meta data in the event of a
+                            // rollback
+                            if (!table.getIndexes().isEmpty()) {
+                                hTable = new MetaDataAwareHTable(hTable, origTableRef);
+                            }
+                            TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(hTable, table.isImmutableRows());
+                            // Don't add immutable indexes (those are the only ones that would participate
+                            // during a commit), as we don't need conflict detection for these.
+                            if (tableInfo.isDataTable()) {
+                                // Even for immutable, we need to do this so that an abort has the state
+                                // necessary to generate the rows to delete.
+                                addTransactionParticipant(txnAware);
+                            } else {
+                                txnAware.startTx(getTransaction());
+                            }
+                            hTable = txnAware;
+                        }
+
+                        long numMutations = mutationList.size();
+                        GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
+
+                        long startTime = System.currentTimeMillis();
+                        child.addTimelineAnnotation("Attempt " + retryCount);
+                        hTable.batch(mutationList);
+                        if (logger.isDebugEnabled()) logger.debug("Sent batch of " + numMutations + " for " + Bytes.toString(htableName));
+                        child.stop();
+                        child.stop();
+                        shouldRetry = false;
+                        long mutationCommitTime = System.currentTimeMillis() - startTime;
+                        GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
+
+                        long mutationSizeBytes = calculateMutationSize(mutationList);
+                        MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime);
+                        mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric);
+                        if (tableInfo.isDataTable()) {
+                            numRows -= numMutations;
+                        }
+                        // Remove batches as we process them
+                        mutations.remove(origTableRef);
+                    } catch (Exception e) {
+                        SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
+                        if (inferredE != null) {
+                            if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
+                                // Swallow this exception once, as it's possible that we split after sending the index metadata
+                                // and one of the region servers doesn't have it. This will cause it to have it the next go around.
+                                // If it fails again, we don't retry.
+                                String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE;
+                                logger.warn(LogUtil.addCustomAnnotations(msg, connection));
+                                connection.getQueryServices().clearTableRegionCache(htableName);
+
+                                // add a new child span as this one failed
+                                child.addTimelineAnnotation(msg);
+                                child.stop();
+                                child = Tracing.child(span,"Failed batch, attempting retry");
+
+                                continue;
+                            }
+                            e = inferredE;
+                        }
+                        // Throw to client an exception that indicates the statements that
+                        // were not committed successfully.
+                        sqlE = new CommitException(e, getUncommittedStatementIndexes());
+                    } finally {
+                        try {
+                            if (cache!=null)
+                                cache.close();
+                        } finally {
+                            try {
+                                hTable.close();
+                            }
+                            catch (IOException e) {
+                                if (sqlE != null) {
+                                    sqlE.setNextException(ServerUtil.parseServerException(e));
+                                } else {
+                                    sqlE = ServerUtil.parseServerException(e);
+                                }
+                            }
+                            if (sqlE != null) {
+                                throw sqlE;
+                            }
+                        }
+                    }
+                } while (shouldRetry && retryCount++ < 1);
            }
        }
    }
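The send() rewrite above is the heart of the fix for views: instead of opening an HTable and running the retry loop once per TableRef (once per view), it first folds all row mutations into physicalTableMutationMap, keyed by the new TableInfo class, which pairs the physical HBase table name with an isDataTable flag so data-table and index batches stay distinct. Mutations from many views backed by the same physical table therefore go out as a single batch. A minimal standalone sketch of that grouping step (simplified types; the real code uses PName, Mutation, and TableRef):

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch: fold per-view mutation lists into one list per physical
    // table, preserving arrival order, as the patched send() does.
    public class GroupByPhysicalTable {
        public static void main(String[] args) {
            Map<String, List<String>> grouped = new LinkedHashMap<>();
            // Two "views" resolving to the same physical table T.
            put(grouped, "T", new ArrayList<>(List.of("view1-put")));
            put(grouped, "T", new ArrayList<>(List.of("view2-put")));
            System.out.println(grouped); // {T=[view1-put, view2-put]}
        }

        static void put(Map<String, List<String>> map, String table, List<String> batch) {
            List<String> prior = map.put(table, batch);
            if (prior != null) {
                batch.addAll(0, prior); // earlier mutations stay first
            }
        }
    }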
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c22020a4/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 4d01d08..0d09e75 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -112,8 +112,9 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.SchemaUtil;
-
 import org.apache.tephra.TransactionContext;
+import org.cloudera.htrace.Sampler;
+import org.cloudera.htrace.TraceScope;

 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
@@ -280,20 +281,21 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLCloseable {
                 long maxTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
                 return (table.getType() != PTableType.SYSTEM &&
                         (  table.getTimeStamp() >= maxTimestamp ||
-                         ! Objects.equal(tenantId, table.getTenantId())) );
+                         (table.getTenantId()!=null && ! Objects.equal(tenantId, table.getTenantId()))));
             }

             @Override
             public boolean prune(PFunction function) {
                 long maxTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
                 return ( function.getTimeStamp() >= maxTimestamp ||
-                         ! Objects.equal(tenantId, function.getTenantId()));
+                         (function.getTenantId()!=null && ! Objects.equal(tenantId, function.getTenantId())));
             }
         };
         this.isRequestLevelMetricsEnabled = JDBCUtil.isCollectingRequestLevelMetricsEnabled(url, info, this.services.getProps());
         this.mutationState = mutationState == null ? newMutationState(maxSize) : new MutationState(mutationState);
-        this.metaData = metaData.pruneTables(pruner);
-        this.metaData = metaData.pruneFunctions(pruner);
+        this.metaData = metaData;
+        this.metaData.pruneTables(pruner);
+        this.metaData.pruneFunctions(pruner);
         this.services.addConnection(this);

         // setup tracing, if its enabled
@@ -900,79 +902,71 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLCloseable {
     }

     @Override
-    public PMetaData addTable(PTable table, long resolvedTime) throws SQLException {
-        metaData = metaData.addTable(table, resolvedTime);
+    public void addTable(PTable table, long resolvedTime) throws SQLException {
+        metaData.addTable(table, resolvedTime);
         //Cascade through to connectionQueryServices too
         getQueryServices().addTable(table, resolvedTime);
-        return metaData;
     }

     @Override
-    public PMetaData updateResolvedTimestamp(PTable table, long resolvedTime) throws SQLException {
-        metaData = metaData.updateResolvedTimestamp(table, resolvedTime);
+    public void updateResolvedTimestamp(PTable table, long resolvedTime) throws SQLException {
+        metaData.updateResolvedTimestamp(table, resolvedTime);
         //Cascade through to connectionQueryServices too
         getQueryServices().updateResolvedTimestamp(table, resolvedTime);
-        return metaData;
     }

     @Override
-    public PMetaData addFunction(PFunction function) throws SQLException {
+    public void addFunction(PFunction function) throws SQLException {
         // TODO: since a connection is only used by one thread at a time,
         // we could modify this metadata in place since it's not shared.
         if (scn == null || scn > function.getTimeStamp()) {
-            metaData = metaData.addFunction(function);
+            metaData.addFunction(function);
         }
         //Cascade through to connectionQueryServices too
         getQueryServices().addFunction(function);
-        return metaData;
     }

     @Override
-    public PMetaData addSchema(PSchema schema) throws SQLException {
-        metaData = metaData.addSchema(schema);
+    public void addSchema(PSchema schema) throws SQLException {
+        metaData.addSchema(schema);
         // Cascade through to connectionQueryServices too
         getQueryServices().addSchema(schema);
-        return metaData;
     }

     @Override
-    public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp,
+    public void addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp,
             long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant,
             boolean storeNulls, boolean isTransactional, long updateCacheFrequency, boolean isNamespaceMapped,
             long resolvedTime) throws SQLException {
-        metaData = metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows,
+        metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows,
                 isWalDisabled, isMultitenant, storeNulls, isTransactional, updateCacheFrequency, isNamespaceMapped,
                 resolvedTime);
         // Cascade through to connectionQueryServices too
         getQueryServices().addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows,
                 isWalDisabled, isMultitenant, storeNulls, isTransactional, updateCacheFrequency, isNamespaceMapped,
                 resolvedTime);
-        return metaData;
     }

     @Override
-    public PMetaData removeTable(PName tenantId, String tableName, String parentTableName, long tableTimeStamp) throws SQLException {
-        metaData = metaData.removeTable(tenantId, tableName, parentTableName, tableTimeStamp);
+    public void removeTable(PName tenantId, String tableName, String parentTableName, long tableTimeStamp) throws SQLException {
+        metaData.removeTable(tenantId, tableName, parentTableName, tableTimeStamp);
         //Cascade through to connectionQueryServices too
         getQueryServices().removeTable(tenantId, tableName, parentTableName, tableTimeStamp);
-        return metaData;
     }

     @Override
-    public PMetaData removeFunction(PName tenantId, String functionName, long tableTimeStamp) throws SQLException {
-        metaData = metaData.removeFunction(tenantId, functionName, tableTimeStamp);
+    public void removeFunction(PName tenantId, String functionName, long tableTimeStamp) throws SQLException {
+        metaData.removeFunction(tenantId, functionName, tableTimeStamp);
         //Cascade through to connectionQueryServices too
         getQueryServices().removeFunction(tenantId, functionName, tableTimeStamp);
-        return metaData;
     }

     @Override
-    public PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp,
+    public void removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp,
             long tableSeqNum, long resolvedTime) throws SQLException {
-        metaData = metaData.removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum, resolvedTime);
+        metaData.removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum, resolvedTime);
         //Cascade through to connectionQueryServices too
         getQueryServices().removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum, resolvedTime);
-        return metaData;
     }

     protected boolean removeStatement(PhoenixStatement statement) throws SQLException {
@@ -1072,11 +1066,10 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLCloseable {
     }

     @Override
-    public PMetaData removeSchema(PSchema schema, long schemaTimeStamp) {
-        metaData = metaData.removeSchema(schema, schemaTimeStamp);
+    public void removeSchema(PSchema schema, long schemaTimeStamp) {
+        metaData.removeSchema(schema, schemaTimeStamp);
         // Cascade through to connectionQueryServices too
         getQueryServices().removeSchema(schema, schemaTimeStamp);
-        return metaData;
     }
 }
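Two related changes show up in PhoenixConnection: the constructor no longer reassigns this.metaData from the pruners' return values (pruneTables and pruneFunctions now mutate in place), and the tenant check in the pruners only fires when the table or function actually has a tenant. In-place pruning is safe only because connect() now always supplies a per-connection clone, as the next file shows. A sketch of that invariant, with a hypothetical Cache type rather than the Phoenix API:

    import java.util.HashSet;
    import java.util.Set;
    import java.util.function.Predicate;

    // Sketch: prune-in-place is safe when each connection prunes its own copy.
    class Cache {
        final Set<String> tables = new HashSet<>();
        Cache copy() { Cache c = new Cache(); c.tables.addAll(tables); return c; }
        void prune(Predicate<String> drop) { tables.removeIf(drop); } // void, in place
    }

    class ConnectSketch {
        static Cache shared = new Cache();
        static Cache connect() {
            Cache mine = shared.copy();         // per-connection clone first...
            mine.prune(t -> t.startsWith("_")); // ...then mutate only the clone
            return mine;
        }
    }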
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c22020a4/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index e6fd1f6..23f6964 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -31,8 +31,6 @@ import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0;

 import java.io.IOException;
 import java.lang.ref.WeakReference;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -84,7 +82,6 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.regionserver.IndexHalfStoreFileReaderGenerator;
-import org.apache.hadoop.hbase.regionserver.LocalIndexSplitter;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -159,8 +156,8 @@ import org.apache.phoenix.schema.PMetaData;
 import org.apache.phoenix.schema.PMetaDataImpl;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.PSynchronizedMetaData;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.ReadOnlyTableException;
@@ -181,7 +178,6 @@ import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.ConfigUtil;
-import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.JDBCUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixContextExecutor;
@@ -288,7 +284,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implements ConnectionQueryServices {
     private PMetaData newEmptyMetaData() {
         long maxSizeBytes = props.getLong(QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB,
                 QueryServicesOptions.DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE);
-        return new PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, maxSizeBytes);
+        return new PSynchronizedMetaData(new PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, maxSizeBytes));
     }

     /**
@@ -554,7 +550,7 @@
     }

     @Override
-    public PMetaData addTable(PTable table, long resolvedTime) throws SQLException {
+    public void addTable(PTable table, long resolvedTime) throws SQLException {
         synchronized (latestMetaDataLock) {
             try {
                 throwConnectionClosedIfNullMetaData();
@@ -562,26 +558,25 @@
                 // If a client opens a connection at an earlier timestamp, this can happen
                 PTable existingTable = latestMetaData.getTableRef(new PTableKey(table.getTenantId(), table.getName().getString())).getTable();
                 if (existingTable.getTimeStamp() >= table.getTimeStamp()) {
-                    return latestMetaData;
+                    return;
                 }
             } catch (TableNotFoundException e) {}
-            latestMetaData = latestMetaData.addTable(table, resolvedTime);
+            latestMetaData.addTable(table, resolvedTime);
             latestMetaDataLock.notifyAll();
-            return latestMetaData;
         }
     }

-    public PMetaData updateResolvedTimestamp(PTable table, long resolvedTime) throws SQLException {
-        synchronized (latestMetaDataLock) {
+    @Override
+    public void updateResolvedTimestamp(PTable table, long resolvedTime) throws SQLException {
+        synchronized (latestMetaDataLock) {
             throwConnectionClosedIfNullMetaData();
-            latestMetaData = latestMetaData.updateResolvedTimestamp(table, resolvedTime);
+            latestMetaData.updateResolvedTimestamp(table, resolvedTime);
             latestMetaDataLock.notifyAll();
-            return latestMetaData;
         }
     }

     private static interface Mutator {
-        PMetaData mutate(PMetaData metaData) throws SQLException;
+        void mutate(PMetaData metaData) throws SQLException;
     }

     /**
@@ -604,7 +599,7 @@
              */
             if (table.getSequenceNumber() + 1 == tableSeqNum) {
                 // TODO: assert that timeStamp is bigger that table timeStamp?
-                metaData = mutator.mutate(metaData);
+                mutator.mutate(metaData);
                 break;
             } else if (table.getSequenceNumber() >= tableSeqNum) {
                 logger.warn("Attempt to cache older version of " + tableName + ": current= " + table.getSequenceNumber() + ", new=" + tableSeqNum);
@@ -619,7 +614,7 @@
                 logger.warn("Unable to update meta data repo within " + (DEFAULT_OUT_OF_ORDER_MUTATIONS_WAIT_TIME_MS/1000) + " seconds for " + tableName);
                 // There will never be a parentTableName here, as that would only
                 // be non null for an index an we never add/remove columns from an index.
-                metaData = metaData.removeTable(tenantId, tableName, null, HConstants.LATEST_TIMESTAMP);
+                metaData.removeTable(tenantId, tableName, null, HConstants.LATEST_TIMESTAMP);
                 break;
             }
             latestMetaDataLock.wait(waitTime);
@@ -637,46 +632,43 @@
     }

     @Override
-    public PMetaData addColumn(final PName tenantId, final String tableName, final List<PColumn> columns,
+    public void addColumn(final PName tenantId, final String tableName, final List<PColumn> columns,
             final long tableTimeStamp, final long tableSeqNum, final boolean isImmutableRows,
             final boolean isWalDisabled, final boolean isMultitenant, final boolean storeNulls,
             final boolean isTransactional, final long updateCacheFrequency, final boolean isNamespaceMapped,
             final long resolvedTime) throws SQLException {
-        return metaDataMutated(tenantId, tableName, tableSeqNum, new Mutator() {
+        metaDataMutated(tenantId, tableName, tableSeqNum, new Mutator() {
             @Override
-            public PMetaData mutate(PMetaData metaData) throws SQLException {
+            public void mutate(PMetaData metaData) throws SQLException {
                 try {
-                    return metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum,
+                    metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum,
                             isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional,
                             updateCacheFrequency, isNamespaceMapped, resolvedTime);
                 } catch (TableNotFoundException e) {
                     // The DROP TABLE may have been processed first, so just ignore.
-                    return metaData;
                 }
             }
         });
     }

     @Override
-    public PMetaData removeTable(PName tenantId, final String tableName, String parentTableName, long tableTimeStamp) throws SQLException {
+    public void removeTable(PName tenantId, final String tableName, String parentTableName, long tableTimeStamp) throws SQLException {
         synchronized (latestMetaDataLock) {
             throwConnectionClosedIfNullMetaData();
-            latestMetaData = latestMetaData.removeTable(tenantId, tableName, parentTableName, tableTimeStamp);
+            latestMetaData.removeTable(tenantId, tableName, parentTableName, tableTimeStamp);
             latestMetaDataLock.notifyAll();
-            return latestMetaData;
         }
     }

     @Override
-    public PMetaData removeColumn(final PName tenantId, final String tableName, final List<PColumn> columnsToRemove, final long tableTimeStamp, final long tableSeqNum, final long resolvedTime) throws SQLException {
-        return metaDataMutated(tenantId, tableName, tableSeqNum, new Mutator() {
+    public void removeColumn(final PName tenantId, final String tableName, final List<PColumn> columnsToRemove, final long tableTimeStamp, final long tableSeqNum, final long resolvedTime) throws SQLException {
+        metaDataMutated(tenantId, tableName, tableSeqNum, new Mutator() {
             @Override
-            public PMetaData mutate(PMetaData metaData) throws SQLException {
+            public void mutate(PMetaData metaData) throws SQLException {
                 try {
-                    return metaData.removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum, resolvedTime);
+                    metaData.removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum, resolvedTime);
                 } catch (TableNotFoundException e) {
                     // The DROP TABLE may have been processed first, so just ignore.
-                    return metaData;
                 }
             }
         });
@@ -687,10 +679,8 @@
     public PhoenixConnection connect(String url, Properties info) throws SQLException {
         checkClosed();
         PMetaData metadata = latestMetaData;
-        if (metadata == null) {
-            throwConnectionClosedException();
-        }
-
+        throwConnectionClosedIfNullMetaData();
+        metadata = metadata.clone();
         return new PhoenixConnection(this, url, info, metadata);
     }

@@ -1643,9 +1633,7 @@
                     PTable table;
                     try {
                         PMetaData metadata = latestMetaData;
-                        if (metadata == null) {
-                            throwConnectionClosedException();
-                        }
+                        throwConnectionClosedIfNullMetaData();
                         table = metadata.getTableRef(new PTableKey(tenantId, fullTableName)).getTable();
                         if (table.getTimeStamp() >= timestamp) { // Table in cache is newer than client timestamp which shouldn't be
                                                                  // the case
@@ -2164,14 +2152,11 @@
                         .build().buildException();
             }
         }
-
+
     private HashSet<String> existingColumnFamiliesForBaseTable(PName baseTableName) throws TableNotFoundException {
-        synchronized (latestMetaDataLock) {
-            throwConnectionClosedIfNullMetaData();
-            PTable table = latestMetaData.getTableRef(new PTableKey(null, baseTableName.getString())).getTable();
-            latestMetaDataLock.notifyAll();
-            return existingColumnFamilies(table);
-        }
+        throwConnectionClosedIfNullMetaData();
+        PTable table = latestMetaData.getTableRef(new PTableKey(null, baseTableName.getString())).getTable();
+        return existingColumnFamilies(table);
     }

     private HashSet<String> existingColumnFamilies(PTable table) {
@@ -3362,7 +3347,7 @@
     }

     @Override
-    public PMetaData addFunction(PFunction function) throws SQLException {
+    public void addFunction(PFunction function) throws SQLException {
         synchronized (latestMetaDataLock) {
             try {
                 throwConnectionClosedIfNullMetaData();
@@ -3370,23 +3355,21 @@
                 // If a client opens a connection at an earlier timestamp, this can happen
                 PFunction existingFunction = latestMetaData.getFunction(new PTableKey(function.getTenantId(), function.getFunctionName()));
                 if (existingFunction.getTimeStamp() >= function.getTimeStamp()) {
-                    return latestMetaData;
+                    return;
                 }
             } catch (FunctionNotFoundException e) {}
-            latestMetaData = latestMetaData.addFunction(function);
+            latestMetaData.addFunction(function);
             latestMetaDataLock.notifyAll();
-            return latestMetaData;
         }
     }

     @Override
-    public PMetaData removeFunction(PName tenantId, String function, long functionTimeStamp)
+    public void removeFunction(PName tenantId, String function, long functionTimeStamp)
             throws SQLException {
         synchronized (latestMetaDataLock) {
             throwConnectionClosedIfNullMetaData();
-            latestMetaData = latestMetaData.removeFunction(tenantId, function, functionTimeStamp);
+            latestMetaData.removeFunction(tenantId, function, functionTimeStamp);
             latestMetaDataLock.notifyAll();
-            return latestMetaData;
         }
     }

@@ -3642,13 +3625,13 @@
     }

     @Override
-    public PMetaData addSchema(PSchema schema) throws SQLException {
-        return latestMetaData = latestMetaData.addSchema(schema);
+    public void addSchema(PSchema schema) throws SQLException {
+        latestMetaData.addSchema(schema);
     }

     @Override
-    public PMetaData removeSchema(PSchema schema, long schemaTimeStamp) {
-        return latestMetaData = latestMetaData.removeSchema(schema, schemaTimeStamp);
+    public void removeSchema(PSchema schema, long schemaTimeStamp) {
+        latestMetaData.removeSchema(schema, schemaTimeStamp);
     }

     @Override
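Because latestMetaData is now a single shared instance that is mutated in place, newEmptyMetaData() wraps it in the new PSynchronizedMetaData (added by this commit; the full 249-line file is not shown in this mail). A minimal sketch of the delegation shape, noting that the real class may use finer-grained locking than a single monitor:

    // Sketch of a synchronized delegate over a mutable cache interface;
    // the Cache interface here is illustrative, not the Phoenix PMetaData API.
    interface Cache {
        void addTable(String name) throws java.sql.SQLException;
    }

    final class SynchronizedCache implements Cache {
        private final Cache delegate;

        SynchronizedCache(Cache delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized void addTable(String name) throws java.sql.SQLException {
            delegate.addTable(name); // every mutator funnels through one lock
        }
    }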
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c22020a4/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index f373de2..25aca74 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -24,8 +24,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
@@ -86,14 +86,13 @@ import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.SequenceUtil;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TransactionSystemClient;
 import org.apache.tephra.inmemory.InMemoryTxSystemClient;

+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+

 /**
  *
@@ -172,41 +171,41 @@
     }

     @Override
-    public PMetaData addTable(PTable table, long resolvedTime) throws SQLException {
-        return metaData = metaData.addTable(table, resolvedTime);
+    public void addTable(PTable table, long resolvedTime) throws SQLException {
+        metaData.addTable(table, resolvedTime);
     }

     @Override
-    public PMetaData updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException {
-        return metaData = metaData.updateResolvedTimestamp(table, resolvedTimestamp);
+    public void updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException {
+        metaData.updateResolvedTimestamp(table, resolvedTimestamp);
     }

     @Override
-    public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp,
+    public void addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp,
             long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant,
             boolean storeNulls, boolean isTransactional, long updateCacheFrequency, boolean isNamespaceMapped,
             long resolvedTime) throws SQLException {
-        return metaData = metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows,
+        metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows,
                 isWalDisabled, isMultitenant, storeNulls, isTransactional, updateCacheFrequency, isNamespaceMapped,
                 resolvedTime);
     }

     @Override
-    public PMetaData removeTable(PName tenantId, String tableName, String parentTableName, long tableTimeStamp)
+    public void removeTable(PName tenantId, String tableName, String parentTableName, long tableTimeStamp)
             throws SQLException {
-        return metaData = metaData.removeTable(tenantId, tableName, parentTableName, tableTimeStamp);
+        metaData.removeTable(tenantId, tableName, parentTableName, tableTimeStamp);
     }

     @Override
-    public PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp,
+    public void removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp,
             long tableSeqNum, long resolvedTime) throws SQLException {
-        return metaData = metaData.removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum, resolvedTime);
+        metaData.removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum, resolvedTime);
     }

     @Override
     public PhoenixConnection connect(String url, Properties info) throws SQLException {
-        return new PhoenixConnection(this, url, info, metaData);
+        return new PhoenixConnection(this, url, info, metaData.clone());
     }

@@ -549,14 +548,14 @@
     }

     @Override
-    public PMetaData addFunction(PFunction function) throws SQLException {
-        return metaData = this.metaData.addFunction(function);
+    public void addFunction(PFunction function) throws SQLException {
+        this.metaData.addFunction(function);
     }

     @Override
-    public PMetaData removeFunction(PName tenantId, String function, long functionTimeStamp)
+    public void removeFunction(PName tenantId, String function, long functionTimeStamp)
             throws SQLException {
-        return metaData = this.metaData.removeFunction(tenantId, function, functionTimeStamp);
+        this.metaData.removeFunction(tenantId, function, functionTimeStamp);
     }

@@ -615,8 +614,8 @@
     }

     @Override
-    public PMetaData addSchema(PSchema schema) throws SQLException {
-        return metaData = this.metaData.addSchema(schema);
+    public void addSchema(PSchema schema) throws SQLException {
+        this.metaData.addSchema(schema);
     }

     @Override
@@ -629,8 +628,8 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
     }

     @Override
-    public PMetaData removeSchema(PSchema schema, long schemaTimeStamp) {
-        return metaData = metaData.removeSchema(schema, schemaTimeStamp);
+    public void removeSchema(PSchema schema, long schemaTimeStamp) {
+        metaData.removeSchema(schema, schemaTimeStamp);
     }

     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c22020a4/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 953c73d..99ad59c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -39,7 +39,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PSchema;
 import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PMetaData;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableType;
@@ -47,7 +46,6 @@ import org.apache.phoenix.schema.Sequence;
 import org.apache.phoenix.schema.SequenceAllocation;
 import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.stats.PTableStats;
-
 import org.apache.tephra.TransactionSystemClient;

@@ -78,35 +76,35 @@
     }

     @Override
-    public PMetaData addTable(PTable table, long resolvedTime) throws SQLException {
-        return getDelegate().addTable(table, resolvedTime);
+    public void addTable(PTable table, long resolvedTime) throws SQLException {
+        getDelegate().addTable(table, resolvedTime);
     }

     @Override
-    public PMetaData updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException {
-        return getDelegate().updateResolvedTimestamp(table, resolvedTimestamp);
+    public void updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException {
+        getDelegate().updateResolvedTimestamp(table, resolvedTimestamp);
     }

     @Override
-    public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp,
+    public void addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp,
             long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant,
             boolean storeNulls, boolean isTransactional, long updateCacheFrequency, boolean isNamespaceMapped,
             long resolvedTime) throws SQLException {
-        return getDelegate().addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows,
+        getDelegate().addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows,
                 isWalDisabled, isMultitenant, storeNulls, isTransactional, updateCacheFrequency, isNamespaceMapped, resolvedTime);
     }

     @Override
-    public PMetaData removeTable(PName tenantId, String tableName, String parentTableName, long tableTimeStamp)
+    public void removeTable(PName tenantId, String tableName, String parentTableName, long tableTimeStamp)
             throws SQLException {
-        return getDelegate().removeTable(tenantId, tableName, parentTableName, tableTimeStamp);
+        getDelegate().removeTable(tenantId, tableName, parentTableName, tableTimeStamp);
     }

     @Override
-    public PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp,
+    public void removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp,
             long tableSeqNum, long resolvedTime) throws SQLException {
-        return getDelegate().removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum, resolvedTime);
+        getDelegate().removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum, resolvedTime);
     }

     @Override
@@ -279,14 +277,14 @@
     }

     @Override
-    public PMetaData addFunction(PFunction function) throws SQLException {
-        return getDelegate().addFunction(function);
+    public void addFunction(PFunction function) throws SQLException {
+        getDelegate().addFunction(function);
     }

     @Override
-    public PMetaData removeFunction(PName tenantId, String function, long functionTimeStamp)
+    public void removeFunction(PName tenantId, String function, long functionTimeStamp)
             throws SQLException {
-        return getDelegate().removeFunction(tenantId, function, functionTimeStamp);
+        getDelegate().removeFunction(tenantId, function, functionTimeStamp);
     }

     @Override
@@ -319,8 +317,8 @@
     }

     @Override
-    public PMetaData addSchema(PSchema schema) throws SQLException {
-        return getDelegate().addSchema(schema);
+    public void addSchema(PSchema schema) throws SQLException {
+        getDelegate().addSchema(schema);
     }

     @Override
@@ -334,8 +332,8 @@
     }

     @Override
-    public PMetaData removeSchema(PSchema schema, long schemaTimeStamp) {
-        return getDelegate().removeSchema(schema, schemaTimeStamp);
+    public void removeSchema(PSchema schema, long schemaTimeStamp) {
+        getDelegate().removeSchema(schema, schemaTimeStamp);
     }

     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c22020a4/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java b/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
index f532dc8..0b6a644 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/MetaDataMutated.java
@@ -23,7 +23,6 @@ import java.util.List;

 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PSchema;
 import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PMetaData;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;

@@ -36,13 +35,13 @@ import org.apache.phoenix.schema.PTable;
  * @since 0.1
  */
 public interface MetaDataMutated {
-    PMetaData addTable(PTable table, long resolvedTime) throws SQLException;
-    PMetaData updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException;
-    PMetaData removeTable(PName tenantId, String tableName, String parentTableName, long tableTimeStamp) throws SQLException;
-    PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional, long updateCacheFrequency, boolean isNamespaceMapped, long resolvedTime) throws SQLException;
-    PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum, long resolvedTime) throws SQLException;
-    PMetaData addFunction(PFunction function) throws SQLException;
-    PMetaData removeFunction(PName tenantId, String function, long functionTimeStamp) throws SQLException;
-    PMetaData addSchema(PSchema schema) throws SQLException;
-    PMetaData removeSchema(PSchema schema, long schemaTimeStamp);
+    void addTable(PTable table, long resolvedTime) throws SQLException;
+    void updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException;
+    void removeTable(PName tenantId, String tableName, String parentTableName, long tableTimeStamp) throws SQLException;
+    void addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, boolean isTransactional, long updateCacheFrequency, boolean isNamespaceMapped, long resolvedTime) throws SQLException;
+    void removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum, long resolvedTime) throws SQLException;
+    void addFunction(PFunction function) throws SQLException;
+    void removeFunction(PName tenantId, String function, long functionTimeStamp) throws SQLException;
+    void addSchema(PSchema schema) throws SQLException;
+    void removeSchema(PSchema schema, long schemaTimeStamp);
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c22020a4/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java
index 6a710eb..cfeb13f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaData.java
@@ -30,9 +30,9 @@ public interface PMetaData extends MetaDataMutated, Iterable<PTable>, Cloneable {
     public int size();
     public PMetaData clone();
     public PTableRef getTableRef(PTableKey key) throws TableNotFoundException;
-    public PMetaData pruneTables(Pruner pruner);
+    public void pruneTables(Pruner pruner);
     public PFunction getFunction(PTableKey key) throws FunctionNotFoundException;
-    public PMetaData pruneFunctions(Pruner pruner);
+    public void pruneFunctions(Pruner pruner);
     public long getAge(PTableRef ref);
     public PSchema getSchema(PTableKey key) throws SchemaNotFoundException;
 }
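PMetaData keeps clone() but its mutators go void, which inverts who pays for copying: in PMetaDataImpl below, the private constructor stops cloning its argument and clone() itself makes the copy, so mutators write straight into the cache and a full copy happens only when a new connection is handed its snapshot. A sketch of the inverted responsibility, with names assumed for illustration:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch: copy once in clone(), never in the mutators.
    final class MetaSketch implements Cloneable {
        private final Map<String, Long> cache = new HashMap<>();

        void addTable(String key, long resolvedTime) {
            cache.put(key, resolvedTime); // in place: O(1), not O(cache size)
        }

        @Override
        public MetaSketch clone() {
            MetaSketch copy = new MetaSketch();
            copy.cache.putAll(this.cache); // the only full copy
            return copy;
        }
    }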
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c22020a4/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
index 67a2714..5ffacca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java
@@ -28,7 +28,10 @@ import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PSchema;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TimeKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.MinMaxPriorityQueue;
@@ -43,7 +46,8 @@ import com.google.common.primitives.Longs;
  *
  */
 public class PMetaDataImpl implements PMetaData {
-    private static class PMetaDataCache implements Cloneable {
+    private static final Logger logger = LoggerFactory.getLogger(PMetaDataImpl.class);
+    static class PMetaDataCache implements Cloneable {
         private static final int MIN_REMOVAL_SIZE = 3;
         private static final Comparator<PTableRef> COMPARATOR = new Comparator<PTableRef>() {
             @Override
@@ -239,7 +243,12 @@ public class PMetaDataImpl implements PMetaData {
         }
     }
 
-    private final PMetaDataCache metaData;
+    private PMetaDataCache metaData;
+
+    @VisibleForTesting
+    public PMetaDataCache getMetaData() {
+        return metaData;
+    }
 
     public PMetaDataImpl(int initialCapacity, long maxByteSize) {
         this.metaData = new PMetaDataCache(initialCapacity, maxByteSize, TimeKeeper.SYSTEM);
     }
@@ -250,12 +259,12 @@ public class PMetaDataImpl implements PMetaData {
     }
 
     private PMetaDataImpl(PMetaDataCache metaData) {
-        this.metaData = metaData.clone();
+        this.metaData = metaData;
     }
 
     @Override
     public PMetaDataImpl clone() {
-        return new PMetaDataImpl(this.metaData);
+        return new PMetaDataImpl(new PMetaDataCache(this.metaData));
     }
 
     @Override
@@ -282,14 +291,12 @@ public class PMetaDataImpl implements PMetaData {
     }
 
     @Override
-    public PMetaData updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException {
-        PMetaDataCache clone = metaData.clone();
-        clone.putDuplicate(table.getKey(), table, resolvedTimestamp);
-        return new PMetaDataImpl(clone);
+    public void updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException {
+        metaData.putDuplicate(table.getKey(), table, resolvedTimestamp);
     }
 
     @Override
-    public PMetaData addTable(PTable table, long resolvedTime) throws SQLException {
+    public void addTable(PTable table, long resolvedTime) throws SQLException {
         int netGain = 0;
         PTableKey key = table.getKey();
         PTableRef oldTableRef = metaData.get(key);
@@ -323,28 +330,27 @@ public class PMetaDataImpl implements PMetaData {
             netGain += table.getEstimatedSize();
         }
         long overage = metaData.getCurrentSize() + netGain - metaData.getMaxSize();
-        PMetaDataCache newMetaData = overage <= 0 ? metaData.clone() : metaData.cloneMinusOverage(overage);
+        metaData = overage <= 0 ? metaData : metaData.cloneMinusOverage(overage);
         if (newParentTable != null) { // Upsert new index table into parent data table list
-            newMetaData.put(newParentTable.getKey(), newParentTable, parentResolvedTimestamp);
-            newMetaData.putDuplicate(table.getKey(), table, resolvedTime);
+            metaData.put(newParentTable.getKey(), newParentTable, parentResolvedTimestamp);
+            metaData.putDuplicate(table.getKey(), table, resolvedTime);
         } else {
-            newMetaData.put(table.getKey(), table, resolvedTime);
+            metaData.put(table.getKey(), table, resolvedTime);
         }
         for (PTable index : table.getIndexes()) {
-            newMetaData.putDuplicate(index.getKey(), index, resolvedTime);
+            metaData.putDuplicate(index.getKey(), index, resolvedTime);
         }
-        return new PMetaDataImpl(newMetaData);
     }
 
     @Override
-    public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columnsToAdd, long tableTimeStamp,
+    public void addColumn(PName tenantId, String tableName, List<PColumn> columnsToAdd, long tableTimeStamp,
             long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant,
             boolean storeNulls, boolean isTransactional, long updateCacheFrequency, boolean isNamespaceMapped,
             long resolvedTime) throws SQLException {
         PTableRef oldTableRef = metaData.get(new PTableKey(tenantId, tableName));
         if (oldTableRef == null) {
-            return this;
+            return;
         }
         List<PColumn> oldColumns = PTableImpl.getColumnsToClone(oldTableRef.getTable());
         List<PColumn> newColumns;
@@ -358,12 +364,11 @@ public class PMetaDataImpl implements PMetaData {
         PTable newTable = PTableImpl.makePTable(oldTableRef.getTable(), tableTimeStamp, tableSeqNum,
                 newColumns, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional,
                 updateCacheFrequency, isNamespaceMapped);
-        return addTable(newTable, resolvedTime);
+        addTable(newTable, resolvedTime);
     }
 
     @Override
-    public PMetaData removeTable(PName tenantId, String tableName, String parentTableName, long tableTimeStamp) throws SQLException {
-        PMetaDataCache tables = null;
+    public void removeTable(PName tenantId, String tableName, String parentTableName, long tableTimeStamp) throws SQLException {
         PTableRef parentTableRef = null;
         PTableKey key = new PTableKey(tenantId, tableName);
         if (metaData.get(key) == null) {
@@ -371,16 +376,15 @@
                 parentTableRef = metaData.get(new PTableKey(tenantId, parentTableName));
             }
             if (parentTableRef == null) {
-                return this;
+                return;
             }
         } else {
-            tables = metaData.clone();
-            PTable table = tables.remove(key);
+            PTable table = metaData.remove(key);
             for (PTable index : table.getIndexes()) {
-                tables.remove(index.getKey());
+                metaData.remove(index.getKey());
             }
             if (table.getParentName() != null) {
-                parentTableRef = tables.get(new PTableKey(tenantId, table.getParentName().getString()));
+                parentTableRef = metaData.get(new PTableKey(tenantId, table.getParentName().getString()));
             }
         }
         // also remove its reference from parent table
@@ -397,26 +401,22 @@
                             parentTableRef.getTable(),
                             tableTimeStamp == HConstants.LATEST_TIMESTAMP ? parentTableRef.getTable().getTimeStamp() : tableTimeStamp,
                             newIndexes);
-                    if (tables == null) {
-                        tables = metaData.clone();
-                    }
-                    tables.put(parentTable.getKey(), parentTable, parentTableRef.getResolvedTimeStamp());
+                    metaData.put(parentTable.getKey(), parentTable, parentTableRef.getResolvedTimeStamp());
                     break;
                 }
             }
         }
-        return tables == null ? this : new PMetaDataImpl(tables);
     }
 
     @Override
-    public PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum, long resolvedTime) throws SQLException {
+    public void removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum, long resolvedTime) throws SQLException {
         PTableRef tableRef = metaData.get(new PTableKey(tenantId, tableName));
         if (tableRef == null) {
-            return this;
+            return;
         }
         PTable table = tableRef.getTable();
-        PMetaDataCache tables = metaData.clone();
+        PMetaDataCache tables = metaData;
         for (PColumn columnToRemove : columnsToRemove) {
             PColumn column;
             String familyName = columnToRemove.getFamilyName().getString();
@@ -445,25 +445,21 @@
             table = PTableImpl.makePTable(table, tableTimeStamp, tableSeqNum, columns);
         }
         tables.put(table.getKey(), table, resolvedTime);
-        return new PMetaDataImpl(tables);
     }
 
     @Override
-    public PMetaData pruneTables(Pruner pruner) {
+    public void pruneTables(Pruner pruner) {
         List<PTableKey> keysToPrune = Lists.newArrayListWithExpectedSize(this.size());
         for (PTable table : this) {
             if (pruner.prune(table)) {
                 keysToPrune.add(table.getKey());
             }
         }
-        if (keysToPrune.isEmpty()) {
-            return this;
-        }
-        PMetaDataCache tables = metaData.clone();
-        for (PTableKey key : keysToPrune) {
-            tables.remove(key);
+        if (!keysToPrune.isEmpty()) {
+            for (PTableKey key : keysToPrune) {
+                metaData.remove(key);
+            }
         }
-        return new PMetaDataImpl(tables);
     }
 
     @Override
@@ -472,35 +468,29 @@
     }
 
     @Override
-    public PMetaData addFunction(PFunction function) throws SQLException {
+    public void addFunction(PFunction function) throws SQLException {
         this.metaData.functions.put(function.getKey(), function);
-        return this;
     }
 
     @Override
-    public PMetaData removeFunction(PName tenantId, String function, long functionTimeStamp)
+    public void removeFunction(PName tenantId, String function, long functionTimeStamp)
             throws SQLException {
         this.metaData.functions.remove(new PTableKey(tenantId, function));
-        return this;
     }
 
     @Override
-    public PMetaData pruneFunctions(Pruner pruner) {
+    public void pruneFunctions(Pruner pruner) {
         List<PTableKey> keysToPrune = Lists.newArrayListWithExpectedSize(this.size());
         for (PFunction function : this.metaData.functions.values()) {
             if (pruner.prune(function)) {
                 keysToPrune.add(function.getKey());
             }
         }
-        if (keysToPrune.isEmpty()) {
-            return this;
-        }
-        PMetaDataCache clone = metaData.clone();
-        for (PTableKey key : keysToPrune) {
-            clone.functions.remove(key);
+        if (!keysToPrune.isEmpty()) {
+            for (PTableKey key : keysToPrune) {
+                metaData.functions.remove(key);
+            }
         }
-        return new PMetaDataImpl(clone);
-    }
 
     @Override
@@ -509,9 +499,8 @@
     }
 
     @Override
-    public PMetaData addSchema(PSchema schema) throws SQLException {
+    public void addSchema(PSchema schema) throws SQLException {
         this.metaData.schemas.put(schema.getSchemaKey(), schema);
-        return this;
     }
 
     @Override
@@ -522,8 +511,8 @@
-    public PMetaData removeSchema(PSchema schema, long schemaTimeStamp) {
+    public void removeSchema(PSchema schema, long schemaTimeStamp) {
         this.metaData.schemas.remove(SchemaUtil.getSchemaKey(schema.getSchemaName()));
-        return this;
     }
+
 }
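
Two details in the PMetaDataImpl diff are worth noting. First, the deep-copy point is inverted: the private constructor used to clone the cache it was handed, so every mutator paid for a full copy, whereas now the constructor adopts the cache by reference and only the public clone() pays for a copy, via what appears to be a new PMetaDataCache copy constructor (its definition falls outside the hunks shown here). Second, removeColumn's local "tables" now simply aliases metaData instead of cloning it, and addTable only swaps in a new cache via cloneMinusOverage when the size limit is exceeded, so eviction keeps its old cost while the common path becomes copy-free. A toy model of the inverted clone point (illustrative names, not Phoenix code):

    import java.util.ArrayList;
    import java.util.List;

    // Toy model: mutators touch the live state; only clone() deep-copies.
    class SnapshotOnClone implements Cloneable {
        private final List<String> entries;

        SnapshotOnClone() {
            this.entries = new ArrayList<>();
        }

        // Copy constructor: the single place a deep copy happens.
        SnapshotOnClone(SnapshotOnClone other) {
            this.entries = new ArrayList<>(other.entries);
        }

        // In-place mutation: no copying on the hot path.
        void add(String entry) {
            entries.add(entry);
        }

        // Explicit snapshot for callers that need an isolated view.
        @Override
        public SnapshotOnClone clone() {
            return new SnapshotOnClone(this);
        }
    }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c22020a4/phoenix-core/src/main/java/org/apache/phoenix/schema/PSynchronizedMetaData.java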
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PSynchronizedMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PSynchronizedMetaData.java
new file mode 100644
index 0000000..af4bc60
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PSynchronizedMetaData.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import org.apache.phoenix.parse.PFunction;
+import org.apache.phoenix.parse.PSchema;
+
+public class PSynchronizedMetaData implements PMetaData {
+
+    @GuardedBy("readWriteLock")
+    private PMetaData delegate;
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+    public PSynchronizedMetaData(PMetaData metadata) {
+        this.delegate = metadata;
+    }
+
+    @Override
+    public Iterator<PTable> iterator() {
+        readWriteLock.readLock().lock();
+        try {
+            return delegate.iterator();
+        }
+        finally {
+            readWriteLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public int size() {
+        readWriteLock.readLock().lock();
+        try {
+            return delegate.size();
+        }
+        finally {
+            readWriteLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public PMetaData clone() {
+        readWriteLock.readLock().lock();
+        try {
+            return delegate.clone();
+        }
+        finally {
+            readWriteLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void addTable(PTable table, long resolvedTime) throws SQLException {
+        readWriteLock.writeLock().lock();
+        try {
+            delegate.addTable(table, resolvedTime);
+        }
+        finally {
+            readWriteLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public PTableRef getTableRef(PTableKey key) throws TableNotFoundException {
+        readWriteLock.readLock().lock();
+        try {
+            return delegate.getTableRef(key);
+        }
+        finally {
+            readWriteLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void updateResolvedTimestamp(PTable table, long resolvedTimestamp) throws SQLException {
+        readWriteLock.writeLock().lock();
+        try {
+            delegate.updateResolvedTimestamp(table, resolvedTimestamp);
+        }
+        finally {
+            readWriteLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public void pruneTables(Pruner pruner) {
+        readWriteLock.writeLock().lock();
+        try {
+            delegate.pruneTables(pruner);
+        }
+        finally {
+            readWriteLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public PFunction getFunction(PTableKey key) throws FunctionNotFoundException {
+        readWriteLock.readLock().lock();
+        try {
+            return delegate.getFunction(key);
+        }
+        finally {
+            readWriteLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void removeTable(PName tenantId, String tableName, String parentTableName,
+            long tableTimeStamp) throws SQLException {
+        readWriteLock.writeLock().lock();
+        try {
+            delegate.removeTable(tenantId, tableName, parentTableName, tableTimeStamp);
+        }
+        finally {
+            readWriteLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public void pruneFunctions(Pruner pruner) {
+        readWriteLock.writeLock().lock();
+        try {
+            delegate.pruneFunctions(pruner);
+        }
+        finally {
+            readWriteLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public long getAge(PTableRef ref) {
+        readWriteLock.readLock().lock();
+        try {
+            return delegate.getAge(ref);
+        }
+        finally {
+            readWriteLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void addColumn(PName tenantId, String tableName, List<PColumn> columns,
+            long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled,
+            boolean isMultitenant, boolean storeNulls, boolean isTransactional,
+            long updateCacheFrequency, boolean isNamespaceMapped, long resolvedTime)
+            throws SQLException {
+        readWriteLock.writeLock().lock();
+        try {
+            delegate.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum,
+                isImmutableRows, isWalDisabled, isMultitenant, storeNulls, isTransactional,
+                updateCacheFrequency, isNamespaceMapped, resolvedTime);
+        }
+        finally {
+            readWriteLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public PSchema getSchema(PTableKey key) throws SchemaNotFoundException {
+        readWriteLock.readLock().lock();
+        try {
+            return delegate.getSchema(key);
+        }
+        finally {
+            readWriteLock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove,
+            long tableTimeStamp, long tableSeqNum, long resolvedTime) throws SQLException {
+        readWriteLock.writeLock().lock();
+        try {
+            delegate.removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp,
+                tableSeqNum, resolvedTime);
+        }
+        finally {
+            readWriteLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public void addFunction(PFunction function) throws SQLException {
+        readWriteLock.writeLock().lock();
+        try {
+            delegate.addFunction(function);
+        }
+        finally {
+            readWriteLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public void removeFunction(PName tenantId, String function, long functionTimeStamp)
+            throws SQLException {
+        readWriteLock.writeLock().lock();
+        try {
+            delegate.removeFunction(tenantId, function, functionTimeStamp);
+        }
+        finally {
+            readWriteLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public void addSchema(PSchema schema) throws SQLException {
+        readWriteLock.writeLock().lock();
+        try {
+            delegate.addSchema(schema);
+        }
+        finally {
+            readWriteLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public void removeSchema(PSchema schema, long schemaTimeStamp) {
+        readWriteLock.writeLock().lock();
+        try {
+            delegate.removeSchema(schema, schemaTimeStamp);
+        }
+        finally {
+            readWriteLock.writeLock().unlock();
+        }
+    }
+
+}
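
PSynchronizedMetaData completes the picture: since PMetaDataImpl now mutates shared state, this wrapper serializes access with a ReentrantReadWriteLock, taking the shared read lock for lookups and the exclusive write lock for every MetaDataMutated operation, always unlocking in a finally block. One caveat visible in the code above: iterator() releases the read lock before iteration actually happens, so the returned iterator itself is not protected by the lock. The miniature below (toy code, not part of the commit) distills the same guard pattern around a plain map:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Miniature of the PSynchronizedMetaData guard pattern (illustrative only):
    // shared lock for readers, exclusive lock for writers, unlock in finally.
    public class SynchronizedCache {
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
        private final Map<String, String> delegate = new HashMap<>();

        public String get(String key) {
            lock.readLock().lock();   // many readers may proceed concurrently
            try {
                return delegate.get(key);
            } finally {
                lock.readLock().unlock();
            }
        }

        public void put(String key, String value) {
            lock.writeLock().lock();  // writers are exclusive
            try {
                delegate.put(key, value);
            } finally {
                lock.writeLock().unlock();
            }
        }
    }

Presumably the connection-level services construct the cache as new PSynchronizedMetaData(new PMetaDataImpl(...)) so existing PMetaData callers pick up the locking transparently; that wiring lives in the ConnectionQueryServicesImpl and ConnectionlessQueryServicesImpl changes listed in the commit summary.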