From: jamestaylor@apache.org
To: commits@phoenix.apache.org
Date: Thu, 13 Nov 2014 05:55:28 -0000
Message-Id: <9888f54214dc4516a19db176163a65bc@git.apache.org>
In-Reply-To: <45399660b7f84374a3ffa875c61316b3@git.apache.org>
References: <45399660b7f84374a3ffa875c61316b3@git.apache.org>
Subject: [3/9] phoenix git commit: PHOENIX-1432 Run limit query that has only leading PK column filter serially

PHOENIX-1432 Run limit query that has only leading PK column filter serially

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1c1f18f2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1c1f18f2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1c1f18f2

Branch: refs/heads/master
Commit: 1c1f18f28eee8d0603be39813ca1aad433f0dd5a
Parents: b0c1871
Author: James Taylor
Authored: Wed Nov 12 10:02:20 2014 -0800
Committer: James Taylor
Committed: Wed Nov 12 21:50:04 2014 -0800

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/HashJoinIT.java   |   6 +-
 .../org/apache/phoenix/end2end/KeyOnlyIT.java    |  49 +-
 .../phoenix/end2end/StatsCollectorIT.java        |  16 +-
 .../MutatingParallelIteratorFactory.java         |   2 +-
 .../apache/phoenix/compile/QueryCompiler.java    |   2 +-
 .../coprocessor/MetaDataEndpointImpl.java        |   4 +-
 .../apache/phoenix/execute/AggregatePlan.java    |   2 +-
 .../apache/phoenix/execute/BaseQueryPlan.java    |   2 +-
 .../org/apache/phoenix/execute/ScanPlan.java     |  47 +-
 .../phoenix/iterate/BaseResultIterators.java     | 622 +++++++++++++++++++
 .../phoenix/iterate/ChunkedResultIterator.java   |  10 +-
 .../phoenix/iterate/ConcatResultIterator.java    |  37 +-
 .../phoenix/iterate/DelegateResultIterator.java  |   4 +
 .../iterate/LimitingPeekingResultIterator.java   |  47 ++
 .../iterate/ParallelIteratorFactory.java         |  27 +
 .../phoenix/iterate/ParallelIterators.java       | 580 +----------------
 .../apache/phoenix/iterate/ResultIterators.java  |   7 +-
 .../apache/phoenix/iterate/SerialIterators.java  | 115 ++++
 .../phoenix/iterate/SpoolingResultIterator.java  |   1 -
 .../apache/phoenix/optimize/QueryOptimizer.java  |   2 +-
 .../phoenix/query/QueryServicesOptions.java      |   1 +
 .../schema/stats/StatisticsCollector.java        |  23 +-
 .../phoenix/schema/stats/StatisticsUtil.java     |  17 +
 .../phoenix/schema/stats/StatisticsWriter.java   |  30 +-
.../org/apache/phoenix/util/SchemaUtil.java | 24 + .../org/apache/phoenix/util/ServerUtil.java | 34 +- .../iterate/AggregateResultScannerTest.java | 11 + .../iterate/ConcatResultIteratorTest.java | 21 +- .../iterate/MergeSortResultIteratorTest.java | 12 +- .../org/apache/phoenix/query/QueryPlanTest.java | 51 ++ .../phoenix/pig/hadoop/PhoenixRecordReader.java | 2 +- 31 files changed, 1145 insertions(+), 663 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java index 5190c18..ec22360 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java @@ -394,7 +394,7 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT { * LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id * LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4 */ - "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" + + "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" + " SERVER FILTER BY PageFilter 4\n" + " SERVER 4 ROW LIMIT\n" + "CLIENT 4 ROW LIMIT\n" + @@ -744,7 +744,7 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT { * LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id * LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4 */ - "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" + + "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" + " SERVER FILTER BY PageFilter 4\n" + " SERVER 4 ROW LIMIT\n" + "CLIENT 4 ROW LIMIT\n" + @@ -1115,7 +1115,7 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT { * LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id * LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 4 */ - "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" + + "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" + " SERVER FILTER BY PageFilter 4\n" + " SERVER 4 ROW LIMIT\n" + "CLIENT 4 ROW LIMIT\n" + http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java index b7e3314..42ee5ad 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java @@ -37,7 +37,9 @@ import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -51,7 +53,8 @@ public class KeyOnlyIT extends BaseOwnClusterClientManagedTimeIT { public static void doSetup() throws Exception { Map props = Maps.newHashMapWithExpectedSize(3); // 
Must update config before starting server - props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20)); + props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(50)); + props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(100)); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } @@ -79,7 +82,7 @@ public class KeyOnlyIT extends BaseOwnClusterClientManagedTimeIT { assertEquals(4, rs.getInt(2)); assertFalse(rs.next()); List splits = getAllSplits(conn5, "KEYONLY"); - assertEquals(3, splits.size()); + assertEquals(2, splits.size()); conn5.close(); props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+60)); @@ -159,6 +162,31 @@ public class KeyOnlyIT extends BaseOwnClusterClientManagedTimeIT { conn5.close(); } + @Test + public void testQueryWithLimitAndStats() throws Exception { + long ts = nextTimestamp(); + ensureTableCreated(getUrl(),KEYONLY_NAME,null, ts); + initTableValues(ts+1, 100); + + TestUtil.analyzeTable(getUrl(), ts+10, KEYONLY_NAME); + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+50)); + Connection conn = DriverManager.getConnection(getUrl(), props); + String query = "SELECT i1 FROM KEYONLY LIMIT 1"; + ResultSet rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals(0, rs.getInt(1)); + assertFalse(rs.next()); + + rs = conn.createStatement().executeQuery("EXPLAIN " + query); + assertEquals("CLIENT SERIAL 1-WAY FULL SCAN OVER KEYONLY\n" + + " SERVER FILTER BY PageFilter 1\n" + + " SERVER 1 ROW LIMIT\n" + + "CLIENT 1 ROW LIMIT", QueryUtil.getExplainPlan(rs)); + conn.close(); + } + protected static void initTableValues(long ts) throws Exception { String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts; Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); @@ -177,4 +205,21 @@ public class KeyOnlyIT extends BaseOwnClusterClientManagedTimeIT { conn.commit(); conn.close(); } + + protected static void initTableValues(long ts, int nRows) throws Exception { + String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(url, props); + PreparedStatement stmt = conn.prepareStatement( + "upsert into " + + "KEYONLY VALUES (?, ?)"); + for (int i = 0; i < nRows; i++) { + stmt.setInt(1, i); + stmt.setInt(2, i+1); + stmt.execute(); + } + + conn.commit(); + conn.close(); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java index 78fb22f..7c0c553 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java @@ -5,6 +5,7 @@ import static org.apache.phoenix.util.TestUtil.getAllSplits; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.sql.Array; @@ -152,7 +153,6 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT { stmt.setArray(3, array); 
stmt.execute(); conn.commit(); - flush(tableName); stmt = upsertStmt(conn, tableName); stmt.setString(1, "b"); s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" }; @@ -163,7 +163,6 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT { stmt.setArray(3, array); stmt.execute(); conn.commit(); - flush(tableName); stmt = upsertStmt(conn, tableName); stmt.setString(1, "c"); s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" }; @@ -174,7 +173,6 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT { stmt.setArray(3, array); stmt.execute(); conn.commit(); - flush(tableName); stmt = upsertStmt(conn, tableName); stmt.setString(1, "d"); s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" }; @@ -185,7 +183,6 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT { stmt.setArray(3, array); stmt.execute(); conn.commit(); - flush(tableName); stmt = upsertStmt(conn, tableName); stmt.setString(1, "b"); s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" }; @@ -196,7 +193,6 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT { stmt.setArray(3, array); stmt.execute(); conn.commit(); - flush(tableName); stmt = upsertStmt(conn, tableName); stmt.setString(1, "e"); s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" }; @@ -207,14 +203,9 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT { stmt.setArray(3, array); stmt.execute(); conn.commit(); - flush(tableName); return conn; } - private void flush(String tableName) throws IOException, InterruptedException { - //utility.getHBaseAdmin().flush(tableName.toUpperCase()); - } - private PreparedStatement upsertStmt(Connection conn, String tableName) throws SQLException { PreparedStatement stmt; stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)"); @@ -281,9 +272,12 @@ public class StatsCollectorIT extends BaseOwnClusterHBaseManagedTimeIT { nRegions = services.getAllTableRegions(STATS_TEST_TABLE_BYTES).size(); nTries++; } while (nRegions == nRegionsNow && nTries < 10); + if (nRegions == nRegionsNow) { + fail(); + } // FIXME: I see the commit of the stats finishing before this with a lower timestamp that the scan timestamp, // yet without this sleep, the query finds the old data. Seems like an HBase bug and a potentially serious one. 
- Thread.sleep(3000); + Thread.sleep(8000); } finally { admin.close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java index 6388b1a..ba601ef 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java @@ -28,7 +28,7 @@ import java.util.List; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.execute.MutationState; -import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory; +import org.apache.phoenix.iterate.ParallelIteratorFactory; import org.apache.phoenix.iterate.PeekingResultIterator; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.jdbc.PhoenixConnection; http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index a119099..6b767e7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -46,7 +46,7 @@ import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.expression.RowValueConstructorExpression; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; -import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory; +import org.apache.phoenix.iterate.ParallelIteratorFactory; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixStatement; http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 0c1d1c8..6f247c7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -606,7 +606,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso if (tenantId == null) { HTableInterface statsHTable = null; try { - statsHTable = ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME); + statsHTable = ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES); stats = StatisticsUtil.readStatistics(statsHTable, physicalTableName.getBytes(), clientTimeStamp); timeStamp = Math.max(timeStamp, stats.getTimestamp()); } catch (org.apache.hadoop.hbase.TableNotFoundException e) { @@ -840,7 +840,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso // TableName systemCatalogTableName = region.getTableDesc().getTableName(); // HTableInterface hTable = env.getTable(systemCatalogTableName); // These deprecated calls work around the issue - HTableInterface hTable = ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME); + HTableInterface hTable = ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); try { boolean allViewsInCurrentRegion = true; int numOfChildViews = 0; http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index d7a90fb..2ebfa41 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -42,8 +42,8 @@ import org.apache.phoenix.iterate.LimitingResultIterator; import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator; import org.apache.phoenix.iterate.OrderedAggregatingResultIterator; import org.apache.phoenix.iterate.OrderedResultIterator; +import org.apache.phoenix.iterate.ParallelIteratorFactory; import org.apache.phoenix.iterate.ParallelIterators; -import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory; import org.apache.phoenix.iterate.PeekingResultIterator; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.iterate.SequenceResultIterator; http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index d91ad51..33143bb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -44,7 +44,7 @@ import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.expression.ProjectedColumnExpression; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.iterate.DelegateResultIterator; -import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory; +import org.apache.phoenix.iterate.ParallelIteratorFactory; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.parse.FilterableStatement; http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index 12dc9ff..578855d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@ -21,6 +21,7 @@ package org.apache.phoenix.execute; import java.sql.SQLException; import java.util.List; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Scan; import 
org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; @@ -33,12 +34,15 @@ import org.apache.phoenix.iterate.ConcatResultIterator; import org.apache.phoenix.iterate.LimitingResultIterator; import org.apache.phoenix.iterate.MergeSortRowKeyResultIterator; import org.apache.phoenix.iterate.MergeSortTopNResultIterator; +import org.apache.phoenix.iterate.ParallelIteratorFactory; import org.apache.phoenix.iterate.ParallelIterators; -import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory; import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.iterate.ResultIterators; import org.apache.phoenix.iterate.SequenceResultIterator; +import org.apache.phoenix.iterate.SerialIterators; import org.apache.phoenix.iterate.SpoolingResultIterator; import org.apache.phoenix.parse.FilterableStatement; +import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; @@ -47,6 +51,9 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.SaltingUtil; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.stats.GuidePostsInfo; +import org.apache.phoenix.schema.stats.StatisticsUtil; +import org.apache.phoenix.util.SchemaUtil; @@ -105,7 +112,8 @@ public class ScanPlan extends BaseQueryPlan { @Override protected ResultIterator newIterator() throws SQLException { // Set any scan attributes before creating the scanner, as it will be too late afterwards - context.getScan().setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, QueryConstants.TRUE); + Scan scan = context.getScan(); + scan.setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, QueryConstants.TRUE); ResultIterator scanner; TableRef tableRef = this.getTableRef(); PTable table = tableRef.getTable(); @@ -114,7 +122,40 @@ public class ScanPlan extends BaseQueryPlan { * limit is provided, run query serially. */ boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty(); - ParallelIterators iterators = new ParallelIterators(this, !allowPageFilter || isOrdered ? null : limit, parallelIteratorFactory); + boolean isSerial = false; + Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit; + /* + * If a limit is provided and we have no filter, run the scan serially when we estimate that + * the limit's worth of data will fit into a single region. 
+ */ + if (perScanLimit != null && scan.getFilter() == null) { + GuidePostsInfo gpsInfo = table.getTableStats().getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table)); + long estRowSize = SchemaUtil.estimateRowSize(table); + long estRegionSize; + if (gpsInfo == null) { + // Use guidepost depth as minimum size + ConnectionQueryServices services = context.getConnection().getQueryServices(); + HTableDescriptor desc = services.getTableDescriptor(table.getName().getBytes()); + int guidepostPerRegion = services.getProps().getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, + QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION); + long guidepostWidth = services.getProps().getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES); + estRegionSize = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth, desc); + } else { + // Region size estimated based on total number of bytes divided by number of regions + estRegionSize = gpsInfo.getByteCount() / (gpsInfo.getGuidePosts().size()+1); + } + // TODO: configurable number of bytes? + if (perScanLimit * estRowSize < estRegionSize) { + isSerial = true; + } + } + ResultIterators iterators; + if (isSerial) { + iterators = new SerialIterators(this, perScanLimit, parallelIteratorFactory); + } else { + iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory); + } splits = iterators.getSplits(); scans = iterators.getScans(); if (isOrdered) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java new file mode 100644 index 0000000..519c162 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.iterate; + +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY; +import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.filter.ColumnProjectionFilter; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.parse.FilterableStatement; +import org.apache.phoenix.parse.HintNode.Hint; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.MetaDataClient; +import org.apache.phoenix.schema.PColumnFamily; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.ViewType; +import org.apache.phoenix.schema.SaltingUtil; +import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.stats.GuidePostsInfo; +import org.apache.phoenix.schema.stats.PTableStats; +import org.apache.phoenix.util.LogUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.SQLCloseables; +import org.apache.phoenix.util.ScanUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.ServerUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + + +/** + * + * Class that parallelizes the scan over a table using the ExecutorService provided. 
Each region of the table will be scanned in parallel with + * the results accessible through {@link #getIterators()} + * + * + * @since 0.1 + */ +public abstract class BaseResultIterators extends ExplainTable implements ResultIterators { + private static final Logger logger = LoggerFactory.getLogger(BaseResultIterators.class); + private static final int DEFAULT_THREAD_TIMEOUT_MS = 60000; // 1min + private static final int ESTIMATED_GUIDEPOSTS_PER_REGION = 20; + + private final List> scans; + private final List splits; + private final PTableStats tableStats; + private final byte[] physicalTableName; + private final QueryPlan plan; + + static final Function TO_KEY_RANGE = new Function() { + @Override + public KeyRange apply(HRegionLocation region) { + return KeyRange.getKeyRange(region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey()); + } + }; + + private PTable getTable() { + return plan.getTableRef().getTable(); + } + + private boolean useStats() { + Scan scan = context.getScan(); + boolean isPointLookup = context.getScanRanges().isPointLookup(); + /* + * Don't use guide posts if: + * 1) We're doing a point lookup, as HBase is fast enough at those + * to not need them to be further parallelized. TODO: perf test to verify + * 2) We're collecting stats, as in this case we need to scan entire + * regions worth of data to track where to put the guide posts. + */ + if (isPointLookup || ScanUtil.isAnalyzeTable(scan)) { + return false; + } + return true; + } + + public BaseResultIterators(QueryPlan plan, Integer perScanLimit) + throws SQLException { + super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint()); + this.plan = plan; + StatementContext context = plan.getContext(); + TableRef tableRef = plan.getTableRef(); + PTable table = tableRef.getTable(); + FilterableStatement statement = plan.getStatement(); + RowProjector projector = plan.getProjector(); + physicalTableName = table.getPhysicalName().getBytes(); + tableStats = useStats() ? new MetaDataClient(context.getConnection()).getTableStats(table) : PTableStats.EMPTY_STATS; + Scan scan = context.getScan(); + if (projector.isProjectEmptyKeyValue()) { + Map> familyMap = scan.getFamilyMap(); + // If nothing projected into scan and we only have one column family, just allow everything + // to be projected and use a FirstKeyOnlyFilter to skip from row to row. This turns out to + // be quite a bit faster. + // Where condition columns also will get added into familyMap + // When where conditions are present, we can not add FirstKeyOnlyFilter at beginning. + if (familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty() + && table.getColumnFamilies().size() == 1) { + // Project the one column family. We must project a column family since it's possible + // that there are other non declared column families that we need to ignore. + scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes()); + ScanUtil.andFilterAtBeginning(scan, new FirstKeyOnlyFilter()); + } else { + byte[] ecf = SchemaUtil.getEmptyColumnFamily(table); + // Project empty key value unless the column family containing it has + // been projected in its entirety. + if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) { + scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES); + } + } + } else if (table.getViewType() == ViewType.MAPPED) { + // Since we don't have the empty key value in MAPPED tables, we must select all CFs in HRS. 
But only the + // selected column values are returned back to client + for (PColumnFamily family : table.getColumnFamilies()) { + scan.addFamily(family.getName().getBytes()); + } + } + + // TODO adding all CFs here is not correct. It should be done only after ColumnProjectionOptimization. + if (perScanLimit != null) { + ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit)); + } + + doColumnProjectionOptimization(context, scan, table, statement); + + this.scans = getParallelScans(); + List splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION); + for (List scanList : scans) { + for (Scan aScan : scanList) { + splitRanges.add(KeyRange.getKeyRange(aScan.getStartRow(), aScan.getStopRow())); + } + } + this.splits = ImmutableList.copyOf(splitRanges); + } + + private void doColumnProjectionOptimization(StatementContext context, Scan scan, PTable table, FilterableStatement statement) { + Map> familyMap = scan.getFamilyMap(); + if (familyMap != null && !familyMap.isEmpty()) { + // columnsTracker contain cf -> qualifiers which should get returned. + Map> columnsTracker = + new TreeMap>(); + Set conditionOnlyCfs = new TreeSet(Bytes.BYTES_COMPARATOR); + int referencedCfCount = familyMap.size(); + for (Pair whereCol : context.getWhereCoditionColumns()) { + if (!(familyMap.containsKey(whereCol.getFirst()))) { + referencedCfCount++; + } + } + boolean useOptimization; + if (statement.getHint().hasHint(Hint.SEEK_TO_COLUMN)) { + // Do not use the optimization + useOptimization = false; + } else if (statement.getHint().hasHint(Hint.NO_SEEK_TO_COLUMN)) { + // Strictly use the optimization + useOptimization = true; + } else { + // when referencedCfCount is >1 and no Hints, we are not using the optimization + useOptimization = referencedCfCount == 1; + } + if (useOptimization) { + for (Entry> entry : familyMap.entrySet()) { + ImmutableBytesPtr cf = new ImmutableBytesPtr(entry.getKey()); + NavigableSet qs = entry.getValue(); + NavigableSet cols = null; + if (qs != null) { + cols = new TreeSet(); + for (byte[] q : qs) { + cols.add(new ImmutableBytesPtr(q)); + } + } + columnsTracker.put(cf, cols); + } + } + // Making sure that where condition CFs are getting scanned at HRS. + for (Pair whereCol : context.getWhereCoditionColumns()) { + if (useOptimization) { + if (!(familyMap.containsKey(whereCol.getFirst()))) { + scan.addFamily(whereCol.getFirst()); + conditionOnlyCfs.add(whereCol.getFirst()); + } + } else { + if (familyMap.containsKey(whereCol.getFirst())) { + // where column's CF is present. If there are some specific columns added against this CF, we + // need to ensure this where column also getting added in it. + // If the select was like select cf1.*, then that itself will select the whole CF. So no need to + // specifically add the where column. Adding that will remove the cf1.* stuff and only this + // where condition column will get returned! + NavigableSet cols = familyMap.get(whereCol.getFirst()); + // cols is null means the whole CF will get scanned. + if (cols != null) { + scan.addColumn(whereCol.getFirst(), whereCol.getSecond()); + } + } else { + // where column's CF itself is not present in family map. We need to add the column + scan.addColumn(whereCol.getFirst(), whereCol.getSecond()); + } + } + } + if (useOptimization && !columnsTracker.isEmpty()) { + for (ImmutableBytesPtr f : columnsTracker.keySet()) { + // This addFamily will remove explicit cols in scan familyMap and make it as entire row. + // We don't want the ExplicitColumnTracker to be used. 
Instead we have the ColumnProjectionFilter + scan.addFamily(f.get()); + } + // We don't need this filter for aggregates, as we're not returning back what's + // in the scan in this case. We still want the other optimization that causes + // the ExplicitColumnTracker not to be used, though. + if (!(statement.isAggregate())) { + ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table), + columnsTracker, conditionOnlyCfs)); + } + } + } + } + + @Override + public List getSplits() { + return splits; + } + + @Override + public List> getScans() { + return scans; + } + + private static List toBoundaries(List regionLocations) { + int nBoundaries = regionLocations.size() - 1; + List ranges = Lists.newArrayListWithExpectedSize(nBoundaries); + for (int i = 0; i < nBoundaries; i++) { + HRegionInfo regionInfo = regionLocations.get(i).getRegionInfo(); + ranges.add(regionInfo.getEndKey()); + } + return ranges; + } + + private static int getIndexContainingInclusive(List boundaries, byte[] inclusiveKey) { + int guideIndex = Collections.binarySearch(boundaries, inclusiveKey, Bytes.BYTES_COMPARATOR); + // If we found an exact match, return the index+1, as the inclusiveKey will be contained + // in the next region (since we're matching on the end boundary). + guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : (guideIndex + 1)); + return guideIndex; + } + + private static int getIndexContainingExclusive(List boundaries, byte[] exclusiveKey) { + int guideIndex = Collections.binarySearch(boundaries, exclusiveKey, Bytes.BYTES_COMPARATOR); + // If we found an exact match, return the index we found as the exclusiveKey won't be + // contained in the next region as with getIndexContainingInclusive. + guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : guideIndex); + return guideIndex; + } + + private List getGuidePosts() { + /* + * Don't use guide posts if: + * 1) We're doing a point lookup, as HBase is fast enough at those + * to not need them to be further parallelized. TODO: pref test to verify + * 2) We're collecting stats, as in this case we need to scan entire + * regions worth of data to track where to put the guide posts. + */ + if (!useStats()) { + return Collections.emptyList(); + } + + List gps = null; + PTable table = getTable(); + Map guidePostMap = tableStats.getGuidePosts(); + byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(getTable()); + if (table.getColumnFamilies().isEmpty()) { + // For sure we can get the defaultCF from the table + if (guidePostMap.get(defaultCF) != null) { + gps = guidePostMap.get(defaultCF).getGuidePosts(); + } + } else { + Scan scan = context.getScan(); + if (scan.getFamilyMap().size() > 0 && !scan.getFamilyMap().containsKey(defaultCF)) { + // If default CF is not used in scan, use first CF referenced in scan + GuidePostsInfo guidePostsInfo = guidePostMap.get(scan.getFamilyMap().keySet().iterator().next()); + if (guidePostsInfo != null) { + gps = guidePostsInfo.getGuidePosts(); + } + } else { + // Otherwise, favor use of default CF. 
+ if (guidePostMap.get(defaultCF) != null) { + gps = guidePostMap.get(defaultCF).getGuidePosts(); + } + } + } + if (gps == null) { + return Collections.emptyList(); + } + return gps; + } + + private static String toString(List gps) { + StringBuilder buf = new StringBuilder(gps.size() * 100); + buf.append("["); + for (int i = 0; i < gps.size(); i++) { + buf.append(Bytes.toStringBinary(gps.get(i))); + buf.append(","); + if (i > 0 && i < gps.size()-1 && (i % 10) == 0) { + buf.append("\n"); + } + } + buf.setCharAt(buf.length()-1, ']'); + return buf.toString(); + } + + private List addNewScan(List> parallelScans, List scans, Scan scan, byte[] startKey, boolean crossedRegionBoundary) { + PTable table = getTable(); + boolean startNewScanList = false; + if (!plan.isRowKeyOrdered()) { + startNewScanList = true; + } else if (crossedRegionBoundary) { + if (table.getIndexType() == IndexType.LOCAL) { + startNewScanList = true; + } else if (table.getBucketNum() != null) { + startNewScanList = scans.isEmpty() || + ScanUtil.crossesPrefixBoundary(startKey, + ScanUtil.getPrefix(scans.get(scans.size()-1).getStartRow(), SaltingUtil.NUM_SALTING_BYTES), + SaltingUtil.NUM_SALTING_BYTES); + } + } + if (scan != null) { + scans.add(scan); + } + if (startNewScanList && !scans.isEmpty()) { + parallelScans.add(scans); + scans = Lists.newArrayListWithExpectedSize(1); + } + return scans; + } + + private List> getParallelScans() throws SQLException { + return getParallelScans(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY); + } + + /** + * Compute the list of parallel scans to run for a given query. The inner scans + * may be concatenated together directly, while the other ones may need to be + * merge sorted, depending on the query. + * @return list of parallel scans to run for a given query. + * @throws SQLException + */ + private List> getParallelScans(byte[] startKey, byte[] stopKey) throws SQLException { + Scan scan = context.getScan(); + List regionLocations = context.getConnection().getQueryServices() + .getAllTableRegions(physicalTableName); + + List regionBoundaries = toBoundaries(regionLocations); + ScanRanges scanRanges = context.getScanRanges(); + PTable table = getTable(); + boolean isSalted = table.getBucketNum() != null; + boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL; + List gps = getGuidePosts(); + if (logger.isDebugEnabled()) { + logger.debug("Guideposts: " + toString(gps)); + } + boolean traverseAllRegions = isSalted || isLocalIndex; + if (!traverseAllRegions) { + byte[] scanStartRow = scan.getStartRow(); + if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, startKey) > 0) { + startKey = scanStartRow; + } + byte[] scanStopRow = scan.getStopRow(); + if (stopKey.length == 0 || Bytes.compareTo(scanStopRow, stopKey) < 0) { + stopKey = scanStopRow; + } + } + + int regionIndex = 0; + int stopIndex = regionBoundaries.size(); + if (startKey.length > 0) { + regionIndex = getIndexContainingInclusive(regionBoundaries, startKey); + } + if (stopKey.length > 0) { + stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey)); + if (isLocalIndex) { + stopKey = regionLocations.get(stopIndex).getRegionInfo().getEndKey(); + } + } + List> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1); + + byte[] currentKey = startKey; + int guideIndex = currentKey.length == 0 ? 0 : getIndexContainingInclusive(gps, currentKey); + int gpsSize = gps.size(); + int estGuidepostsPerRegion = gpsSize == 0 ? 
1 : gpsSize / regionLocations.size() + 1; + int keyOffset = 0; + List scans = Lists.newArrayListWithExpectedSize(estGuidepostsPerRegion); + // Merge bisect with guideposts for all but the last region + while (regionIndex <= stopIndex) { + byte[] currentGuidePost, endKey, endRegionKey = EMPTY_BYTE_ARRAY; + if (regionIndex == stopIndex) { + endKey = stopKey; + } else { + endKey = regionBoundaries.get(regionIndex); + } + if (isLocalIndex) { + HRegionInfo regionInfo = regionLocations.get(regionIndex).getRegionInfo(); + endRegionKey = regionInfo.getEndKey(); + keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey); + } + while (guideIndex < gpsSize + && (Bytes.compareTo(currentGuidePost = gps.get(guideIndex), endKey) <= 0 || endKey.length == 0)) { + Scan newScan = scanRanges.intersectScan(scan, currentKey, currentGuidePost, keyOffset, false); + scans = addNewScan(parallelScans, scans, newScan, currentGuidePost, false); + currentKey = currentGuidePost; + guideIndex++; + } + Scan newScan = scanRanges.intersectScan(scan, currentKey, endKey, keyOffset, true); + if (isLocalIndex) { + if (newScan != null) { + newScan.setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey); + } else if (!scans.isEmpty()) { + scans.get(scans.size()-1).setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey); + } + } + scans = addNewScan(parallelScans, scans, newScan, endKey, true); + currentKey = endKey; + regionIndex++; + } + if (!scans.isEmpty()) { // Add any remaining scans + parallelScans.add(scans); + } + if (logger.isDebugEnabled()) { + logger.debug(LogUtil.addCustomAnnotations("The parallelScans: " + parallelScans, + ScanUtil.getCustomAnnotations(scan))); + } + return parallelScans; + } + + public static List reverseIfNecessary(List list, boolean reverse) { + if (!reverse) { + return list; + } + return Lists.reverse(list); + } + + /** + * Executes the scan in parallel across all regions, blocking until all scans are complete. + * @return the result iterators for the scan of each region + */ + @Override + public List getIterators() throws SQLException { + boolean success = false; + boolean isReverse = ScanUtil.isReversed(context.getScan()); + boolean isLocalIndex = getTable().getIndexType() == IndexType.LOCAL; + final ConnectionQueryServices services = context.getConnection().getQueryServices(); + ReadOnlyProps props = services.getProps(); + int numSplits = size(); + List iterators = new ArrayList(numSplits); + final List>>> futures = Lists.newArrayListWithExpectedSize(numSplits); + // TODO: what purpose does this scanID serve? 
+ final UUID scanId = UUID.randomUUID(); + try { + submitWork(scanId, scans, futures, splits.size()); + int timeoutMs = props.getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS); + boolean clearedCache = false; + for (List>> future : reverseIfNecessary(futures,isReverse)) { + List concatIterators = Lists.newArrayListWithExpectedSize(future.size()); + for (Pair> scanPair : reverseIfNecessary(future,isReverse)) { + try { + PeekingResultIterator iterator = scanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS); + concatIterators.add(iterator); + } catch (ExecutionException e) { + try { // Rethrow as SQLException + throw ServerUtil.parseServerException(e); + } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date + List>>> newFutures = Lists.newArrayListWithExpectedSize(2); + if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries + services.clearTableRegionCache(physicalTableName); + clearedCache = true; + } + // Resubmit just this portion of work again + Scan oldScan = scanPair.getFirst(); + byte[] startKey = oldScan.getStartRow(); + byte[] endKey = oldScan.getStopRow(); + if (isLocalIndex) { + endKey = oldScan.getAttribute(EXPECTED_UPPER_REGION_KEY); + } + List> newNestedScans = this.getParallelScans(startKey, endKey); + // Add any concatIterators that were successful so far + // as we need these to be in order + addIterator(iterators, concatIterators); + concatIterators = Collections.emptyList(); + submitWork(scanId, newNestedScans, newFutures, newNestedScans.size()); + for (List>> newFuture : reverseIfNecessary(newFutures, isReverse)) { + for (Pair> newScanPair : reverseIfNecessary(newFuture, isReverse)) { + // Immediate do a get (not catching exception again) and then add the iterators we + // get back immediately. They'll be sorted as expected, since they're replacing the + // original one. + PeekingResultIterator iterator = newScanPair.getSecond().get(timeoutMs, TimeUnit.MILLISECONDS); + iterators.add(iterator); + } + } + } + } + } + addIterator(iterators, concatIterators); + } + + success = true; + return iterators; + } catch (SQLException e) { + throw e; + } catch (Exception e) { + throw ServerUtil.parseServerException(e); + } finally { + if (!success) { + SQLCloseables.closeAllQuietly(iterators); + // Don't call cancel on already started work, as it causes the HConnection + // to get into a funk. Instead, just cancel queued work. 
+ for (List>> futureScans : futures) { + for (Pair> futurePair : futureScans) { + futurePair.getSecond().cancel(false); + } + } + } + } + } + + private void addIterator(List parentIterators, List childIterators) { + if (!childIterators.isEmpty()) { + parentIterators.add(ConcatResultIterator.newIterator(childIterators)); + } + } + + protected static final class ScanLocator { + private final int outerListIndex; + private final int innerListIndex; + private final Scan scan; + + public ScanLocator(Scan scan, int outerListIndex, int innerListIndex) { + this.outerListIndex = outerListIndex; + this.innerListIndex = innerListIndex; + this.scan = scan; + } + public int getOuterListIndex() { + return outerListIndex; + } + public int getInnerListIndex() { + return innerListIndex; + } + public Scan getScan() { + return scan; + } + } + + + abstract protected String getName(); + abstract protected void submitWork(final UUID scanId, List> nestedScans, + List>>> nestedFutures, int estFlattenedSize); + + @Override + public int size() { + return this.scans.size(); + } + + @Override + public void explain(List planSteps) { + boolean displayChunkCount = context.getConnection().getQueryServices().getProps().getBoolean( + QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB, + QueryServicesOptions.DEFAULT_EXPLAIN_CHUNK_COUNT); + StringBuilder buf = new StringBuilder(); + buf.append("CLIENT " + (displayChunkCount ? (this.splits.size() + "-CHUNK ") : "") + getName() + " " + size() + "-WAY "); + explain(buf.toString(),planSteps); + } + + @Override + public String toString() { + return "ResultIterators [name=" + getName() + ",scans=" + scans + "]"; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java index 4a62259..fecb0d1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java @@ -44,7 +44,7 @@ import com.google.common.base.Preconditions; public class ChunkedResultIterator implements PeekingResultIterator { private static final Logger logger = LoggerFactory.getLogger(ChunkedResultIterator.class); - private final ParallelIterators.ParallelIteratorFactory delegateIteratorFactory; + private final ParallelIteratorFactory delegateIteratorFactory; private ImmutableBytesWritable lastKey = new ImmutableBytesWritable(); private final StatementContext context; private final TableRef tableRef; @@ -52,12 +52,12 @@ public class ChunkedResultIterator implements PeekingResultIterator { private final long chunkSize; private PeekingResultIterator resultIterator; - public static class ChunkedResultIteratorFactory implements ParallelIterators.ParallelIteratorFactory { + public static class ChunkedResultIteratorFactory implements ParallelIteratorFactory { - private final ParallelIterators.ParallelIteratorFactory delegateFactory; + private final ParallelIteratorFactory delegateFactory; private final TableRef tableRef; - public ChunkedResultIteratorFactory(ParallelIterators.ParallelIteratorFactory + public ChunkedResultIteratorFactory(ParallelIteratorFactory delegateFactory, TableRef tableRef) { this.delegateFactory = delegateFactory; this.tableRef = tableRef; 
@@ -74,7 +74,7 @@ public class ChunkedResultIterator implements PeekingResultIterator { } } - public ChunkedResultIterator(ParallelIterators.ParallelIteratorFactory delegateIteratorFactory, + public ChunkedResultIterator(ParallelIteratorFactory delegateIteratorFactory, StatementContext context, TableRef tableRef, Scan scan, long chunkSize) throws SQLException { this.delegateIteratorFactory = delegateIteratorFactory; this.context = context; http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java index cddf3b3..03f8785 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java @@ -39,8 +39,13 @@ public class ConcatResultIterator implements PeekingResultIterator { this.resultIterators = iterators; } + private ConcatResultIterator(List iterators) { + this.resultIterators = null; + this.iterators = iterators; + } + private List getIterators() throws SQLException { - if (iterators == null) { + if (iterators == null && resultIterators != null) { iterators = resultIterators.getIterators(); } return iterators; @@ -59,7 +64,9 @@ public class ConcatResultIterator implements PeekingResultIterator { @Override public void explain(List planSteps) { - resultIterators.explain(planSteps); + if (resultIterators != null) { + resultIterators.explain(planSteps); + } } private PeekingResultIterator currentIterator() throws SQLException { @@ -88,11 +95,11 @@ public class ConcatResultIterator implements PeekingResultIterator { @Override public String toString() { - return "ConcatResultIterator [resultIterators=" + resultIterators - + ", iterators=" + iterators + ", index=" + index + "]"; + return "ConcatResultIterator [" + resultIterators == null ? 
("iterators=" + iterators) : ("resultIterators=" + resultIterators) + + ", index=" + index + "]"; } - public static PeekingResultIterator newConcatResultIterator(final List concatIterators) { + public static PeekingResultIterator newIterator(final List concatIterators) { if (concatIterators.isEmpty()) { return PeekingResultIterator.EMPTY_ITERATOR; } @@ -100,24 +107,6 @@ public class ConcatResultIterator implements PeekingResultIterator { if (concatIterators.size() == 1) { return concatIterators.get(0); } - return new ConcatResultIterator(new ResultIterators() { - - @Override - public List getIterators() throws SQLException { - return concatIterators; - } - - @Override - public int size() { - return concatIterators.size(); - } - - @Override - public void explain(List planSteps) { - // TODO: review what we should for explain plan here - concatIterators.get(0).explain(planSteps); - } - - }); + return new ConcatResultIterator(concatIterators); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java index 98b95c6..63b3142 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DelegateResultIterator.java @@ -30,6 +30,10 @@ public class DelegateResultIterator implements ResultIterator { this.delegate = delegate; } + protected ResultIterator getDelegate() { + return delegate; + } + @Override public void close() throws SQLException { delegate.close(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingPeekingResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingPeekingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingPeekingResultIterator.java new file mode 100644 index 0000000..a80693d --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingPeekingResultIterator.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.iterate; + +import java.sql.SQLException; + +import org.apache.phoenix.schema.tuple.Tuple; + +/** + * + * Iterates through tuples up to a limit + * + * + * @since 1.2 + */ +public class LimitingPeekingResultIterator extends LimitingResultIterator implements PeekingResultIterator { + + public LimitingPeekingResultIterator(PeekingResultIterator delegate, int limit) { + super(delegate, limit); + } + + + @Override + protected PeekingResultIterator getDelegate() { + return (PeekingResultIterator) super.getDelegate(); + } + + @Override + public Tuple peek() throws SQLException { + return getDelegate().peek(); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c1f18f2/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java new file mode 100644 index 0000000..1ad3af0 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.iterate; + +import java.sql.SQLException; + +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.compile.StatementContext; + +public interface ParallelIteratorFactory { + PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException; +} \ No newline at end of file
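
For reference, the serial-vs-parallel decision added to ScanPlan.newIterator() reduces to a size estimate: when a limit is pushed down, the query is unordered, and the scan has no filter, the scan runs serially if the limit's worth of rows is expected to fit inside a single region. The snippet below restates that check as a minimal standalone sketch; the class name, parameters, and sample numbers are illustrative assumptions rather than Phoenix API — only the comparison perScanLimit * estRowSize < estRegionSize mirrors the committed code, which derives estRowSize from SchemaUtil.estimateRowSize() and estRegionSize from guidepost stats (or from the guidepost depth when no stats exist).

// Minimal, self-contained sketch of the serial-scan heuristic (PHOENIX-1432).
// Names and sample numbers are assumptions for illustration only.
public class SerialScanHeuristicSketch {
    /**
     * @param perScanLimit  limit pushed into each scan, or null when the query is
     *                      ordered or page filtering is disallowed
     * @param hasFilter     whether the scan carries a filter
     * @param estRowSize    estimated bytes per row
     * @param estRegionSize estimated bytes per region
     */
    static boolean useSerialScan(Integer perScanLimit, boolean hasFilter,
                                 long estRowSize, long estRegionSize) {
        if (perScanLimit == null || hasFilter) {
            return false; // only unfiltered, unordered LIMIT queries are candidates
        }
        // Run serially when the limit's worth of data should fit in a single region.
        return perScanLimit * estRowSize < estRegionSize;
    }

    public static void main(String[] args) {
        // Assumed example: LIMIT 1 over ~100-byte rows with a region estimated at 20 MB
        // easily fits, so the plan uses SerialIterators and EXPLAIN reports
        // "CLIENT SERIAL 1-WAY FULL SCAN", matching the expectations updated in
        // HashJoinIT and KeyOnlyIT.
        System.out.println(useSerialScan(1, false, 100L, 20L * 1024 * 1024));    // true
        System.out.println(useSerialScan(null, false, 100L, 20L * 1024 * 1024)); // false
    }
}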