phoenix-commits mailing list archives

From sama...@apache.org
Subject [36/52] [abbrv] phoenix git commit: PHOENIX-2724 Query with large number of guideposts is slower compared to no stats
Date Mon, 25 Apr 2016 18:53:20 GMT
PHOENIX-2724 Query with large number of guideposts is slower compared to no stats


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/54362430
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/54362430
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/54362430

Branch: refs/heads/encodecolumns
Commit: 54362430d71be788d515944573572624628a09b6
Parents: 559dfa7
Author: Samarth <samarth.jain@salesforce.com>
Authored: Fri Apr 22 23:13:57 2016 -0700
Committer: Samarth <samarth.jain@salesforce.com>
Committed: Fri Apr 22 23:13:57 2016 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/QueryWithOffsetIT.java      |  33 ++---
 .../phoenix/end2end/SerialIteratorsIT.java      |  90 ++++++++++++
 .../phoenix/coprocessor/ScanRegionObserver.java |   9 +-
 .../org/apache/phoenix/execute/ScanPlan.java    |  17 ++-
 .../phoenix/iterate/BaseResultIterators.java    |   4 +-
 .../phoenix/iterate/ChunkedResultIterator.java  |  13 +-
 .../phoenix/iterate/OffsetResultIterator.java   |   2 +-
 .../phoenix/iterate/ParallelIterators.java      |   4 +-
 .../apache/phoenix/iterate/SerialIterators.java | 142 ++++++++++++++-----
 .../phoenix/iterate/SpoolingResultIterator.java |  18 ++-
 .../phoenix/iterate/TableResultIterator.java    |  22 +--
 .../apache/phoenix/jdbc/PhoenixConnection.java  |   2 +-
 .../apache/phoenix/query/QueryConstants.java    |   2 +-
 .../org/apache/phoenix/schema/PTableImpl.java   |   1 +
 .../java/org/apache/phoenix/util/QueryUtil.java |  10 +-
 15 files changed, 278 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/54362430/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
index c609581..7f360de 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
@@ -89,9 +89,10 @@ public class QueryWithOffsetIT extends BaseOwnClusterHBaseManagedTimeIT {
         rs = conn.createStatement()
                .executeQuery("SELECT t_id from " + tableName + " order by t_id limit " + limit + " offset " + offset);
         int i = 0;
-        while (i++ < limit) {
+        while (i < limit) {
             assertTrue(rs.next());
-            assertEquals(strings[offset + i - 1], rs.getString(1));
+            assertEquals("Expected string didn't match for i = " + i, strings[offset + i], rs.getString(1));
+            i++;
         }
 
         limit = 35;
@@ -176,20 +177,6 @@ public class QueryWithOffsetIT extends BaseOwnClusterHBaseManagedTimeIT {
         conn.close();
     }
 
-    private void initTableValues(Connection conn) throws SQLException {
-        for (int i = 0; i < 26; i++) {
-            conn.createStatement().execute("UPSERT INTO " + tableName + " values('" + strings[i] + "'," + i + ","
-                    + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')");
-        }
-        conn.commit();
-    }
-
-    private void updateStatistics(Connection conn) throws SQLException {
-        String query = "UPDATE STATISTICS " + tableName + " SET \"" + QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB
-                + "\"=" + Long.toString(500);
-        conn.createStatement().execute(query);
-    }
-
     @Test
     public void testMetaDataWithOffset() throws SQLException {
         Connection conn;
@@ -207,5 +194,19 @@ public class QueryWithOffsetIT extends BaseOwnClusterHBaseManagedTimeIT {
         ResultSetMetaData md = rs.getMetaData();
         assertEquals(5, md.getColumnCount());
     }
+    
+    private void initTableValues(Connection conn) throws SQLException {
+        for (int i = 0; i < 26; i++) {
+            conn.createStatement().execute("UPSERT INTO " + tableName + " values('" + strings[i] + "'," + i + ","
+                    + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')");
+        }
+        conn.commit();
+    }
+
+    private void updateStatistics(Connection conn) throws SQLException {
+        String query = "UPDATE STATISTICS " + tableName + " SET \"" + QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB
+                + "\"=" + Long.toString(500);
+        conn.createStatement().execute(query);
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54362430/phoenix-core/src/it/java/org/apache/phoenix/end2end/SerialIteratorsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SerialIteratorsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SerialIteratorsIT.java
new file mode 100644
index 0000000..d4c71af
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SerialIteratorsIT.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class SerialIteratorsIT extends BaseHBaseManagedTimeTableReuseIT {
+    private String tableName = generateRandomString();
+    private final String[] strings = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p",
+            "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };
+    private final String ddl = "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n"
+            + "k2 INTEGER NOT NULL,\n" + "C3.k3 INTEGER,\n" + "C2.v1 VARCHAR,\n"
+            + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2)) SPLIT ON ('e','i','o')";
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        // Don't force row key order
+        props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+    
+    @Test
+    public void testConcatenatingSerialIterators() throws Exception {
+        Connection conn;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        conn = DriverManager.getConnection(getUrl(), props);
+        createTestTable(getUrl(), ddl);
+        initTableValues(conn);
+        String query = "SELECT t_id from " + tableName + " order by t_id desc limit " + 10;
+        PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
+        ResultSet rs = stmt.executeQuery(query);
+        int i = 25;
+        while (i >= 16) {
+            assertTrue(rs.next());
+            assertEquals(strings[i--], rs.getString(1));
+        }
+        query = "SELECT t_id from " + tableName + " order by t_id limit " + 10;
+        stmt = conn.createStatement().unwrap(PhoenixStatement.class);
+        rs = stmt.executeQuery(query);
+        i = 0;
+        while (i < 10) {
+            assertTrue(rs.next());
+            assertEquals(strings[i++], rs.getString(1));
+        }
+        conn.close();
+    }
+    
+    private void initTableValues(Connection conn) throws SQLException {
+        for (int i = 0; i < 26; i++) {
+            conn.createStatement().execute("UPSERT INTO " + tableName + " values('" + strings[i] + "'," + i + ","
+                    + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')");
+        }
+        conn.commit();
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54362430/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 3333d9c..72f6d09 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -60,6 +60,7 @@ import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
 import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
@@ -192,7 +193,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
         byte[] scanOffsetBytes = scan.getAttribute(BaseScannerRegionObserver.SCAN_OFFSET);
         Integer scanOffset = null;
         if (scanOffsetBytes != null) {
-            scanOffset = Bytes.toInt(scanOffsetBytes);
+            scanOffset = (Integer) PInteger.INSTANCE.toObject(scanOffsetBytes);
         }
         RegionScanner innerScanner = s;
 
@@ -246,15 +247,11 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
         final HRegion region = c.getEnvironment().getRegion();
         region.startRegionOperation();
         try {
-            // Once we return from the first call to next, we've run through and
-            // cached
-            // the topN rows, so we no longer need to start/stop a region
-            // operation.
             Tuple tuple = iterator.next();
             if (tuple == null && !isLastScan) {
                 List<KeyValue> kvList = new ArrayList<KeyValue>(1);
                 KeyValue kv = new KeyValue(QueryConstants.OFFSET_ROW_KEY_BYTES, QueryConstants.OFFSET_FAMILY,
-                        QueryConstants.OFFSET_COLUMN, Bytes.toBytes(iterator.getUnusedOffset()));
+                        QueryConstants.OFFSET_COLUMN, PInteger.INSTANCE.toBytes(iterator.getRemainingOffset()));
                 kvList.add(kv);
                 Result r = new Result(kvList);
                 firstTuple = new ResultTuple(r);

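Aside: the hunks above pair with the client-side changes further down — the patch switches the offset scan attribute and the offset marker row from raw Bytes encoding to Phoenix's PInteger codec, and both ends of the wire must agree. A minimal round-trip sketch, standalone and not part of the patch, assuming only the public PDataType API:

    import org.apache.phoenix.schema.types.PInteger;

    public class OffsetCodecSketch {
        public static void main(String[] args) {
            // The client encodes the remaining offset with the Phoenix codec...
            byte[] encoded = PInteger.INSTANCE.toBytes(42);
            // ...so the coprocessor must decode with the same codec. PInteger's
            // layout differs from Bytes.toBytes(int) (the sign bit is flipped to
            // preserve byte-wise sort order), so mixing the two would misread
            // the value.
            Integer decoded = (Integer) PInteger.INSTANCE.toObject(encoded);
            System.out.println(decoded); // 42
        }
    }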
http://git-wip-us.apache.org/repos/asf/phoenix/blob/54362430/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 93ae5d6..c5dabfe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -18,6 +18,9 @@
 package org.apache.phoenix.execute;
 
 
+import static org.apache.phoenix.util.ScanUtil.isPacingScannersPossible;
+import static org.apache.phoenix.util.ScanUtil.isRoundRobinPossible;
+
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
@@ -113,6 +116,15 @@ public class ScanPlan extends BaseQueryPlan {
             return false;
         }
         PTable table = tableRef.getTable();
+        /*
+         * For salted or local index tables, if rows are requested in a row key order, then we
+         * cannot execute a query serially. We need to be able to do a merge sort across all scans
+         * which isn't possible with SerialIterators. For other kinds of tables though we are ok
+         * since SerialIterators execute scans in the correct order.
+         */
+        if ((table.getBucketNum() != null || table.getIndexType() == IndexType.LOCAL) && ScanUtil.shouldRowsBeInRowKeyOrder(orderBy, context)) {
+            return false;
+        }
         GuidePostsInfo gpsInfo = table.getTableStats().getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table));
         long estRowSize = SchemaUtil.estimateRowSize(table);
         long estRegionSize;
@@ -147,8 +159,9 @@ public class ScanPlan extends BaseQueryPlan {
             TableRef table, OrderBy orderBy, Integer limit,Integer offset, boolean allowPageFilter) throws SQLException {
 
         if ((isSerial(context, statement, table, orderBy, limit, offset, allowPageFilter)
-                || ScanUtil.isRoundRobinPossible(orderBy, context) || ScanUtil.isPacingScannersPossible(context))
-                && offset == null) { return ParallelIteratorFactory.NOOP_FACTORY; }
+                || isRoundRobinPossible(orderBy, context) || isPacingScannersPossible(context))) {
+            return ParallelIteratorFactory.NOOP_FACTORY;
+        }
         ParallelIteratorFactory spoolingResultIteratorFactory =
                 new SpoolingResultIterator.SpoolingResultIteratorFactory(
                         context.getConnection().getQueryServices());

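Aside: a toy, Phoenix-free illustration of the merge-sort argument in the new comment above. Salting physically prefixes each row key with a hash-derived bucket byte, so consecutive logical keys land in different scan ranges; concatenating those scans serially cannot return rows in row key order. saltByte below is a hypothetical stand-in for Phoenix's actual salt hash:

    import java.util.*;

    public class SaltedOrderSketch {
        // Hypothetical stand-in for Phoenix's salt-byte computation.
        static int saltByte(String rowKey, int buckets) {
            return Math.abs(rowKey.hashCode()) % buckets;
        }

        public static void main(String[] args) {
            int buckets = 4;
            // Group logical keys by the bucket their salt byte sends them to,
            // mimicking the physical layout of a salted table.
            Map<Integer, List<String>> byBucket = new TreeMap<>();
            for (String k : Arrays.asList("a", "b", "c", "d", "e")) {
                byBucket.computeIfAbsent(saltByte(k, buckets), b -> new ArrayList<>()).add(k);
            }
            // Serial execution concatenates the bucket scans one after another,
            // which interleaves logical keys; only an n-way merge sort across
            // all bucket scans restores row key order, and that is exactly what
            // SerialIterators cannot do.
            List<String> concatenated = new ArrayList<>();
            byBucket.values().forEach(concatenated::addAll);
            System.out.println(concatenated); // generally not [a, b, c, d, e]
        }
    }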
http://git-wip-us.apache.org/repos/asf/phoenix/blob/54362430/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 0299f18..043bd30 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -676,7 +676,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         SQLException toThrow = null;
         int queryTimeOut = context.getStatement().getQueryTimeoutInMillis();
         try {
-            submitWork(scan, futures, allIterators, splitSize);
+            submitWork(scan, futures, allIterators, splitSize, isReverse);
             boolean clearedCache = false;
             for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
                 List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size());
@@ -868,7 +868,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
 
     abstract protected String getName();    
     abstract protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            Queue<PeekingResultIterator> allIterators, int estFlattenedSize) throws SQLException;
+            Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse) throws SQLException;
     
     @Override
     public int size() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54362430/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index a78565d..7a830de 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -44,7 +44,13 @@ import com.google.common.base.Preconditions;
 /**
 * {@code PeekingResultIterator} implementation that loads data in chunks. This is intended for
  * basic scan plans, to avoid loading large quantities of data from HBase in one go.
+ * 
+ * <p>
+ * Chunking is deprecated and shouldn't be used while implementing new features. As of HBase 0.98.17,
+ * we rely on pacing the server side scanners instead of pulling rows from the server in chunks.
+ * </p>
  */
+@Deprecated
 public class ChunkedResultIterator implements PeekingResultIterator {
     private static final Logger logger = LoggerFactory.getLogger(ChunkedResultIterator.class);
 
@@ -56,7 +62,12 @@ public class ChunkedResultIterator implements PeekingResultIterator {
     private final MutationState mutationState;
     private Scan scan;
     private PeekingResultIterator resultIterator;
-
+    
+    /**
+     * Chunking is deprecated and shouldn't be used while implementing new features. As of HBase 0.98.17,
+     * we rely on pacing the server side scanners instead of pulling rows from the server in chunks.
+     */
+    @Deprecated
     public static class ChunkedResultIteratorFactory implements ParallelIteratorFactory {
 
         private final ParallelIteratorFactory delegateFactory;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54362430/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
index ef8eacf..db53806 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
@@ -56,7 +56,7 @@ public class OffsetResultIterator extends DelegateResultIterator {
         return "OffsetResultIterator [rowCount=" + rowCount + ", offset=" + offset + "]";
     }
 
-    public Integer getUnusedOffset() {
+    public Integer getRemainingOffset() {
         return (offset - rowCount) > 0 ? (offset - rowCount) : 0;
     }
 }

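Aside: the renamed getRemainingOffset() reports the part of the requested OFFSET that the current region scan could not consume; the coprocessor ships it back in the marker row (see ScanRegionObserver above) so the next scan keeps skipping where the previous one left off. A toy walk-through of the arithmetic, not Phoenix code:

    public class RemainingOffsetSketch {
        // Mirrors OffsetResultIterator.getRemainingOffset(): the part of the
        // offset the current region scan could not consume.
        static int remainingOffset(int offset, int rowsSkippedInRegion) {
            return Math.max(offset - rowsSkippedInRegion, 0);
        }

        public static void main(String[] args) {
            int offset = 25;                      // OFFSET 25 in the query
            int[] regionRowCounts = {10, 10, 10}; // rows available per region scan
            for (int rows : regionRowCounts) {
                int skipped = Math.min(offset, rows);
                offset = remainingOffset(offset, skipped);
                System.out.println("skipped " + skipped + ", remaining offset " + offset);
            }
            // Prints 15, then 5, then 0 as the remaining offset; only after it
            // reaches 0 do rows start flowing back to the client.
        }
    }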
http://git-wip-us.apache.org/repos/asf/phoenix/blob/54362430/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index ca0eba0..a5664c7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -67,7 +67,7 @@ public class ParallelIterators extends BaseResultIterators {
 
     @Override
     protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize) throws SQLException {
+            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse) throws SQLException {
         // Pre-populate nestedFutures lists so that we can shuffle the scans
         // and add the future to the right nested list. By shuffling the scans
         // we get better utilization of the cluster since our thread executor
@@ -99,7 +99,7 @@ public class ParallelIterators extends BaseResultIterators {
             final CombinableMetric scanMetrics = readMetrics.allotMetric(MetricType.SCAN_BYTES, physicalTableName);
             final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(readMetrics, physicalTableName);
             final TableResultIterator tableResultItr = context.getConnection().getTableResultIteratorFactory().newIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold);
-            context.getConnection().addIterator(tableResultItr);
+            context.getConnection().addIteratorForLeaseRenewal(tableResultItr);
             Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
                 
                 @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54362430/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index 17c2279..d2c89b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -30,13 +30,16 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.trace.util.Tracing;
-import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.QueryUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -53,11 +56,13 @@ import com.google.common.collect.Lists;
 public class SerialIterators extends BaseResultIterators {
 	private static final String NAME = "SERIAL";
     private final ParallelIteratorFactory iteratorFactory;
+    private final Integer offset;
     
     public SerialIterators(QueryPlan plan, Integer perScanLimit, Integer offset,
             ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper)
             throws SQLException {
         super(plan, perScanLimit, offset, scanGrouper);
+        this.offset = offset;
         // must be a offset or a limit specified or a SERIAL hint
         Preconditions.checkArgument(
                 offset != null || perScanLimit != null || plan.getStatement().getHint().hasHint(HintNode.Hint.SERIAL));
@@ -65,40 +70,28 @@ public class SerialIterators extends BaseResultIterators {
     }
 
     @Override
-    protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize) {
-        // Pre-populate nestedFutures lists so that we can shuffle the scans
-        // and add the future to the right nested list. By shuffling the scans
-        // we get better utilization of the cluster since our thread executor
-        // will spray the scans across machines as opposed to targeting a
-        // single one since the scans are in row key order.
+    protected void submitWork(final List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
+            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse) {
         ExecutorService executor = context.getConnection().getQueryServices().getExecutor();
-        
-        for (final List<Scan> scans : nestedScans) {
-            Scan firstScan = scans.get(0);
-            Scan lastScan = scans.get(scans.size()-1);
-            final Scan overallScan = ScanUtil.newScan(firstScan);
-            overallScan.setStopRow(lastScan.getStopRow());
-            final String tableName = tableRef.getTable().getPhysicalName().getString();
-            final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(context.getReadMetricsQueue(), tableName);
-            final PhoenixConnection conn = context.getConnection();
-            final long renewLeaseThreshold = conn.getQueryServices().getRenewLeaseThresholdMilliSeconds();
-            lastScan.setAttribute(QueryConstants.LAST_SCAN, Bytes.toBytes(Boolean.TRUE));
+        final String tableName = tableRef.getTable().getPhysicalName().getString();
+        final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(context.getReadMetricsQueue(), tableName);
+        final PhoenixConnection conn = context.getConnection();
+        final long renewLeaseThreshold = conn.getQueryServices().getRenewLeaseThresholdMilliSeconds();
+        int expectedListSize = nestedScans.size() * 10;
+        List<Scan> flattenedScans = Lists.newArrayListWithExpectedSize(expectedListSize);
+        for (List<Scan> list : nestedScans) {
+            flattenedScans.addAll(list);
+        }
+        if (!flattenedScans.isEmpty()) { 
+            if (isReverse) {
+                flattenedScans = Lists.reverse(flattenedScans);
+            }
+            final List<Scan> finalScans = flattenedScans;
             Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
                 @Override
                 public PeekingResultIterator call() throws Exception {
-                    PeekingResultIterator previousIterator = null;
-                	List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(scans.size());
-                	for (final Scan scan : scans) {
-                	    TableResultIterator scanner = new TableResultIterator(mutationState, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), renewLeaseThreshold, previousIterator);
-                	    conn.addIterator(scanner);
-                	    PeekingResultIterator iterator = iteratorFactory.newIterator(context, scanner, scan, tableName);
-                	    concatIterators.add(iterator);
-                	    previousIterator = iterator;
-                	}
-                	PeekingResultIterator concatIterator = ConcatResultIterator.newIterator(concatIterators);
-                    allIterators.add(concatIterator);
-                    return concatIterator;
+                    PeekingResultIterator itr = new SerialIterator(finalScans, tableName, renewLeaseThreshold, offset);
+                    return itr;
                 }
 
                 /**
@@ -117,7 +110,7 @@ public class SerialIterators extends BaseResultIterators {
                 }
             }, "Serial scanner for table: " + tableRef.getTable().getPhysicalName().getString()));
             // Add our singleton Future which will execute serially
-            nestedFutures.add(Collections.singletonList(new Pair<Scan,Future<PeekingResultIterator>>(overallScan,future)));
+            nestedFutures.add(Collections.singletonList(new Pair<Scan, Future<PeekingResultIterator>>(flattenedScans.get(0), future)));
         }
     }
 
@@ -125,4 +118,89 @@ public class SerialIterators extends BaseResultIterators {
     protected String getName() {
         return NAME;
     }
+    
+    /**
+     * 
+     * Iterator that creates iterators for scans only when needed.
+     * This helps reduce the cost of pre-constructing all the iterators
+     * which we may not even use.
+     */
+    private class SerialIterator implements PeekingResultIterator {
+        private final List<Scan> scans;
+        private final String tableName;
+        private final long renewLeaseThreshold;
+        private int index;
+        private PeekingResultIterator currentIterator;
+        private Integer remainingOffset;
+        
+        private SerialIterator(List<Scan> flattenedScans, String tableName, long renewLeaseThreshold, Integer offset) throws SQLException {
+            this.scans = Lists.newArrayListWithExpectedSize(flattenedScans.size());
+            this.tableName = tableName;
+            this.renewLeaseThreshold = renewLeaseThreshold;
+            this.scans.addAll(flattenedScans);
+            this.remainingOffset = offset;
+            if (this.remainingOffset != null) {
+                // mark the last scan for offset purposes
+                this.scans.get(this.scans.size() - 1).setAttribute(QueryConstants.LAST_SCAN, Bytes.toBytes(Boolean.TRUE));
+            }
+        }
+        
+        private PeekingResultIterator currentIterator() throws SQLException {
+            if (currentIterator == null) {
+                return currentIterator = nextIterator();
+            }
+            if (currentIterator.peek() == null) {
+                currentIterator.close();
+                currentIterator = nextIterator();
+            }
+            return currentIterator;
+        }
+        
+        private PeekingResultIterator nextIterator() throws SQLException {
+            if (index >= scans.size()) {
+                return EMPTY_ITERATOR;
+            }
+            while (index < scans.size()) {
+                Scan currentScan = scans.get(index++);
+                if (remainingOffset != null) {
+                    currentScan.setAttribute(BaseScannerRegionObserver.SCAN_OFFSET, PInteger.INSTANCE.toBytes(remainingOffset));
+                }
+                TableResultIterator itr = new TableResultIterator(mutationState, tableRef, currentScan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), renewLeaseThreshold);
+                PeekingResultIterator peekingItr = iteratorFactory.newIterator(context, itr, currentScan, tableName);
+                Tuple tuple;
+                if ((tuple = peekingItr.peek()) == null) {
+                    peekingItr.close();
+                    continue;
+                } else if ((remainingOffset = QueryUtil.getRemainingOffset(tuple)) != null) {
+                    peekingItr.next();
+                    peekingItr.close();
+                    continue;
+                }
+                context.getConnection().addIteratorForLeaseRenewal(itr);
+                return peekingItr;
+            }
+            return EMPTY_ITERATOR;
+        }
+        
+        @Override
+        public Tuple next() throws SQLException {
+            return currentIterator().next();
+        }
+
+        @Override
+        public void explain(List<String> planSteps) {}
+
+        @Override
+        public void close() throws SQLException {
+            if (currentIterator != null) {
+                currentIterator.close();
+            }
+        }
+
+        @Override
+        public Tuple peek() throws SQLException {
+            return currentIterator().peek();
+        }
+        
+    }
 }
\ No newline at end of file

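Aside: the heart of the new SerialIterator above is laziness — the next scanner is only opened once the previous one is exhausted, so a LIMIT satisfied by the first few scans never opens the rest, matching the rationale in the new javadoc. A generic, self-contained sketch of the pattern, where Supplier stands in for opening a TableResultIterator:

    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;
    import java.util.function.Supplier;

    public class LazyConcatIterator<T> implements Iterator<T> {
        private final Iterator<Supplier<Iterator<T>>> sources;
        private Iterator<T> current;

        public LazyConcatIterator(List<Supplier<Iterator<T>>> sources) {
            this.sources = sources.iterator();
        }

        @Override
        public boolean hasNext() {
            // Open the next underlying "scan" only when the current one runs dry.
            while ((current == null || !current.hasNext()) && sources.hasNext()) {
                current = sources.next().get();
            }
            return current != null && current.hasNext();
        }

        @Override
        public T next() {
            if (!hasNext()) throw new NoSuchElementException();
            return current.next();
        }
    }

If a consumer stops after the first supplier's elements, the remaining suppliers are never invoked — the pre-construction cost the new javadoc describes avoiding.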
http://git-wip-us.apache.org/repos/asf/phoenix/blob/54362430/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
index 540b410..48ebed4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
@@ -57,16 +57,28 @@ import org.apache.phoenix.util.TupleUtil;
  *
 * Result iterator that spools the results of a scan to disk once an in-memory threshold has been reached.
 * If the in-memory threshold is not reached, the results are held in memory with no disk writing perfomed.
- *
- *
+ * 
+ * <p>
+ * Spooling is deprecated and shouldn't be used while implementing new features. As of HBase 0.98.17,
+ * we rely on pacing the server side scanners instead of pulling rows from the server and potentially
+ * spooling to a temporary file created on clients.
+ * </p>
+ *  
  * @since 0.1
  */
+@Deprecated
 public class SpoolingResultIterator implements PeekingResultIterator {
     
     private final PeekingResultIterator spoolFrom;
     private final SpoolingMetricsHolder spoolMetrics;
     private final MemoryMetricsHolder memoryMetrics;
-
+    
+    /**
+     * Spooling is deprecated and shouldn't be used while implementing new features. As of HBase
+     * 0.98.17, we rely on pacing the server side scanners instead of pulling rows from the server
+     * and potentially spooling to a temporary file created on clients.
+     */
+    @Deprecated
     public static class SpoolingResultIteratorFactory implements ParallelIteratorFactory {
         private final QueryServices services;
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54362430/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index f5a9dc4..029987a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -32,14 +32,12 @@ import javax.annotation.concurrent.GuardedBy;
 import org.apache.hadoop.hbase.client.AbstractClientScanner;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.monitoring.CombinableMetric;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.Closeables;
-import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ServerUtil;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -70,8 +68,7 @@ public class TableResultIterator implements ResultIterator {
 
     @GuardedBy("this")
     private long renewLeaseTime = 0;
-    private PeekingResultIterator previousIterator;
-
+    
     @VisibleForTesting // Exposed for testing. DON'T USE ANYWHERE ELSE!
     TableResultIterator() {
         this.scanMetrics = null;
@@ -85,19 +82,13 @@ public class TableResultIterator implements ResultIterator {
     };
 
     public TableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics,
-			long renewLeaseThreshold) throws SQLException {
-    	this(mutationState,tableRef,scan,scanMetrics,renewLeaseThreshold,null);
-    }
-    
-    public TableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics,
-            long renewLeaseThreshold, PeekingResultIterator previousIterator) throws SQLException {
+            long renewLeaseThreshold) throws SQLException {
         this.scan = scan;
         this.scanMetrics = scanMetrics;
         PTable table = tableRef.getTable();
         htable = mutationState.getHTable(table);
         this.scanIterator = UNINITIALIZED_SCANNER;
         this.renewLeaseThreshold = renewLeaseThreshold;
-        this.previousIterator = previousIterator;
     }
 
     @Override
@@ -129,15 +120,6 @@ public class TableResultIterator implements ResultIterator {
         ResultIterator delegate = this.scanIterator;
         if (delegate == UNINITIALIZED_SCANNER) {
             try {
-                if (previousIterator != null && scan.getAttribute(BaseScannerRegionObserver.SCAN_OFFSET) != null) {
-                    byte[] unusedOffset = QueryUtil.getUnusedOffset(previousIterator.peek());
-                    if (unusedOffset != null) {
-                        scan.setAttribute(BaseScannerRegionObserver.SCAN_OFFSET, unusedOffset);
-                        previousIterator.next();
-                    } else {
-                        scan.setAttribute(BaseScannerRegionObserver.SCAN_OFFSET, null);
-                    }
-                }
                 this.scanIterator =
                         new ScanningResultIterator(htable.getScanner(scan), scanMetrics);
             } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54362430/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 80179ae..08b300b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -1033,7 +1033,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
         this.parallelIteratorFactory = parallelIteratorFactory;
     }
     
-    public void addIterator(@Nonnull TableResultIterator itr) {
+    public void addIteratorForLeaseRenewal(@Nonnull TableResultIterator itr) {
         if (services.supportsFeature(Feature.RENEW_LEASE)) {
             checkNotNull(itr);
             scannerQueue.add(new WeakReference<TableResultIterator>(itr));

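Aside: the rename makes the contract explicit — iterators are registered only so the connection's renew-lease machinery can keep their HBase scanner leases alive. As the hunk shows, registration goes through weak references, so tracking never pins an iterator the query has already dropped. A minimal sketch of that pattern, with hypothetical names:

    import java.lang.ref.WeakReference;
    import java.util.Iterator;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.function.Consumer;

    public class LeaseRenewalRegistry<T> {
        private final Queue<WeakReference<T>> scanners = new ConcurrentLinkedQueue<>();

        public void register(T scanner) {
            // Weak reference: registering for lease renewal must not keep a
            // closed or abandoned scanner reachable.
            scanners.add(new WeakReference<>(scanner));
        }

        public void renewLeases(Consumer<T> renew) {
            for (Iterator<WeakReference<T>> it = scanners.iterator(); it.hasNext();) {
                T scanner = it.next().get();
                if (scanner == null) {
                    it.remove(); // iterator was garbage collected; stop tracking it
                } else {
                    renew.accept(scanner);
                }
            }
        }
    }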
http://git-wip-us.apache.org/repos/asf/phoenix/blob/54362430/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 450ccfe..36aa8cd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -144,7 +144,7 @@ public interface QueryConstants {
     public final static String PHOENIX_METADATA = "table";
     public final static String OFFSET_ROW_KEY = "_OFFSET_";
     public final static byte[] OFFSET_ROW_KEY_BYTES = Bytes.toBytes(OFFSET_ROW_KEY);
-    public final static ImmutableBytesPtr offsetRowKeyPtr = new ImmutableBytesPtr(OFFSET_ROW_KEY_BYTES);
+    public final static ImmutableBytesPtr OFFSET_ROW_KEY_PTR = new ImmutableBytesPtr(OFFSET_ROW_KEY_BYTES);
 
     public final static PName SINGLE_COLUMN_NAME = PNameFactory.newNormalizedName("s");
     public final static PName SINGLE_COLUMN_FAMILY_NAME = PNameFactory.newNormalizedName("s");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54362430/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 665e77f..f4300fd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -1226,6 +1226,7 @@ public class PTableImpl implements PTable {
         return isTransactional;
     }
 
+    @Override
     public int getBaseColumnCount() {
         return baseColumnCount;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54362430/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index 99c4596..a0c367d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -48,7 +48,9 @@ import org.apache.phoenix.parse.WildcardParseNode;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PInteger;
 
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
@@ -370,13 +372,13 @@ public final class QueryUtil {
 
     }
 
-    public static byte[] getUnusedOffset(Tuple offsetTuple) {
+    public static Integer getRemainingOffset(Tuple offsetTuple) {
         if (offsetTuple != null) {
             ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr();
             offsetTuple.getKey(rowKeyPtr);
-            if (QueryConstants.offsetRowKeyPtr.compareTo(rowKeyPtr) == 0) {
-                Cell value = offsetTuple.getValue(QueryConstants.OFFSET_FAMILY, QueryConstants.OFFSET_COLUMN);
-                return value.getValue();
+            if (QueryConstants.OFFSET_ROW_KEY_PTR.compareTo(rowKeyPtr) == 0) {
+                Cell cell = offsetTuple.getValue(QueryConstants.OFFSET_FAMILY, QueryConstants.OFFSET_COLUMN);
+                return PInteger.INSTANCE.toObject(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), PInteger.INSTANCE, SortOrder.ASC, null, null);
             }
         }
         return null;

