phoenix-commits mailing list archives

From an...@apache.org
Subject [3/3] phoenix git commit: PHOENIX-2722 support mysql offset clause
Date Fri, 08 Apr 2016 10:44:57 GMT
PHOENIX-2722 support mysql offset clause


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/525f128b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/525f128b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/525f128b

Branch: refs/heads/4.x-HBase-1.0
Commit: 525f128bde4896b0040186e1d518ccbba9e285c7
Parents: bd39428
Author: Ankit Singhal <ankitsinghal59@gmail.com>
Authored: Fri Apr 8 16:14:31 2016 +0530
Committer: Ankit Singhal <ankitsinghal59@gmail.com>
Committed: Fri Apr 8 16:14:31 2016 +0530
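
For readers skimming the diff: the feature added here is a MySQL-style OFFSET
clause for Phoenix SQL. A minimal JDBC sketch of the new syntax (the connection
URL, table T, and column k are illustrative only, not part of this commit):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class OffsetSketch {
        public static void main(String[] args) throws Exception {
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
            // Skip the first 5 rows, then return at most 10 -- the clause this
            // commit teaches the parser, compiler, and runtime to handle.
            ResultSet rs = conn.createStatement().executeQuery(
                    "SELECT k FROM T ORDER BY k LIMIT 10 OFFSET 5");
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }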

----------------------------------------------------------------------
 .../apache/phoenix/end2end/AutoCommitIT.java    |  14 +-
 .../apache/phoenix/end2end/CreateTableIT.java   |   2 +-
 .../apache/phoenix/end2end/DerivedTableIT.java  | 144 ++++++++++++-
 .../org/apache/phoenix/end2end/HashJoinIT.java  | 161 ++++++++++++++
 .../phoenix/end2end/QueryWithOffsetIT.java      | 211 +++++++++++++++++++
 .../org/apache/phoenix/end2end/ReadOnlyIT.java  |  12 +-
 .../apache/phoenix/end2end/SortMergeJoinIT.java |  44 ++++
 phoenix-core/src/main/antlr3/PhoenixSQL.g       |  16 +-
 .../apache/phoenix/compile/DeleteCompiler.java  |  13 +-
 .../apache/phoenix/compile/JoinCompiler.java    |  10 +-
 .../phoenix/compile/ListJarsQueryPlan.java      |   5 +
 .../apache/phoenix/compile/OffsetCompiler.java  | 114 ++++++++++
 .../apache/phoenix/compile/OrderByCompiler.java |   3 +-
 .../apache/phoenix/compile/PostDDLCompiler.java |   3 +-
 .../apache/phoenix/compile/QueryCompiler.java   |  52 +++--
 .../org/apache/phoenix/compile/QueryPlan.java   |   2 +
 .../phoenix/compile/StatementNormalizer.java    |   2 +-
 .../phoenix/compile/SubqueryRewriter.java       |  10 +-
 .../phoenix/compile/SubselectRewriter.java      |  15 +-
 .../apache/phoenix/compile/TraceQueryPlan.java  |   5 +
 .../apache/phoenix/compile/UpsertCompiler.java  |   2 +-
 .../coprocessor/BaseScannerRegionObserver.java  |   1 +
 .../phoenix/coprocessor/ScanRegionObserver.java |  95 ++++++++-
 .../apache/phoenix/execute/AggregatePlan.java   |  43 ++--
 .../apache/phoenix/execute/BaseQueryPlan.java   |   9 +-
 .../phoenix/execute/ClientAggregatePlan.java    |  19 +-
 .../phoenix/execute/ClientProcessingPlan.java   |  10 +-
 .../apache/phoenix/execute/ClientScanPlan.java  |  38 ++--
 .../phoenix/execute/DegenerateQueryPlan.java    |   2 +-
 .../phoenix/execute/DelegateQueryPlan.java      |   4 +
 .../execute/LiteralResultIterationPlan.java     |  13 +-
 .../org/apache/phoenix/execute/ScanPlan.java    |  58 +++--
 .../phoenix/execute/SortMergeJoinPlan.java      |   5 +
 .../org/apache/phoenix/execute/UnionPlan.java   |  15 +-
 .../phoenix/iterate/BaseResultIterators.java    |  14 +-
 .../apache/phoenix/iterate/ExplainTable.java    |  16 +-
 .../phoenix/iterate/LimitingResultIterator.java |   2 +-
 .../iterate/MergeSortTopNResultIterator.java    |  21 +-
 .../phoenix/iterate/OffsetResultIterator.java   |  62 ++++++
 .../OrderedAggregatingResultIterator.java       |   6 +-
 .../phoenix/iterate/OrderedResultIterator.java  |  58 +++--
 .../phoenix/iterate/ParallelIterators.java      |   4 +-
 .../apache/phoenix/iterate/SerialIterators.java |  19 +-
 .../phoenix/iterate/TableResultIterator.java    |  22 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  18 +-
 .../org/apache/phoenix/join/HashJoinInfo.java   |   2 +-
 .../apache/phoenix/optimize/QueryOptimizer.java |   2 +-
 .../apache/phoenix/parse/DeleteStatement.java   |   6 +
 .../phoenix/parse/FilterableStatement.java      |   1 +
 .../org/apache/phoenix/parse/OffsetNode.java    |  67 ++++++
 .../apache/phoenix/parse/ParseNodeFactory.java  |  55 +++--
 .../apache/phoenix/parse/ParseNodeRewriter.java |   2 +-
 .../apache/phoenix/parse/SelectStatement.java   |  35 ++-
 .../apache/phoenix/query/QueryConstants.java    |   6 +
 .../java/org/apache/phoenix/util/QueryUtil.java |  30 ++-
 .../java/org/apache/phoenix/util/ScanUtil.java  |   4 +
 .../phoenix/execute/CorrelatePlanTest.java      |  39 +++-
 .../execute/LiteralResultIteratorPlanTest.java  | 192 +++++++++++++++++
 .../phoenix/execute/UnnestArrayPlanTest.java    |   3 +-
 .../query/ParallelIteratorsSplitTest.java       |   5 +
 60 files changed, 1621 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/525f128b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutoCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutoCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutoCommitIT.java
index aa92c5e..469f2de 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutoCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutoCommitIT.java
@@ -42,21 +42,21 @@ public class AutoCommitIT extends BaseHBaseManagedTimeIT {
         conn.setAutoCommit(true);
         
         String ddl = "CREATE TABLE test_table " +
-                "  (row varchar not null, col1 integer" +
-                "  CONSTRAINT pk PRIMARY KEY (row))\n";
+                "  (r varchar not null, col1 integer" +
+                "  CONSTRAINT pk PRIMARY KEY (r))\n";
         createTestTable(getUrl(), ddl);
         
-        String query = "UPSERT INTO test_table(row, col1) VALUES('row1', 1)";
+        String query = "UPSERT INTO test_table(r, col1) VALUES('row1', 1)";
         PreparedStatement statement = conn.prepareStatement(query);
         statement.executeUpdate();
         conn.commit();
         
         conn.setAutoCommit(false);
-        query = "UPSERT INTO test_table(row, col1) VALUES('row1', 2)";
+        query = "UPSERT INTO test_table(r, col1) VALUES('row1', 2)";
         statement = conn.prepareStatement(query);
         statement.executeUpdate();
         
-        query = "DELETE FROM test_table WHERE row='row1'";
+        query = "DELETE FROM test_table WHERE r='row1'";
         statement = conn.prepareStatement(query);
         statement.executeUpdate();
         conn.commit();
@@ -66,11 +66,11 @@ public class AutoCommitIT extends BaseHBaseManagedTimeIT {
         ResultSet rs = statement.executeQuery();
         assertFalse(rs.next());
 
-        query = "DELETE FROM test_table WHERE row='row1'";
+        query = "DELETE FROM test_table WHERE r='row1'";
         statement = conn.prepareStatement(query);
         statement.executeUpdate();
 
-        query = "UPSERT INTO test_table(row, col1) VALUES('row1', 3)";
+        query = "UPSERT INTO test_table(r, col1) VALUES('row1', 3)";
         statement = conn.prepareStatement(query);
         statement.executeUpdate();
         conn.commit();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/525f128b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
index 5ffc354..b0370e8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
@@ -74,7 +74,7 @@ public class CreateTableIT extends BaseClientManagedTimeIT {
                 "                data.file VARCHAR ,\n" + 
                 "                data.fk_log VARCHAR ,\n" + 
                 "                data.host VARCHAR ,\n" + 
-                "                data.row VARCHAR ,\n" + 
+                "                data.r VARCHAR ,\n" + 
                 "                data.size VARCHAR ,\n" + 
                 "                data.start_time VARCHAR ,\n" + 
                 "                data.stat_date DATE ,\n" + 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/525f128b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java
index 6912025..6fad2cf 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java
@@ -233,6 +233,46 @@ public class DerivedTableIT extends BaseClientManagedTimeIT {
             assertEquals(9,rs.getInt(1));
 
             assertFalse(rs.next());
+
+            // Inner limit < outer query offset
+            query = "SELECT t.eid, t.x + 9 FROM (SELECT entity_id eid, b_string b, a_byte + 1 x FROM aTable LIMIT 1 OFFSET 1 ) AS t WHERE t.b = '"
+                    + C_VALUE + "' OFFSET 2";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertFalse(rs.next());
+
+            // (where) offset
+            query = "SELECT t.eid, t.x + 9 FROM (SELECT entity_id eid, b_string b, a_byte + 1 x FROM aTable WHERE a_byte + 1 < 9 ) AS t OFFSET 2";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(ROW3, rs.getString(1));
+            assertEquals(13, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(ROW4, rs.getString(1));
+            assertEquals(14, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(ROW5, rs.getString(1));
+            assertEquals(15, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(ROW6, rs.getString(1));
+            assertEquals(16, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(ROW7, rs.getString(1));
+            assertEquals(17, rs.getInt(2));
+
+            // (offset) where
+            query = "SELECT t.eid, t.x + 9 FROM (SELECT entity_id eid, b_string b, a_byte + 1 x FROM aTable OFFSET 4) AS t WHERE t.b = '"
+                    + C_VALUE + "'";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(ROW5, rs.getString(1));
+            assertEquals(15, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(ROW8, rs.getString(1));
+            assertEquals(18, rs.getInt(2));
+
         } finally {
             conn.close();
         }
@@ -350,6 +390,17 @@ public class DerivedTableIT extends BaseClientManagedTimeIT {
             assertEquals(1,rs.getInt(2));
 
             assertFalse(rs.next());
+
+            // (groupby) groupby orderby offset
+            query = "SELECT t.c, count(*) FROM (SELECT count(*) c FROM aTable GROUP BY a_string) AS t GROUP BY t.c ORDER BY count(*) DESC OFFSET 1";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertEquals(1, rs.getInt(2));
+
+            assertFalse(rs.next());
+
         } finally {
             conn.close();
         }
@@ -545,7 +596,98 @@ public class DerivedTableIT extends BaseClientManagedTimeIT {
             conn.close();
         }
     }
-    
+
+    @Test
+    public void testDerivedTableWithOffset() throws Exception {
+        long ts = nextTimestamp();
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 1));
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            // (LIMIT OFFSET )
+            String query = "SELECT t.eid FROM (SELECT entity_id eid FROM aTable LIMIT 2 OFFSET 1) AS t";
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(ROW2, rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals(ROW3, rs.getString(1));
+
+            assertFalse(rs.next());
+
+            // (OFFSET) limit
+            query = "SELECT t.eid FROM (SELECT entity_id eid FROM aTable OFFSET 1) AS t LIMIT 2";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(ROW2, rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals(ROW3, rs.getString(1));
+
+            assertFalse(rs.next());
+
+            // (limit OFFSET) limit OFFSET
+            query = "SELECT t.eid FROM (SELECT entity_id eid FROM aTable LIMIT 2 OFFSET 1) AS t LIMIT 4 OFFSET 1";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(ROW3, rs.getString(1));
+            assertFalse(rs.next());
+
+            // (limit OFFSET) limit 2
+            query = "SELECT t.eid FROM (SELECT entity_id eid FROM aTable LIMIT 4 OFFSET 1) AS t LIMIT 2";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(ROW2, rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals(ROW3, rs.getString(1));
+
+            assertFalse(rs.next());
+
+            // (limit ? OFFSET ?) limit ? OFFSET ?
+            query = "SELECT t.eid FROM (SELECT entity_id eid FROM aTable LIMIT ? OFFSET ?) AS t LIMIT ? OFFSET ?";
+            statement = conn.prepareStatement(query);
+            statement.setInt(1, 4);
+            statement.setInt(2, 2);
+            statement.setInt(3, 2);
+            statement.setInt(4, 2);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(ROW5, rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals(ROW6, rs.getString(1));
+            assertFalse(rs.next());
+
+            // (groupby orderby OFFSET)
+            query = "SELECT a, s FROM (SELECT a_string a, sum(a_byte) s FROM aTable GROUP BY a_string ORDER BY sum(a_byte) OFFSET 1)";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(A_VALUE, rs.getString(1));
+            assertEquals(10, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(B_VALUE, rs.getString(1));
+            assertEquals(26, rs.getInt(2));
+
+            assertFalse(rs.next());
+
+            // (union OFFSET) groupby 
+            query = "SELECT a_string, count(*) FROM (SELECT a_string FROM aTable where a_byte < 4 union all SELECT a_string FROM aTable where a_byte > 8 OFFSET 1) group by a_string";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(A_VALUE,rs.getString(1));
+            assertEquals(2,rs.getInt(2));
+            assertTrue (rs.next());
+            assertEquals(C_VALUE,rs.getString(1));
+            assertEquals(1,rs.getInt(2));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
     @Test
     public void testDerivedTableWithDistinct() throws Exception {
         long ts = nextTimestamp();

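A note on how the nested offsets in testDerivedTableWithOffset compose: with
aTable holding ROW1..ROW9 in row-key order, the "(limit OFFSET) limit OFFSET"
case resolves in two steps, sketched here assuming an open java.sql.Connection
conn (the comments only paraphrase what the test asserts):

    Statement stmt = conn.createStatement();
    // Inner query:  LIMIT 2 OFFSET 1 over ROW1..ROW9   -> {ROW2, ROW3}
    // Outer query:  LIMIT 4 OFFSET 1 over {ROW2, ROW3} -> {ROW3}
    ResultSet rs = stmt.executeQuery(
            "SELECT t.eid FROM (SELECT entity_id eid FROM aTable"
            + " LIMIT 2 OFFSET 1) AS t LIMIT 4 OFFSET 1");
    // rs yields exactly one row: ROW3
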
http://git-wip-us.apache.org/repos/asf/phoenix/blob/525f128b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
index 00655cf..3e2356f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
@@ -441,6 +441,38 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
                 "    DYNAMIC SERVER FILTER BY \"I.item_id\" IN (\"O.item_id\")\n" +
                 "    JOIN-SCANNER 4 ROW LIMIT",
+                /*
+                 * testJoinWithOffset()
+                 *     SELECT order_id, i.name, s.name, s.address, quantity 
+                 *     FROM joinSupplierTable s 
+                 *     LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id 
+                 *     LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 1 OFFSET 2
+                 */
+                "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+                "    SERVER OFFSET 2\n" +
+                "    SERVER 3 ROW LIMIT\n" +
+                "CLIENT 1 ROW LIMIT\n" +
+                "    PARALLEL LEFT-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ITEM_TABLE_DISPLAY_NAME + "\n" +
+                "    PARALLEL LEFT-JOIN TABLE 1(DELAYED EVALUATION)\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "    JOIN-SCANNER 3 ROW LIMIT",
+                /*
+                 * testJoinWithOffset()
+                 *     SELECT order_id, i.name, s.name, s.address, quantity 
+                 *     FROM joinSupplierTable s 
+                 *     JOIN joinItemTable i ON i.supplier_id = s.supplier_id 
+                 *     JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 1 OFFSET 2
+                 */
+                "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+                "    SERVER OFFSET 2\n" +
+                "CLIENT 1 ROW LIMIT\n" +
+                "    PARALLEL INNER-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ITEM_TABLE_DISPLAY_NAME + "\n" +
+                "    PARALLEL INNER-JOIN TABLE 1(DELAYED EVALUATION)\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "    DYNAMIC SERVER FILTER BY \"S.supplier_id\" IN (\"I.supplier_id\")\n" +
+                "    JOIN-SCANNER 3 ROW LIMIT",
                 }});
         testCases.add(new String[][] {
                 {
@@ -770,6 +802,38 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "    PARALLEL INNER-JOIN TABLE 0\n" +
                 "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_SCHEMA + ".OrderTable\n" +
                 "    JOIN-SCANNER 4 ROW LIMIT",
+                /*
+                 * testJoinWithLimit()
+                 *     SELECT order_id, i.name, s.name, s.address, quantity 
+                 *     FROM joinSupplierTable s 
+                 *     LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id 
+                 *     LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 1 OFFSET 2
+                 */
+                "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+                "    SERVER OFFSET 2\n" +
+                "    SERVER 3 ROW LIMIT\n" +
+                "CLIENT 1 ROW LIMIT\n" +
+                "    PARALLEL LEFT-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_SCHEMA + ".idx_item\n" +
+                "    PARALLEL LEFT-JOIN TABLE 1(DELAYED EVALUATION)\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "    JOIN-SCANNER 3 ROW LIMIT",
+                /*
+                 * testJoinWithLimit()
+                 *     SELECT order_id, i.name, s.name, s.address, quantity 
+                 *     FROM joinSupplierTable s 
+                 *     JOIN joinItemTable i ON i.supplier_id = s.supplier_id 
+                 *     JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 1 OFFSET 2
+                 */
+                "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+                "    SERVER OFFSET 2\n" +
+                "CLIENT 1 ROW LIMIT\n" +
+                "    PARALLEL INNER-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_SCHEMA + ".idx_item\n" +
+                "    PARALLEL INNER-JOIN TABLE 1(DELAYED EVALUATION)\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "    DYNAMIC SERVER FILTER BY \"S.supplier_id\" IN (\"I.0:supplier_id\")\n" +
+                "    JOIN-SCANNER 3 ROW LIMIT",
                 }});
         testCases.add(new String[][] {
                 {
@@ -1124,6 +1188,40 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
                 "    DYNAMIC SERVER FILTER BY \"I.:item_id\" IN (\"O.item_id\")\n" +
                 "    JOIN-SCANNER 4 ROW LIMIT",
+                /*
+                 * testJoinWithOffset()
+                 *     SELECT order_id, i.name, s.name, s.address, quantity 
+                 *     FROM joinSupplierTable s 
+                 *     LEFT JOIN joinItemTable i ON i.supplier_id = s.supplier_id 
+                 *     LEFT JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 1 OFFSET 2
+                 */
+                "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+                "    SERVER OFFSET 2\n" +
+                "    SERVER 3 ROW LIMIT\n" +
+                "CLIENT 1 ROW LIMIT\n" +
+                "    PARALLEL LEFT-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER "+ MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX +""+JOIN_ITEM_TABLE_DISPLAY_NAME + " [-32768]\n" +
+                "        CLIENT MERGE SORT\n" +      
+                "    PARALLEL LEFT-JOIN TABLE 1(DELAYED EVALUATION)\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "    JOIN-SCANNER 3 ROW LIMIT",
+                /*
+                 * testJoinWithOffset()
+                 *     SELECT order_id, i.name, s.name, s.address, quantity 
+                 *     FROM joinSupplierTable s 
+                 *     JOIN joinItemTable i ON i.supplier_id = s.supplier_id 
+                 *     JOIN joinOrderTable o ON o.item_id = i.item_id LIMIT 1 OFFSET 2
+                 */
+                "CLIENT SERIAL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+                "    SERVER OFFSET 2\n" +
+                "CLIENT 1 ROW LIMIT\n" +
+                "    PARALLEL INNER-JOIN TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER "+ MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX +""+JOIN_ITEM_TABLE_DISPLAY_NAME + " [-32768]\n" +
+                "        CLIENT MERGE SORT\n" +
+                "    PARALLEL INNER-JOIN TABLE 1(DELAYED EVALUATION)\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "    DYNAMIC SERVER FILTER BY \"S.supplier_id\" IN (\"I.0:supplier_id\")\n" +
+                "    JOIN-SCANNER 3 ROW LIMIT",
                 }});
         return testCases;
     }
@@ -2920,6 +3018,26 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
             assertEquals(rs.getString(4), "S5");
 
             assertFalse(rs.next());
+
+            query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM "
+                    + JOIN_ITEM_TABLE_FULL_NAME
+                    + " item INNER JOIN (SELECT reverse(loc_id), \"supplier_id\", name FROM "
+                    + JOIN_SUPPLIER_TABLE_FULL_NAME
+                    + " ORDER BY \"supplier_id\"  OFFSET 2) AS supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND (supp.name != 'S1')";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(rs.getString(1), "0000000005");
+            assertEquals(rs.getString(2), "T5");
+            assertEquals(rs.getString(3), "0000000005");
+            assertEquals(rs.getString(4), "S5");
+            assertTrue(rs.next());
+            assertEquals(rs.getString(1), "0000000006");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "0000000006");
+            assertEquals(rs.getString(4), "S6");
+            assertFalse(rs.next());
+
         } finally {
             conn.close();
         }
@@ -3234,6 +3352,49 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
     }
 
     @Test
+    public void testJoinWithOffset() throws Exception {
+        String query1 = "SELECT \"order_id\", i.name, s.name, s.address, quantity FROM " + JOIN_SUPPLIER_TABLE_FULL_NAME
+                + " s LEFT JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " i ON i.\"supplier_id\" = s.\"supplier_id\" LEFT JOIN "
+                + JOIN_ORDER_TABLE_FULL_NAME + " o ON o.\"item_id\" = i.\"item_id\" LIMIT 1 OFFSET 2 ";
+        String query2 = "SELECT \"order_id\", i.name, s.name, s.address, quantity FROM " + JOIN_SUPPLIER_TABLE_FULL_NAME
+                + " s JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " i ON i.\"supplier_id\" = s.\"supplier_id\" JOIN "
+                + JOIN_ORDER_TABLE_FULL_NAME + " o ON o.\"item_id\" = i.\"item_id\" LIMIT 1 OFFSET 2 ";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query1);
+            ResultSet rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(rs.getString(1), "000000000000005");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getString(4), "202 YYY Street");
+            assertEquals(rs.getInt(5), 5000);
+
+            assertFalse(rs.next());
+
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query1);
+            assertEquals(plans[22], QueryUtil.getExplainPlan(rs));
+
+            statement = conn.prepareStatement(query2);
+            rs = statement.executeQuery();
+
+            assertTrue(rs.next());
+            assertEquals(rs.getString(1), "000000000000005");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getString(4), "202 YYY Street");
+            assertEquals(rs.getInt(5), 5000);
+            assertFalse(rs.next());
+
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query2);
+            assertEquals(plans[23], QueryUtil.getExplainPlan(rs));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
     public void testNonEquiJoin() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = DriverManager.getConnection(getUrl(), props);

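One detail worth noting in the expected plans above: for LIMIT 1 OFFSET 2 the
left-join plan forces a serial scan and pushes both values to the server, capping
the scan at offset + limit rows and leaving the final trim to the client.
Restated as comments (plan text quoted from the expectations above):

    // CLIENT SERIAL 1-WAY FULL SCAN OVER <supplier table>
    //     SERVER OFFSET 2      -- rows skipped on the region server
    //     SERVER 3 ROW LIMIT   -- scan cap = offset (2) + limit (1)
    // CLIENT 1 ROW LIMIT       -- final trim on the client
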
http://git-wip-us.apache.org/repos/asf/phoenix/blob/525f128b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
new file mode 100644
index 0000000..c609581
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
@@ -0,0 +1,211 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Maps;
+
+@RunWith(Parameterized.class)
+public class QueryWithOffsetIT extends BaseOwnClusterHBaseManagedTimeIT {
+    
+    private String tableName;
+    private final String[] strings = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p",
+            "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };
+    private final String ddl;
+
+    public QueryWithOffsetIT(String preSplit) {
+        this.tableName = tableName + "_" + preSplit.charAt(2);
+        this.ddl = "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n"
+                + "k2 INTEGER NOT NULL,\n" + "C3.k3 INTEGER,\n" + "C2.v1 VARCHAR,\n"
+                + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2)) " + preSplit;
+    }
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(true));
+        // Must update config before starting server
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Parameters(name="preSplit = {0}")
+    public static Collection<String> data() {
+        return Arrays.asList(new String[] { " SPLIT ON ('e','i','o')", " SALT_BUCKETS=10" });
+    }
+
+    @Test
+    public void testLimitOffset() throws SQLException {
+        Connection conn;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        conn = DriverManager.getConnection(getUrl(), props);
+        createTestTable(getUrl(), ddl);
+        initTableValues(conn);
+        int limit = 10;
+        int offset = 10;
+        updateStatistics(conn);
+        ResultSet rs;
+        rs = conn.createStatement()
+                .executeQuery("SELECT t_id from " + tableName + " order by t_id limit " + limit + " offset " + offset);
+        int i = 0;
+        while (i++ < limit) {
+            assertTrue(rs.next());
+            assertEquals(strings[offset + i - 1], rs.getString(1));
+        }
+
+        limit = 35;
+        rs = conn.createStatement().executeQuery("SELECT t_id from " + tableName + " union all SELECT t_id from "
+                + tableName + " offset " + offset + " FETCH FIRST " + limit + " rows only");
+        i = 0;
+        while (i++ < strings.length - offset) {
+            assertTrue(rs.next());
+            assertEquals(strings[offset + i - 1], rs.getString(1));
+        }
+        i = 0;
+        while (i++ < limit - strings.length - offset) {
+            assertTrue(rs.next());
+            assertEquals(strings[i - 1], rs.getString(1));
+        }
+        conn.close();
+    }
+
+    @Test
+    public void testOffsetSerialQueryExecutedOnServer() throws SQLException {
+        Connection conn;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        conn = DriverManager.getConnection(getUrl(), props);
+        int offset = 10;
+        createTestTable(getUrl(), ddl);
+        initTableValues(conn);
+        updateStatistics(conn);
+        String query = "SELECT t_id from " + tableName + " offset " + offset;
+        ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+        rs.next();
+        rs.next();
+        rs.next();
+        assertEquals("    SERVER OFFSET " + offset, rs.getString(1));
+        rs = conn.createStatement().executeQuery(query);
+        int i = 0;
+        while (i++ < strings.length - offset) {
+            assertTrue(rs.next());
+            assertEquals(strings[offset + i - 1], rs.getString(1));
+        }
+        conn.close();
+    }
+
+    @Test
+    public void testOffsetWithoutLimit() throws SQLException {
+        Connection conn;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        conn = DriverManager.getConnection(getUrl(), props);
+        int offset = 10;
+        createTestTable(getUrl(), ddl);
+        initTableValues(conn);
+        updateStatistics(conn);
+        ResultSet rs;
+        rs = conn.createStatement()
+                .executeQuery("SELECT t_id from " + tableName + " order by t_id offset " + offset + " row");
+        int i = 0;
+        while (i++ < strings.length - offset) {
+            assertTrue(rs.next());
+            assertEquals(strings[offset + i - 1], rs.getString(1));
+        }
+
+        rs = conn.createStatement().executeQuery(
+                "SELECT k3, count(*) from " + tableName + " group by k3 order by k3 desc offset " + offset + " row");
+
+        i = 0;
+        while (i++ < strings.length - offset) {
+            assertTrue(rs.next());
+            assertEquals(strings.length - offset - i + 2, rs.getInt(1));
+        }
+
+        rs = conn.createStatement().executeQuery("SELECT t_id from " + tableName + " union all SELECT t_id from "
+                + tableName + " offset " + offset + " rows");
+        i = 0;
+        while (i++ < strings.length - offset) {
+            assertTrue(rs.next());
+            assertEquals(strings[offset + i - 1], rs.getString(1));
+        }
+        i = 0;
+        while (i++ < strings.length) {
+            assertTrue(rs.next());
+            assertEquals(strings[i - 1], rs.getString(1));
+        }
+        conn.close();
+    }
+
+    private void initTableValues(Connection conn) throws SQLException {
+        for (int i = 0; i < 26; i++) {
+            conn.createStatement().execute("UPSERT INTO " + tableName + " values('" + strings[i] + "'," + i + ","
+                    + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')");
+        }
+        conn.commit();
+    }
+
+    private void updateStatistics(Connection conn) throws SQLException {
+        String query = "UPDATE STATISTICS " + tableName + " SET \"" + QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB
+                + "\"=" + Long.toString(500);
+        conn.createStatement().execute(query);
+    }
+
+    @Test
+    public void testMetaDataWithOffset() throws SQLException {
+        Connection conn;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        conn = DriverManager.getConnection(getUrl(), props);
+        createTestTable(getUrl(), ddl);
+        initTableValues(conn);
+        updateStatistics(conn);
+        PreparedStatement stmt = conn.prepareStatement("SELECT * from " + tableName + " offset ?");
+        ParameterMetaData pmd = stmt.getParameterMetaData();
+        assertEquals(1, pmd.getParameterCount());
+        assertEquals(Types.INTEGER, pmd.getParameterType(1));
+        stmt.setInt(1, 10);
+        ResultSet rs = stmt.executeQuery();
+        ResultSetMetaData md = rs.getMetaData();
+        assertEquals(5, md.getColumnCount());
+    }
+
+}
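
The testOffsetSerialQueryExecutedOnServer case above pins down where the offset
is applied; condensed, what it verifies looks like this (offset value and table
name as in the test, assuming an open Connection conn):

    ResultSet rs = conn.createStatement()
            .executeQuery("EXPLAIN SELECT t_id FROM " + tableName + " OFFSET 10");
    // For a serial scan, the third line of the plan reads:
    //     SERVER OFFSET 10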

http://git-wip-us.apache.org/repos/asf/phoenix/blob/525f128b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReadOnlyIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReadOnlyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReadOnlyIT.java
index fbebe08..bcc4ee8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReadOnlyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReadOnlyIT.java
@@ -40,11 +40,11 @@ public class ReadOnlyIT extends BaseHBaseManagedTimeIT {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = DriverManager.getConnection(getUrl(), props);
         String ddl = "CREATE TABLE test_table " +
-                        "  (row varchar not null, col1 integer" +
-                        "  CONSTRAINT pk PRIMARY KEY (row))\n"; 
+                        "  (r varchar not null, col1 integer" +
+                        "  CONSTRAINT pk PRIMARY KEY (r))\n"; 
         createTestTable(getUrl(), ddl);
 
-        String query = "UPSERT INTO test_table(row, col1) VALUES('row1', 777)";
+        String query = "UPSERT INTO test_table(r, col1) VALUES('row1', 777)";
         PreparedStatement statement = conn.prepareStatement(query);
         statement.executeUpdate();
         conn.commit();
@@ -53,8 +53,8 @@ public class ReadOnlyIT extends BaseHBaseManagedTimeIT {
 		conn.setReadOnly(true);
                 assertTrue(conn.isReadOnly());
 		ddl = "CREATE TABLE test_table2 " +
-				"  (row varchar not null, col1 integer" +
-				"  CONSTRAINT pk PRIMARY KEY (row))\n";
+				"  (r varchar not null, col1 integer" +
+				"  CONSTRAINT pk PRIMARY KEY (r))\n";
 		statement = conn.prepareStatement(ddl);
         	statement.executeUpdate();
         	conn.commit();
@@ -64,7 +64,7 @@ public class ReadOnlyIT extends BaseHBaseManagedTimeIT {
         }
 	  
 	try {  
-                query = "UPSERT INTO test_table(row, col1) VALUES('row1', 888)";
+                query = "UPSERT INTO test_table(r, col1) VALUES('row1', 888)";
                 statement = conn.prepareStatement(query);
                 statement.executeUpdate();
                 conn.commit();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/525f128b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
index f19b886..43afd0d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
@@ -2383,6 +2383,50 @@ public class SortMergeJoinIT extends BaseHBaseManagedTimeIT {
     }
 
     @Test
+    public void testJoinWithOffset() throws Exception {
+        String query1 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", i.name, s.name, s.address, quantity FROM "
+                + JOIN_SUPPLIER_TABLE_FULL_NAME + " s LEFT JOIN " + JOIN_ITEM_TABLE_FULL_NAME
+                + " i ON i.\"supplier_id\" = s.\"supplier_id\" LEFT JOIN " + JOIN_ORDER_TABLE_FULL_NAME
+                + " o ON o.\"item_id\" = i.\"item_id\" LIMIT 2 OFFSET 1";
+        String query2 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", i.name, s.name, s.address, quantity FROM "
+                + JOIN_SUPPLIER_TABLE_FULL_NAME + " s JOIN " + JOIN_ITEM_TABLE_FULL_NAME
+                + " i ON i.\"supplier_id\" = s.\"supplier_id\" JOIN " + JOIN_ORDER_TABLE_FULL_NAME
+                + " o ON o.\"item_id\" = i.\"item_id\" LIMIT 1 OFFSET 2";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query1);
+            ResultSet rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertNull(rs.getString(1));
+            assertNull(rs.getString(2));
+            assertEquals(rs.getString(3), "S4");
+            assertEquals(rs.getString(4), "404 YYY Street");
+            assertEquals(rs.getInt(5), 0);
+            assertTrue(rs.next());
+            assertEquals(rs.getString(1), "000000000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getString(4), "101 YYY Street");
+            assertEquals(rs.getInt(5), 1000);
+            assertFalse(rs.next());
+
+            statement = conn.prepareStatement(query2);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(rs.getString(1), "000000000000005");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getString(4), "202 YYY Street");
+            assertEquals(rs.getInt(5), 5000);
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
     public void testNonEquiJoin() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = DriverManager.getConnection(getUrl(), props);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/525f128b/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index 64e1d32..4fc17cf 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -126,6 +126,11 @@ tokens
     LIST = 'list';
     JARS='jars';
     ROW_TIMESTAMP='row_timestamp';
+    OFFSET ='offset';
+    FETCH = 'fetch';
+    ROW = 'row';
+    ROWS = 'rows';
+    ONLY = 'only';
 }
 
 
@@ -658,7 +663,7 @@ single_select returns [SelectStatement ret]
         (WHERE where=expression)?
         (GROUP BY group=group_by)?
         (HAVING having=expression)?
-        { ParseContext context = contextStack.peek(); $ret = factory.select(from, h, d!=null, sel, where, group, having, null, null, getBindCount(), context.isAggregate(), context.hasSequences(), null, new HashMap<String,UDFParseNode>(udfParseNodes)); }
+        { ParseContext context = contextStack.peek(); $ret = factory.select(from, h, d!=null, sel, where, group, having, null, null,null, getBindCount(), context.isAggregate(), context.hasSequences(), null, new HashMap<String,UDFParseNode>(udfParseNodes)); }
     ;
 finally{ contextStack.pop(); }
 
@@ -673,7 +678,9 @@ select_node returns [SelectStatement ret]
     :   u=unioned_selects
         (ORDER BY order=order_by)?
         (LIMIT l=limit)?
-        { ParseContext context = contextStack.peek(); $ret = factory.select(u, order, l, getBindCount(), context.isAggregate()); }
+        (OFFSET o=offset (ROW | ROWS)?)?
+        (FETCH (FIRST | NEXT) (l=limit)? (ROW | ROWS) ONLY)?
+        { ParseContext context = contextStack.peek(); $ret = factory.select(u, order, l, o, getBindCount(), context.isAggregate()); }
     ;
 finally{ contextStack.pop(); }
 
@@ -705,6 +712,11 @@ limit returns [LimitNode ret]
     | l=int_or_long_literal { $ret = factory.limit(l); }
     ;
     
+offset returns [OffsetNode ret]
+	: b=bind_expression { $ret = factory.offset(b); }
+    | l=int_or_long_literal { $ret = factory.offset(l); }
+    ;
+
 sampling_rate returns [LiteralParseNode ret]
     : l=literal { $ret = l; }
     ;
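
As the amended grammar shows, OFFSET accepts a bind variable or an int/long
literal with an optional ROW/ROWS keyword, and the ANSI FETCH FIRST/NEXT ...
ROWS ONLY form can supply the limit. Equivalent spellings, sketched assuming an
open Connection conn (table T is hypothetical):

    Statement stmt = conn.createStatement();
    // MySQL-style LIMIT/OFFSET:
    stmt.executeQuery("SELECT t_id FROM T ORDER BY t_id LIMIT 5 OFFSET 10");
    // ANSI-style, matched by the same select_node rule:
    stmt.executeQuery(
            "SELECT t_id FROM T ORDER BY t_id OFFSET 10 ROWS FETCH FIRST 5 ROWS ONLY");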

http://git-wip-us.apache.org/repos/asf/phoenix/blob/525f128b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 24a2add..2a97686 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -352,12 +352,10 @@ public class DeleteCompiler {
                     PColumn column = table.getPKColumns().get(i);
                     aliasedNodes.add(FACTORY.aliasedNode(null, FACTORY.column(null, '"' + column.getName().getString() + '"', null)));
                 }
-                select = FACTORY.select(
-                        delete.getTable(), 
-                        hint, false, aliasedNodes, delete.getWhere(), 
-                        Collections.<ParseNode>emptyList(), null, 
-                        delete.getOrderBy(), delete.getLimit(),
-                        delete.getBindCount(), false, false, Collections.<SelectStatement>emptyList(), delete.getUdfParseNodes());
+                select = FACTORY.select(delete.getTable(), hint, false, aliasedNodes, delete.getWhere(),
+                        Collections.<ParseNode> emptyList(), null, delete.getOrderBy(), delete.getLimit(), null,
+                        delete.getBindCount(), false, false, Collections.<SelectStatement> emptyList(),
+                        delete.getUdfParseNodes());
                 select = StatementNormalizer.normalize(select, resolverToBe);
                 SelectStatement transformedSelect = SubqueryRewriter.transform(select, resolverToBe, connection);
                 if (transformedSelect != select) {
@@ -514,7 +512,8 @@ public class DeleteCompiler {
                     projectorToBe = new RowProjector(projectorToBe,true);
                 }
                 final RowProjector projector = projectorToBe;
-                final QueryPlan aggPlan = new AggregatePlan(context, select, tableRef, projector, null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
+                final QueryPlan aggPlan = new AggregatePlan(context, select, tableRef, projector, null, null,
+                        OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
                 mutationPlans.add(new MutationPlan() {
                     @Override
                     public ParameterMetaData getParameterMetaData() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/525f128b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index 5d03f57..e6c5970 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -689,7 +689,9 @@ public class JoinCompiler {
             if (isSubselect())
                 return SubselectRewriter.applyOrderBy(SubselectRewriter.applyPostFilters(subselect, preFilters, tableNode.getAlias()), orderBy, tableNode.getAlias());
 
-            return NODE_FACTORY.select(tableNode, select.getHint(), false, selectNodes, getPreFiltersCombined(), null, null, orderBy, null, 0, false, select.hasSequence(), Collections.<SelectStatement>emptyList(), select.getUdfParseNodes());
+            return NODE_FACTORY.select(tableNode, select.getHint(), false, selectNodes, getPreFiltersCombined(), null,
+                    null, orderBy, null, null, 0, false, select.hasSequence(),
+                    Collections.<SelectStatement> emptyList(), select.getUdfParseNodes());
         }
 
         public boolean hasFilters() {
@@ -1071,7 +1073,8 @@ public class JoinCompiler {
                 && !select.isAggregate()
                 && !select.isDistinct()
                 && !(select.getFrom() instanceof DerivedTableNode)
-                && select.getLimit() == null;
+                && select.getLimit() == null
+                && select.getOffset() == null;
     }
 
     private static ParseNode combine(List<ParseNode> nodes) {
@@ -1268,7 +1271,8 @@ public class JoinCompiler {
         String tableAlias = tableRef.getTableAlias();
         TableNode from = NODE_FACTORY.namedTable(tableAlias == null ? null : '"' + tableAlias + '"', tName, dynamicCols);
 
-        return NODE_FACTORY.select(from, hintNode, false, selectList, where, groupBy, null, orderBy, null, 0, groupBy != null, hasSequence, Collections.<SelectStatement>emptyList(), udfParseNodes);
+        return NODE_FACTORY.select(from, hintNode, false, selectList, where, groupBy, null, orderBy, null, null, 0,
+                groupBy != null, hasSequence, Collections.<SelectStatement> emptyList(), udfParseNodes);
     }
 
     public static PTable joinProjectedTables(PTable left, PTable right, JoinType type) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/525f128b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index f2b4856..94736ed 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@ -194,6 +194,11 @@ public class ListJarsQueryPlan implements QueryPlan {
     }
 
     @Override
+    public Integer getOffset() {
+        return null;
+    }
+
+    @Override
     public OrderBy getOrderBy() {
         return OrderBy.EMPTY_ORDER_BY;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/525f128b/phoenix-core/src/main/java/org/apache/phoenix/compile/OffsetCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/OffsetCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/OffsetCompiler.java
new file mode 100644
index 0000000..54be50b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/OffsetCompiler.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+
+import org.apache.phoenix.parse.BindParseNode;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.parse.LiteralParseNode;
+import org.apache.phoenix.parse.OffsetNode;
+import org.apache.phoenix.parse.ParseNodeFactory;
+import org.apache.phoenix.parse.TraverseNoParseNodeVisitor;
+import org.apache.phoenix.schema.PDatum;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PInteger;
+
+public class OffsetCompiler {
+    private static final ParseNodeFactory NODE_FACTORY = new ParseNodeFactory();
+
+    public static final PDatum OFFSET_DATUM = new PDatum() {
+        @Override
+        public boolean isNullable() {
+            return false;
+        }
+
+        @Override
+        public PDataType getDataType() {
+            return PInteger.INSTANCE;
+        }
+
+        @Override
+        public Integer getMaxLength() {
+            return null;
+        }
+
+        @Override
+        public Integer getScale() {
+            return null;
+        }
+
+        @Override
+        public SortOrder getSortOrder() {
+            return SortOrder.getDefault();
+        }
+    };
+
+    private OffsetCompiler() {}
+
+    public static Integer compile(StatementContext context, FilterableStatement statement) throws SQLException {
+        OffsetNode offsetNode = statement.getOffset();
+        if (offsetNode == null) { return null; }
+        OffsetParseNodeVisitor visitor = new OffsetParseNodeVisitor(context);
+        offsetNode.getOffsetParseNode().accept(visitor);
+        return visitor.getOffset();
+    }
+
+    private static class OffsetParseNodeVisitor extends TraverseNoParseNodeVisitor<Void> {
+        private final StatementContext context;
+        private Integer offset;
+
+        public OffsetParseNodeVisitor(StatementContext context) {
+            this.context = context;
+        }
+
+        public Integer getOffset() {
+            return offset;
+        }
+
+        @Override
+        public Void visit(LiteralParseNode node) throws SQLException {
+            Object offsetValue = node.getValue();
+            if (offsetValue != null) {
+                Integer offset = (Integer)OFFSET_DATUM.getDataType().toObject(offsetValue, node.getType());
+                if (offset.intValue() >= 0) {
+                    this.offset = offset;
+                }
+            }
+            return null;
+        }
+
+        @Override
+        public Void visit(BindParseNode node) throws SQLException {
+            // This is for static evaluation in SubselectRewriter.
+            if (context == null) return null;
+
+            Object value = context.getBindManager().getBindValue(node);
+            context.getBindManager().addParamMetaData(node, OFFSET_DATUM);
+            // Resolve the bind value, create a LiteralParseNode, and call the
+            // visit method for it.
+            // In this way, we can deal with just having a literal on one side
+            // of the expression.
+            visit(NODE_FACTORY.literal(value, OFFSET_DATUM.getDataType()));
+            return null;
+        }
+
+    }
+
+}
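
OffsetCompiler resolves the OFFSET parse node to a non-negative Integer at
compile time and, for bind parameters, registers OFFSET_DATUM so the driver
reports an INTEGER parameter. A sketch of the bind path, assuming an open
Connection conn (table T is hypothetical; behavior as exercised by
testMetaDataWithOffset in QueryWithOffsetIT above):

    PreparedStatement ps = conn.prepareStatement("SELECT * FROM T OFFSET ?");
    // The bind is typed via OFFSET_DATUM before execution:
    //   ps.getParameterMetaData().getParameterType(1) == java.sql.Types.INTEGER
    ps.setInt(1, 10);
    ResultSet rs = ps.executeQuery();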

http://git-wip-us.apache.org/repos/asf/phoenix/blob/525f128b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
index 0ae31f0..91fa5c8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
@@ -82,7 +82,8 @@ public class OrderByCompiler {
      */
     public static OrderBy compile(StatementContext context,
                                   SelectStatement statement,
-                                  GroupBy groupBy, Integer limit, 
+                                  GroupBy groupBy, Integer limit,
+                                  Integer offset,
                                   RowProjector rowProjector,
                                   TupleProjector tupleProjector,
                                   boolean isInRowKeyOrder) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/525f128b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index 2659b3f..752e1a5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -254,7 +254,8 @@ public class PostDDLCompiler {
                             } catch (AmbiguousColumnException e) {
                                 continue;
                             }
-                            QueryPlan plan = new AggregatePlan(context, select, tableRef, projector, null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
+                            QueryPlan plan = new AggregatePlan(context, select, tableRef, projector, null, null,
+                                    OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
                             try {
                                 ResultIterator iterator = plan.iterator();
                                 try {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/525f128b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 113aa2b..db52f4f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -72,6 +72,7 @@ import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ScanUtil;
 
 import com.google.common.collect.Lists;
@@ -168,7 +169,11 @@ public class QueryCompiler {
             SelectStatement subSelect = unionAllSelects.get(i);
             // Push down order-by and limit into sub-selects.
             if (!select.getOrderBy().isEmpty() || select.getLimit() != null) {
-                subSelect = NODE_FACTORY.select(subSelect, select.getOrderBy(), select.getLimit());
+                if (select.getOffset() == null) {
+                    subSelect = NODE_FACTORY.select(subSelect, select.getOrderBy(), select.getLimit(), null);
+                } else {
+                    subSelect = NODE_FACTORY.select(subSelect, select.getOrderBy(), null, null);
+                }
             }
             QueryPlan subPlan = compileSubquery(subSelect, true);
             TupleProjector projector = new TupleProjector(subPlan.getProjector());
@@ -182,8 +187,8 @@ public class QueryCompiler {
         StatementContext context = new StatementContext(statement, resolver, scan, sequenceManager);
 
         QueryPlan plan = compileSingleFlatQuery(context, select, statement.getParameters(), false, false, null, null, false);
-        plan =  new UnionPlan(context, select, tableRef, plan.getProjector(), plan.getLimit(), plan.getOrderBy(), GroupBy.EMPTY_GROUP_BY, 
-                plans, context.getBindManager().getParameterMetaData()); 
+        plan = new UnionPlan(context, select, tableRef, plan.getProjector(), plan.getLimit(), plan.getOffset(),
+                plan.getOrderBy(), GroupBy.EMPTY_GROUP_BY, plans, context.getBindManager().getParameterMetaData());
         return plan;
     }
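
Why only the ORDER BY is pushed into the UNION ALL branches when an OFFSET is
present: the offset must be applied once, across the merged result, so each
branch still has to produce its full ordered output. A hedged illustration
with hypothetical tables A and B:

    -- Rows 11-15 of the union may come from anywhere in either branch,
    -- so the plain LIMIT cannot be applied per branch once OFFSET exists.
    SELECT k FROM A UNION ALL SELECT k FROM B ORDER BY k LIMIT 5 OFFSET 10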
 
@@ -324,10 +329,13 @@ public class QueryCompiler {
             QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true);
             Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table);
             Integer limit = null;
+            Integer offset = null;
             if (!query.isAggregate() && !query.isDistinct() && query.getOrderBy().isEmpty()) {
                 limit = plan.getLimit();
+                offset = plan.getOffset();
             }
-            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, joinExpressions, joinTypes, starJoinVector, tables, fieldPositions, postJoinFilterExpression, limit);
+            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, joinExpressions, joinTypes,
+                    starJoinVector, tables, fieldPositions, postJoinFilterExpression, QueryUtil.getOffsetLimit(limit, offset));
             return HashJoinPlan.create(joinTable.getStatement(), plan, joinInfo, hashPlans);
         }
 
@@ -378,10 +386,14 @@ public class QueryCompiler {
             QueryPlan rhsPlan = compileSingleFlatQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right, null, !rhsTable.isSubselect() && projectPKColumns ? tupleProjector : null, true);
             Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable);
             Integer limit = null;
+            Integer offset = null;
             if (!rhs.isAggregate() && !rhs.isDistinct() && rhs.getOrderBy().isEmpty()) {
                 limit = rhsPlan.getLimit();
+                offset = rhsPlan.getOffset();
             }
-            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Right ? JoinType.Left : type}, new boolean[] {true}, new PTable[] {lhsTable}, new int[] {fieldPosition}, postJoinFilterExpression, limit);
+            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, new List[] { joinExpressions },
+                    new JoinType[] { type == JoinType.Right ? JoinType.Left : type }, new boolean[] { true },
+                    new PTable[] { lhsTable }, new int[] { fieldPosition }, postJoinFilterExpression,  QueryUtil.getOffsetLimit(limit, offset));
             Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
             getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), rhsTableRef, type, joinExpressions, hashExpressions);
             return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[] {new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond())});
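
Both join paths combine the limit and offset before handing them to
HashJoinInfo. QueryUtil.getOffsetLimit is not shown in this excerpt; a minimal
sketch of the semantics the call sites imply (an assumption, not the actual
body):

    // Rows skipped by OFFSET still have to flow through the join, so the
    // effective cap is limit + offset; with no limit there is nothing to cap.
    static Integer getOffsetLimit(Integer limit, Integer offset) {
        if (limit == null) return null;
        return offset == null ? limit : limit + offset;
    }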
@@ -432,7 +444,11 @@ public class QueryCompiler {
         context.setResolver(resolver);
         TableNode from = NODE_FACTORY.namedTable(tableRef.getTableAlias(), NODE_FACTORY.table(tableRef.getTable().getSchemaName().getString(), tableRef.getTable().getTableName().getString()));
         ParseNode where = joinTable.getPostFiltersCombined();
-        SelectStatement select = asSubquery ? NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false, Collections.<AliasedNode> emptyList(), where, null, null, orderBy, null, 0, false, joinTable.getStatement().hasSequence(), Collections.<SelectStatement>emptyList(), joinTable.getStatement().getUdfParseNodes())
+        SelectStatement select = asSubquery
+                ? NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false,
+                        Collections.<AliasedNode> emptyList(), where, null, null, orderBy, null, null, 0, false,
+                        joinTable.getStatement().hasSequence(), Collections.<SelectStatement> emptyList(),
+                        joinTable.getStatement().getUdfParseNodes())
                 : NODE_FACTORY.select(joinTable.getStatement(), from, where);
         
         return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder);
@@ -529,6 +545,7 @@ public class QueryCompiler {
             viewWhere = new SQLParser(table.getViewStatement()).parseQuery().getWhere();
         }
         Integer limit = LimitCompiler.compile(context, select);
+        Integer offset = OffsetCompiler.compile(context, select);
 
         GroupBy groupBy = GroupByCompiler.compile(context, select, isInRowKeyOrder);
         // Optimize the HAVING clause by finding any group by expressions that can be moved
@@ -547,7 +564,8 @@ public class QueryCompiler {
         groupBy = groupBy.compile(context, innerPlanTupleProjector);
         context.setResolver(resolver); // recover resolver
         RowProjector projector = ProjectionCompiler.compile(context, select, groupBy, asSubquery ? Collections.<PDatum>emptyList() : targetColumns, where);
-        OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, limit, projector, groupBy == GroupBy.EMPTY_GROUP_BY ? innerPlanTupleProjector : null, isInRowKeyOrder); 
+        OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, limit, offset, projector,
+                groupBy == GroupBy.EMPTY_GROUP_BY ? innerPlanTupleProjector : null, isInRowKeyOrder);
         // Final step is to build the query plan
         if (!asSubquery) {
             int maxRows = statement.getMaxRows();
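
Threading the offset into OrderByCompiler matters for sizing the ordered
buffer: with LIMIT 10 OFFSET 20, for example, a TopN sort must retain
10 + 20 = 30 rows before the first 20 are discarded, so the limit alone no
longer bounds the sort.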
@@ -567,11 +585,14 @@ public class QueryCompiler {
         QueryPlan plan = innerPlan;
         if (plan == null) {
             ParallelIteratorFactory parallelIteratorFactory = asSubquery ? null : this.parallelIteratorFactory;
-            plan = select.getFrom() == null ?
-                      new LiteralResultIterationPlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory)
-                    : (select.isAggregate() || select.isDistinct() ?
-                              new AggregatePlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory, groupBy, having)
-                            : new ScanPlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory, allowPageFilter));
+            plan = select.getFrom() == null
+                    ? new LiteralResultIterationPlan(context, select, tableRef, projector, limit, offset, orderBy,
+                            parallelIteratorFactory)
+                    : (select.isAggregate() || select.isDistinct()
+                            ? new AggregatePlan(context, select, tableRef, projector, limit, offset, orderBy,
+                                    parallelIteratorFactory, groupBy, having)
+                            : new ScanPlan(context, select, tableRef, projector, limit, offset, orderBy,
+                                    parallelIteratorFactory, allowPageFilter));
         }
         if (!subqueries.isEmpty()) {
             int count = subqueries.size();
@@ -588,9 +609,10 @@ public class QueryCompiler {
             if (LiteralExpression.isTrue(where)) {
                 where = null; // we do not pass "true" as filter
             }
-            plan =  select.isAggregate() || select.isDistinct() ?
-                      new ClientAggregatePlan(context, select, tableRef, projector, limit, where, orderBy, groupBy, having, plan)
-                    : new ClientScanPlan(context, select, tableRef, projector, limit, where, orderBy, plan);
+            plan = select.isAggregate() || select.isDistinct()
+                    ? new ClientAggregatePlan(context, select, tableRef, projector, limit, offset, where, orderBy,
+                            groupBy, having, plan)
+                    : new ClientScanPlan(context, select, tableRef, projector, limit, offset, where, orderBy, plan);
 
         }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/525f128b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index 1c0c469..4dcc134 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -60,6 +60,8 @@ public interface QueryPlan extends StatementPlan {
     
     Integer getLimit();
 
+    Integer getOffset();
+    
     OrderBy getOrderBy();
 
     GroupBy getGroupBy();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/525f128b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
index 9b54c86..566afc8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
@@ -100,7 +100,7 @@ public class StatementNormalizer extends ParseNodeRewriter {
             if (selectNodes != normSelectNodes) {
                 statement = NODE_FACTORY.select(statement.getFrom(), statement.getHint(), statement.isDistinct(),
                         normSelectNodes, statement.getWhere(), statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(),
-                        statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
+                        statement.getLimit(), statement.getOffset(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence(), statement.getSelects(), statement.getUdfParseNodes());
             }
         }
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/525f128b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
index 123cb6a..f051aa5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
@@ -341,7 +341,10 @@ public class SubqueryRewriter extends ParseNodeRewriter {
                 groupbyNodes.set(i - 1, aliasedNode.getNode());
             }
             SelectStatement derivedTableStmt = NODE_FACTORY.select(subquery, subquery.isDistinct(), derivedTableSelect, where, derivedTableGroupBy, true);
-            subquery = NODE_FACTORY.select(NODE_FACTORY.derivedTable(derivedTableAlias, derivedTableStmt), subquery.getHint(), false, selectNodes, null, groupbyNodes, null, Collections.<OrderByNode> emptyList(), null, subquery.getBindCount(), true, false, Collections.<SelectStatement>emptyList(), subquery.getUdfParseNodes());
+            subquery = NODE_FACTORY.select(NODE_FACTORY.derivedTable(derivedTableAlias, derivedTableStmt),
+                    subquery.getHint(), false, selectNodes, null, groupbyNodes, null,
+                    Collections.<OrderByNode> emptyList(), null, null, subquery.getBindCount(), true, false,
+                    Collections.<SelectStatement> emptyList(), subquery.getUdfParseNodes());
         }
         
         ParseNode onNode = conditionExtractor.getJoinCondition();
@@ -364,7 +367,10 @@ public class SubqueryRewriter extends ParseNodeRewriter {
             return select;
         
         // Wrap as a derived table.
-        return NODE_FACTORY.select(NODE_FACTORY.derivedTable(ParseNodeFactory.createTempAlias(), select), HintNode.EMPTY_HINT_NODE, false, select.getSelect(), null, null, null, null, null, select.getBindCount(), false, false, Collections.<SelectStatement> emptyList(), select.getUdfParseNodes());
+        return NODE_FACTORY.select(NODE_FACTORY.derivedTable(ParseNodeFactory.createTempAlias(), select),
+                HintNode.EMPTY_HINT_NODE, false, select.getSelect(), null, null, null, null, null, null,
+                select.getBindCount(), false, false, Collections.<SelectStatement> emptyList(),
+                select.getUdfParseNodes());
     }
     
     private List<AliasedNode> fixAliasedNodes(List<AliasedNode> nodes, boolean addSelectOne) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/525f128b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
index 853d772..1def3a7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
@@ -30,6 +30,7 @@ import org.apache.phoenix.parse.ColumnParseNode;
 import org.apache.phoenix.parse.DerivedTableNode;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.parse.LimitNode;
+import org.apache.phoenix.parse.OffsetNode;
 import org.apache.phoenix.parse.OrderByNode;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.ParseNodeRewriter;
@@ -110,6 +111,7 @@ public class SubselectRewriter extends ParseNodeRewriter {
         ParseNode havingRewrite = subselect.getHaving();
         List<OrderByNode> orderByRewrite = subselect.getOrderBy();
         LimitNode limitRewrite = subselect.getLimit();
+        OffsetNode offsetRewrite = subselect.getOffset();
         HintNode hintRewrite = subselect.getHint();
         boolean isDistinctRewrite = subselect.isDistinct();
         boolean isAggregateRewrite = subselect.isAggregate();
@@ -187,6 +189,13 @@ public class SubselectRewriter extends ParseNodeRewriter {
             }
         }
         
+        OffsetNode offset = select.getOffset();
+        if (offsetRewrite != null || (limitRewrite != null && offset != null)) {
+            return select;
+        } else {
+            offsetRewrite = offset;
+        }
+        
         LimitNode limit = select.getLimit();
         if (limit != null) {
             if (limitRewrite == null) {
@@ -207,8 +216,10 @@ public class SubselectRewriter extends ParseNodeRewriter {
             hintRewrite = hintRewrite == null ? hint : HintNode.combine(hint, hintRewrite);
         }
         
-        return NODE_FACTORY.select(subselect.getFrom(), hintRewrite, isDistinctRewrite, selectNodesRewrite, whereRewrite, groupByRewrite, 
-            havingRewrite, orderByRewrite, limitRewrite, select.getBindCount(), isAggregateRewrite, select.hasSequence(), select.getSelects(), select.getUdfParseNodes());
+        return NODE_FACTORY.select(subselect.getFrom(), hintRewrite, isDistinctRewrite, selectNodesRewrite,
+                whereRewrite, groupByRewrite, havingRewrite, orderByRewrite, limitRewrite, offsetRewrite,
+                select.getBindCount(), isAggregateRewrite, select.hasSequence(), select.getSelects(),
+                select.getUdfParseNodes());
     }
     
     private SelectStatement applyPostFilters(SelectStatement statement, List<ParseNode> postFilters) throws SQLException {
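
The bail-out above guards correctness when flattening derived tables. A hedged
example with a hypothetical table T:

    -- Rows 6-10 of the inner query's first 10 rows:
    SELECT * FROM (SELECT k FROM T LIMIT 10) OFFSET 5
    -- is not the same as rows 6-15 of T:
    SELECT k FROM T LIMIT 10 OFFSET 5

so an outer OFFSET over an inner LIMIT (or over any pre-existing inner OFFSET)
cannot be merged into a single SELECT, and the subselect is left as is.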

http://git-wip-us.apache.org/repos/asf/phoenix/blob/525f128b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index 1e8210a..54b4eb7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -206,6 +206,11 @@ public class TraceQueryPlan implements QueryPlan {
     }
 
     @Override
+    public Integer getOffset() {
+        return null;
+    }
+
+    @Override
     public OrderBy getOrderBy() {
         return OrderBy.EMPTY_ORDER_BY;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/525f128b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 6ec7f70..7c6347f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -655,7 +655,7 @@ public class UpsertCompiler {
                     scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS, UngroupedAggregateRegionObserver.serialize(projectedExpressions));
                     
                     // Ignore order by - it has no impact
-                    final QueryPlan aggPlan = new AggregatePlan(context, select, tableRef, aggProjector, null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
+                    final QueryPlan aggPlan = new AggregatePlan(context, select, tableRef, aggProjector, null, null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
                     return new MutationPlan() {
                         @Override
                         public ParameterMetaData getParameterMetaData() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/525f128b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index b09877f..87929e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -98,6 +98,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String TX_SCN = "_TxScn";
     public static final String SCAN_ACTUAL_START_ROW = "_ScanActualStartRow";
     public static final String IGNORE_NEWER_MUTATIONS = "_IGNORE_NEWER_MUTATIONS";
+    public static final String SCAN_OFFSET = "_RowOffset";
     
     /**
      * Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations
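
How the attribute is presumably armed on the client side (the setter is not in
this excerpt; the server reads the value back with Bytes.toInt in
ScanRegionObserver below):

    // Serialize the row offset onto the scan so each region server can skip
    // that many rows before returning results.
    scan.setAttribute(BaseScannerRegionObserver.SCAN_OFFSET, Bytes.toBytes(offset));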

http://git-wip-us.apache.org/repos/asf/phoenix/blob/525f128b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 5df5755..3333d9c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -23,19 +23,20 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
-import co.cask.tephra.Transaction;
-
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
@@ -47,23 +48,27 @@ import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.expression.function.ArrayIndexFunction;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.iterate.OffsetResultIterator;
 import org.apache.phoenix.iterate.OrderedResultIterator;
 import org.apache.phoenix.iterate.RegionScannerResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
 import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
-import org.apache.phoenix.util.TransactionUtil;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import co.cask.tephra.Transaction;
+
 
 /**
  *
@@ -121,7 +126,8 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
                 orderByExpressions.add(orderByExpression);
             }
             ResultIterator inner = new RegionScannerResultIterator(s);
-            return new OrderedResultIterator(inner, orderByExpressions, thresholdBytes, limit >= 0 ? limit : null, estimatedRowSize);
+            return new OrderedResultIterator(inner, orderByExpressions, thresholdBytes, limit >= 0 ? limit : null, null,
+                    estimatedRowSize);
         } catch (IOException e) {
             throw new RuntimeException(e);
         } finally {
@@ -183,7 +189,11 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
             offset = region.getStartKey().length != 0 ? region.getStartKey().length:region.getEndKey().length;
             ScanUtil.setRowKeyOffset(scan, offset);
         }
-
+        byte[] scanOffsetBytes = scan.getAttribute(BaseScannerRegionObserver.SCAN_OFFSET);
+        Integer scanOffset = null;
+        if (scanOffsetBytes != null) {
+            scanOffset = Bytes.toInt(scanOffsetBytes);
+        }
         RegionScanner innerScanner = s;
 
         Set<KeyValueColumnExpression> arrayKVRefs = Sets.newHashSet();
@@ -217,7 +227,11 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
         if (j != null) {
             innerScanner = new HashJoinRegionScanner(innerScanner, p, j, tenantId, c.getEnvironment());
         }
-
+        if (scanOffset != null) {
+            innerScanner = getOffsetScanner(c, innerScanner,
+                    new OffsetResultIterator(new RegionScannerResultIterator(innerScanner), scanOffset),
+                    scan.getAttribute(QueryConstants.LAST_SCAN) != null);
+        }
         final OrderedResultIterator iterator = deserializeFromScan(scan,innerScanner);
         if (iterator == null) {
             return innerScanner;
@@ -226,6 +240,73 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
         return getTopNScanner(c, innerScanner, iterator, tenantId);
     }
 
+    private RegionScanner getOffsetScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s,
+            final OffsetResultIterator iterator, final boolean isLastScan) throws IOException {
+        final Tuple firstTuple;
+        final HRegion region = c.getEnvironment().getRegion();
+        region.startRegionOperation();
+        try {
+            // Once we return from the first call to next, the iterator has
+            // already skipped the offset rows, so we no longer need to
+            // start/stop a region operation.
+            Tuple tuple = iterator.next();
+            if (tuple == null && !isLastScan) {
+                List<KeyValue> kvList = new ArrayList<KeyValue>(1);
+                KeyValue kv = new KeyValue(QueryConstants.OFFSET_ROW_KEY_BYTES, QueryConstants.OFFSET_FAMILY,
+                        QueryConstants.OFFSET_COLUMN, Bytes.toBytes(iterator.getUnusedOffset()));
+                kvList.add(kv);
+                Result r = new Result(kvList);
+                firstTuple = new ResultTuple(r);
+            } else {
+                firstTuple = tuple;
+            }
+        } catch (Throwable t) {
+            ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+            return null;
+        } finally {
+            region.closeRegionOperation();
+        }
+        return new BaseRegionScanner(s) {
+            private Tuple tuple = firstTuple;
+
+            @Override
+            public boolean isFilterDone() {
+                return tuple == null;
+            }
+
+            @Override
+            public boolean next(List<Cell> results) throws IOException {
+                try {
+                    if (isFilterDone()) { return false; }
+                    for (int i = 0; i < tuple.size(); i++) {
+                        results.add(tuple.getValue(i));
+                    }
+                    tuple = iterator.next();
+                    return !isFilterDone();
+                } catch (Throwable t) {
+                    ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+                    return false;
+                }
+            }
+
+            @Override
+            public void close() throws IOException {
+                try {
+                    s.close();
+                } finally {
+                    try {
+                        if (iterator != null) {
+                            iterator.close();
+                        }
+                    } catch (SQLException e) {
+                        ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), e);
+                    }
+                }
+            }
+        };
+    }
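
To make the marker row concrete: with OFFSET 100 against a region holding only
60 rows, the iterator drains the region with an unused offset of 40, so
(unless this is the last scan) getOffsetScanner emits one synthetic cell,
keyed by OFFSET_ROW_KEY_BYTES and carrying the value Bytes.toBytes(40), which
the client side can presumably detect and carry forward as the rows still to
skip in the next region's scan.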
+
     /**
      *  Return region scanner that does TopN.
      *  We only need to call startRegionOperation and closeRegionOperation when

