From commits-return-22940-archive-asf-public=cust-asf.ponee.io@phoenix.apache.org Wed Sep 26 20:20:35 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id D94E9180629 for ; Wed, 26 Sep 2018 20:20:34 +0200 (CEST) Received: (qmail 90511 invoked by uid 500); 26 Sep 2018 18:20:34 -0000 Mailing-List: contact commits-help@phoenix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@phoenix.apache.org Delivered-To: mailing list commits@phoenix.apache.org Received: (qmail 90502 invoked by uid 99); 26 Sep 2018 18:20:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 26 Sep 2018 18:20:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D5ED3DFEB8; Wed, 26 Sep 2018 18:20:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: larsh@apache.org To: commits@phoenix.apache.org Message-Id: <143851e4481948328f9860337bd1efb9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: phoenix git commit: PHOENIX-4849 Phoenix may generate incorrectly replace TableResultIterators after HBase region splits. Date: Wed, 26 Sep 2018 18:20:33 +0000 (UTC) Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.2 1abca93eb -> bd5aa2d9c PHOENIX-4849 Phoenix may generate incorrectly replace TableResultIterators after HBase region splits. Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/bd5aa2d9 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/bd5aa2d9 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/bd5aa2d9 Branch: refs/heads/4.x-HBase-1.2 Commit: bd5aa2d9c40c099bbe37eac7f1a38b96fd843cfc Parents: 1abca93 Author: Lars Hofhansl Authored: Wed Sep 26 11:20:25 2018 -0700 Committer: Lars Hofhansl Committed: Wed Sep 26 11:20:25 2018 -0700 ---------------------------------------------------------------------- .../end2end/UpsertSelectAutoCommitIT.java | 22 ++++++--- .../apache/phoenix/compile/UpsertCompiler.java | 17 +++++++ .../phoenix/coprocessor/ScanRegionObserver.java | 9 +++- .../phoenix/iterate/TableResultIterator.java | 48 +++++++------------- .../java/org/apache/phoenix/util/ScanUtil.java | 8 ++++ 5 files changed, 63 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/bd5aa2d9/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java index 3966f15..6210852 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java @@ -34,10 +34,13 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Properties; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.TestUtil; import org.junit.Test; @@ -160,19 +163,24 @@ public class UpsertSelectAutoCommitIT extends ParallelStatsDisabledIT { props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3)); Connection conn = DriverManager.getConnection(getUrl(), props); conn.setAutoCommit(true); - conn.createStatement().execute("CREATE SEQUENCE keys"); + conn.createStatement().execute("CREATE SEQUENCE keys CACHE 1000"); String tableName = generateUniqueName(); - conn.createStatement().execute( - "CREATE TABLE " + tableName + " (pk INTEGER PRIMARY KEY, val INTEGER)"); + conn.createStatement().execute("CREATE TABLE " + tableName + + " (pk INTEGER PRIMARY KEY, val INTEGER) UPDATE_CACHE_FREQUENCY=3600000"); conn.createStatement().execute( "UPSERT INTO " + tableName + " VALUES (NEXT VALUE FOR keys,1)"); - for (int i=0; i<6; i++) { - Statement stmt = conn.createStatement(); - int upsertCount = stmt.executeUpdate( - "UPSERT INTO " + tableName + " SELECT NEXT VALUE FOR keys, val FROM " + tableName); + PreparedStatement stmt = + conn.prepareStatement("UPSERT INTO " + tableName + + " SELECT NEXT VALUE FOR keys, val FROM " + tableName); + HBaseAdmin admin = + driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin(); + for (int i=0; i<12; i++) { + admin.split(TableName.valueOf(tableName)); + int upsertCount = stmt.executeUpdate(); assertEquals((int)Math.pow(2, i), upsertCount); } + admin.close(); conn.close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/bd5aa2d9/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 6d81f8b..4f08846 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -76,12 +76,14 @@ import org.apache.phoenix.parse.SelectStatement; import org.apache.phoenix.parse.SequenceValueParseNode; import org.apache.phoenix.parse.UpsertStatement; import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.ConstraintViolationException; import org.apache.phoenix.schema.DelegateColumn; import org.apache.phoenix.schema.IllegalDataException; +import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnImpl; import org.apache.phoenix.schema.PName; @@ -557,6 +559,21 @@ public class UpsertCompiler { // Use optimizer to choose the best plan QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement), true, false, null); queryPlanToBe = compiler.compile(); + + if (sameTable) { + // in the UPSERT INTO X ... SELECT FROM X case enforce the source tableRef's TS + // as max TS, so that the query can safely restarted and still work of a snapshot + // (so it won't see its own data in case of concurrent splits) + // see PHOENIX-4849 + long serverTime = selectResolver.getTables().get(0).getCurrentTime(); + if (serverTime == QueryConstants.UNSET_TIMESTAMP) { + // if this is the first time this table is resolved the ref's current time might not be defined, yet + // in that case force an RPC to get the server time + serverTime = new MetaDataClient(connection).getCurrentTime(schemaName, tableName); + } + Scan scan = queryPlanToBe.getContext().getScan(); + ScanUtil.setTimeRange(scan, scan.getTimeRange().getMin(), serverTime); + } // This is post-fix: if the tableRef is a projected table, this means there are post-processing // steps and parallelIteratorFactory did not take effect. if (queryPlanToBe.getTableRef().getTable().getType() == PTableType.PROJECTED || queryPlanToBe.getTableRef().getTable().getType() == PTableType.SUBQUERY) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/bd5aa2d9/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java index 2d9cd4f..c2dfc4c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java @@ -32,6 +32,7 @@ import org.apache.phoenix.expression.OrderByExpression; import org.apache.phoenix.iterate.NonAggregateRegionScannerFactory; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.EncodedColumnsUtil; +import org.apache.phoenix.util.ScanUtil; /** * @@ -75,8 +76,12 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { } @Override - protected boolean isRegionObserverFor(Scan scan) { - return scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null; + protected boolean skipRegionBoundaryCheck(Scan scan) { + return super.skipRegionBoundaryCheck(scan) || ScanUtil.isSimpleScan(scan); } + @Override + protected boolean isRegionObserverFor(Scan scan) { + return ScanUtil.isNonAggregateScan(scan); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/bd5aa2d9/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index f1d1663..e6b94fb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -170,7 +170,7 @@ public class TableResultIterator implements ResultIterator { } catch (SQLException e) { try { throw ServerUtil.parseServerException(e); - } catch(StaleRegionBoundaryCacheException | HashJoinCacheNotFoundException e1) { + } catch(HashJoinCacheNotFoundException e1) { if(ScanUtil.isNonAggregateScan(scan) && plan.getContext().getAggregationManager().isEmpty()) { // For non aggregate queries if we get stale region boundary exception we can // continue scanning from the next value of lasted fetched result. @@ -188,40 +188,24 @@ public class TableResultIterator implements ResultIterator { } } plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getTableName()); - if (e1 instanceof HashJoinCacheNotFoundException) { - logger.debug( - "Retrying when Hash Join cache is not found on the server ,by sending the cache again"); - if (retry <= 0) { + logger.debug( + "Retrying when Hash Join cache is not found on the server ,by sending the cache again"); + if (retry <= 0) { + throw e1; + } + Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId(); + retry--; + try { + ServerCache cache = caches == null ? null : + caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId))); + if (!hashCacheClient.addHashCacheToServer(newScan.getStartRow(), + cache, plan.getTableRef().getTable())) { throw e1; } - Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId(); - retry--; - try { - ServerCache cache = caches == null ? null : - caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId))); - if (!hashCacheClient.addHashCacheToServer(newScan.getStartRow(), - cache, plan.getTableRef().getTable())) { - throw e1; - } - this.scanIterator = ((BaseQueryPlan) plan).iterator(caches, scanGrouper, newScan); + this.scanIterator = ((BaseQueryPlan) plan).iterator(caches, scanGrouper, newScan); - } catch (Exception ex) { - throw ServerUtil.parseServerException(ex); - } - } else { - try { - if(plan.getContext().isClientSideUpsertSelect()) { - if(ScanUtil.isLocalIndex(newScan)) { - throw e; - } - this.scanIterator = - new ScanningResultIterator(htable.getScanner(newScan), newScan, scanMetricsHolder); - } else { - this.scanIterator = plan.iterator(scanGrouper, newScan); - } - } catch (IOException ex) { - throw ServerUtil.parseServerException(ex); - } + } catch (Exception ex) { + throw ServerUtil.parseServerException(ex); } lastTuple = scanIterator.next(); } else { http://git-wip-us.apache.org/repos/asf/phoenix/blob/bd5aa2d9/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java index 62ecebd..2ac08e8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -119,6 +119,14 @@ public class ScanUtil { return scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null; } + // Designates a "simple scan", i.e. a scan that does not need to be scoped + // to a single region. + public static boolean isSimpleScan(Scan scan) { + return ScanUtil.isNonAggregateScan(scan) && + scan.getAttribute(BaseScannerRegionObserver.TOPN) == null && + scan.getAttribute(BaseScannerRegionObserver.SCAN_OFFSET) == null; + } + // Use getTenantId and pass in column name to match against // in as PSchema attribute. If column name matches in // KeyExpressions, set on scan as attribute