phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From an...@apache.org
Subject [2/2] phoenix git commit: PHOENIX-3572 Support FETCH NEXT| n ROWS from Cursor (Biju Nair)
Date Wed, 17 May 2017 08:41:52 GMT
PHOENIX-3572 Support FETCH NEXT| n ROWS from Cursor (Biju Nair)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/42cc41c9
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/42cc41c9
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/42cc41c9

Branch: refs/heads/4.x-HBase-0.98
Commit: 42cc41c946c16ecb0738d7916fe8454850fd5049
Parents: 5868d09
Author: Ankit Singhal <ankitsinghal59@gmail.com>
Authored: Wed May 17 14:11:29 2017 +0530
Committer: Ankit Singhal <ankitsinghal59@gmail.com>
Committed: Wed May 17 14:11:29 2017 +0530

----------------------------------------------------------------------
 .../CursorWithRowValueConstructorIT.java        | 687 +++++++++++++++++++
 phoenix-core/src/main/antlr3/PhoenixSQL.g       |  31 +
 .../phoenix/compile/CloseStatementCompiler.java |  57 ++
 .../phoenix/compile/DeclareCursorCompiler.java  |  75 ++
 .../phoenix/compile/OpenStatementCompiler.java  |  57 ++
 .../apache/phoenix/execute/CursorFetchPlan.java |  53 ++
 .../phoenix/iterate/CursorResultIterator.java   |  75 ++
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  91 ++-
 .../apache/phoenix/parse/CloseStatement.java    |  40 ++
 .../org/apache/phoenix/parse/CursorName.java    |  26 +
 .../phoenix/parse/DeclareCursorStatement.java   |  60 ++
 .../apache/phoenix/parse/FetchStatement.java    |  52 ++
 .../org/apache/phoenix/parse/OpenStatement.java |  40 ++
 .../apache/phoenix/parse/ParseNodeFactory.java  |  20 +
 .../org/apache/phoenix/parse/SQLParser.java     |  76 ++
 .../apache/phoenix/schema/MetaDataClient.java   |  20 +
 .../org/apache/phoenix/util/CursorUtil.java     | 189 +++++
 .../java/org/apache/phoenix/util/ScanUtil.java  |   6 +-
 .../phoenix/compile/CursorCompilerTest.java     |  87 +++
 .../apache/phoenix/parse/CursorParserTest.java  | 367 ++++++++++
 20 files changed, 2106 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/42cc41c9/phoenix-core/src/it/java/org/apache/phoenix/end2end/CursorWithRowValueConstructorIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CursorWithRowValueConstructorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CursorWithRowValueConstructorIT.java
new file mode 100644
index 0000000..dda4bd1
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CursorWithRowValueConstructorIT.java
@@ -0,0 +1,687 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.phoenix.util.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.*;
+import java.util.Properties;
+import java.util.Random;
+
+import static org.apache.phoenix.util.TestUtil.*;
+import static org.junit.Assert.*;
+
+
+public class CursorWithRowValueConstructorIT extends ParallelStatsDisabledIT {
+    private static final String TABLE_NAME = "CursorRVCTestTable";
+    protected static final Log LOG = LogFactory.getLog(CursorWithRowValueConstructorIT.class);
+
+    /**
+     * Creates {@code TABLE_NAME} if it does not exist and upserts 100 rows whose
+     * {@code a_id} runs from 0 to 99 and whose {@code a_data} is a random integer
+     * in the range [0, 500].
+     *
+     * @throws SQLException if the table cannot be created or a row cannot be upserted
+     */
+    public void createAndInitializeTestTable() throws SQLException {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            PreparedStatement createStmt = conn.prepareStatement("CREATE TABLE IF NOT EXISTS " + TABLE_NAME +
+                    "(a_id INTEGER NOT NULL, " +
+                    "a_data INTEGER, " +
+                    "CONSTRAINT my_pk PRIMARY KEY (a_id))");
+            try {
+                createStmt.execute();
+                // The connection is local to this method, so no locking is needed around commit.
+                conn.commit();
+            } finally {
+                createStmt.close();
+            }
+
+            //Upsert test values into the test table
+            Random rand = new Random();
+            PreparedStatement upsertStmt = conn.prepareStatement("UPSERT INTO " + TABLE_NAME +
+                    "(a_id, a_data) VALUES (?,?)");
+            try {
+                for (int rowCount = 0; rowCount < 100; ++rowCount) {
+                    upsertStmt.setInt(1, rowCount);
+                    upsertStmt.setInt(2, rand.nextInt(501));
+                    upsertStmt.execute();
+                }
+                conn.commit();
+            } finally {
+                upsertStmt.close();
+            }
+        } finally {
+            // Always release the connection, even when table setup fails part-way.
+            conn.close();
+        }
+    }
+
+    /**
+     * Drops {@code TABLE_NAME} if it exists.
+     *
+     * @throws SQLException if the drop statement fails
+     */
+    public void deleteTestTable() throws SQLException {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            PreparedStatement stmt = conn.prepareStatement("DROP TABLE IF EXISTS " + TABLE_NAME);
+            try {
+                stmt.execute();
+                // The connection is local to this method, so no locking is needed around commit.
+                conn.commit();
+            } finally {
+                stmt.close();
+            }
+        } finally {
+            conn.close();
+        }
+    }
+
+    /**
+     * Declares a cursor over the primary-key column, fetches one row per FETCH NEXT,
+     * and checks that the a_id values arrive as 0, 1, 2, ... for all 100 rows.
+     */
+    @Test
+    public void testCursorsOnTestTablePK() throws SQLException {
+        // A single connection is reused for every cursor statement so it can be closed in finally.
+        Connection conn = DriverManager.getConnection(getUrl());
+        try{
+            createAndInitializeTestTable();
+            String querySQL = "SELECT a_id FROM " + TABLE_NAME;
+
+            //Test actual cursor implementation
+            conn.prepareStatement("DECLARE testCursor CURSOR FOR " + querySQL).execute();
+            conn.prepareStatement("OPEN testCursor").execute();
+            String fetchSQL = "FETCH NEXT FROM testCursor";
+            ResultSet rs = conn.prepareStatement(fetchSQL).executeQuery();
+            int rowID = 0;
+            while(rs.next()){
+                assertEquals(rowID,rs.getInt(1));
+                ++rowID;
+                rs = conn.createStatement().executeQuery(fetchSQL);
+            }
+            // The table is populated with exactly 100 rows; make sure the cursor returned them all.
+            assertEquals(100, rowID);
+        } finally{
+            try {
+                conn.prepareStatement("CLOSE testCursor").execute();
+                deleteTestTable();
+            } finally {
+                conn.close();
+            }
+        }
+    }
+
+    /**
+     * Compares, row by row, the a_data values returned through a cursor against the
+     * values returned by running the same ORDER BY a_data query directly.
+     */
+    @Test
+    public void testCursorsOnRandomTableData() throws SQLException {
+        // cursorConn drives the cursor; queryConn keeps the reference result set on its own connection.
+        Connection cursorConn = DriverManager.getConnection(getUrl());
+        Connection queryConn = DriverManager.getConnection(getUrl());
+        try{
+            createAndInitializeTestTable();
+            String querySQL = "SELECT a_id,a_data FROM " + TABLE_NAME + " ORDER BY a_data";
+            cursorConn.prepareStatement("DECLARE testCursor CURSOR FOR " + querySQL).execute();
+            cursorConn.prepareStatement("OPEN testCursor").execute();
+            String fetchSQL = "FETCH NEXT FROM testCursor";
+            ResultSet cursorRS = cursorConn.prepareStatement(fetchSQL).executeQuery();
+            ResultSet rs = queryConn.prepareStatement(querySQL).executeQuery();
+            int rowCount = 0;
+            while(rs.next() && cursorRS.next()){
+                assertEquals(rs.getInt(2),cursorRS.getInt(2));
+                ++rowCount;
+                cursorRS = cursorConn.prepareStatement(fetchSQL).executeQuery();
+            }
+            assertEquals(100, rowCount);
+        } finally{
+            try {
+                cursorConn.prepareStatement("CLOSE testCursor").execute();
+                deleteTestTable();
+            } finally {
+                try {
+                    queryConn.close();
+                } finally {
+                    cursorConn.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Declares a cursor over the primary key in descending order and checks that
+     * the fetched a_id values run from 99 down to 0.
+     */
+    @Test
+    public void testCursorsOnTestTablePKDesc() throws SQLException {
+        // A single connection is reused for every cursor statement so it can be closed in finally.
+        Connection conn = DriverManager.getConnection(getUrl());
+        try{
+            createAndInitializeTestTable();
+            String dummySQL = "SELECT a_id FROM " + TABLE_NAME + " ORDER BY a_id DESC";
+
+            conn.prepareStatement("DECLARE testCursor CURSOR FOR " + dummySQL).execute();
+            conn.prepareStatement("OPEN testCursor").execute();
+            String fetchSQL = "FETCH NEXT FROM testCursor";
+            ResultSet rs = conn.prepareStatement(fetchSQL).executeQuery();
+            int rowCount = 0;
+            while(rs.next()){
+                assertEquals(99-rowCount, rs.getInt(1));
+                rs = conn.prepareStatement(fetchSQL).executeQuery();
+                ++rowCount;
+            }
+            assertEquals(100, rowCount);
+        } finally{
+            try {
+                conn.prepareStatement("CLOSE testCursor").execute();
+                deleteTestTable();
+            } finally {
+                conn.close();
+            }
+        }
+    }
+
+    /**
+     * Declares a cursor over the non-PK column a_data in descending order and checks
+     * that all 100 rows are fetched and that the values never increase.
+     */
+    @Test
+    public void testCursorsOnTestTableNonPKDesc() throws SQLException {
+        // A single connection is reused for every cursor statement so it can be closed in finally.
+        Connection conn = DriverManager.getConnection(getUrl());
+        try{
+            createAndInitializeTestTable();
+            String dummySQL = "SELECT a_data FROM " + TABLE_NAME + " ORDER BY a_data DESC";
+
+            conn.prepareStatement("DECLARE testCursor CURSOR FOR " + dummySQL).execute();
+            conn.prepareStatement("OPEN testCursor").execute();
+            String fetchSQL = "FETCH NEXT FROM testCursor";
+            ResultSet rs = conn.prepareStatement(fetchSQL).executeQuery();
+            int rowCount = 0;
+            // a_data is populated from rand.nextInt(501), so every value is below Integer.MAX_VALUE.
+            int previous = Integer.MAX_VALUE;
+            while(rs.next()){
+                int current = rs.getInt(1);
+                assertTrue("a_data should be fetched in descending order", current <= previous);
+                previous = current;
+                rs = conn.prepareStatement(fetchSQL).executeQuery();
+                ++rowCount;
+            }
+            assertEquals(100, rowCount);
+        } finally{
+            try {
+                conn.prepareStatement("CLOSE testCursor").execute();
+                deleteTestTable();
+            } finally {
+                conn.close();
+            }
+        }
+    }
+
+    /**
+     * Compares, row by row, the first column returned through a cursor declared on
+     * {@code SELECT *} against the first column of the same query run directly.
+     */
+    @Test
+    public void testCursorsOnWildcardSelect() throws SQLException {
+        // cursorConn drives the cursor; queryConn keeps the reference result set on its own connection.
+        Connection cursorConn = DriverManager.getConnection(getUrl());
+        Connection queryConn = DriverManager.getConnection(getUrl());
+        try{
+            createAndInitializeTestTable();
+            String querySQL = "SELECT * FROM " + TABLE_NAME;
+            ResultSet rs = queryConn.prepareStatement(querySQL).executeQuery();
+
+            cursorConn.prepareStatement("DECLARE testCursor CURSOR FOR "+querySQL).execute();
+            cursorConn.prepareStatement("OPEN testCursor").execute();
+            String fetchSQL = "FETCH NEXT FROM testCursor";
+            ResultSet cursorRS = cursorConn.prepareStatement(fetchSQL).executeQuery();
+            int rowCount = 0;
+            while(rs.next() && cursorRS.next()){
+                assertEquals(rs.getInt(1),cursorRS.getInt(1));
+                ++rowCount;
+                cursorRS = cursorConn.prepareStatement(fetchSQL).executeQuery();
+            }
+            assertEquals(100, rowCount);
+        } finally{
+            try {
+                cursorConn.prepareStatement("CLOSE testCursor").execute();
+                deleteTestTable();
+            } finally {
+                try {
+                    queryConn.close();
+                } finally {
+                    cursorConn.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Verifies that declaring a cursor whose SELECT contains a bind parameter is
+     * rejected with the expected message and leaves no cursor declared.
+     */
+    @Test
+    public void testCursorsWithBindings() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+        ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+        String query = "SELECT a_integer, x_integer FROM aTable WHERE ?=organization_id AND (a_integer, x_integer) = (7, 5)";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at SCN ts + 2
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String cursor = "DECLARE testCursor CURSOR FOR "+query;
+        try {
+            PreparedStatement statement = conn.prepareStatement(cursor);
+            statement.setString(1, tenantId);
+            statement.execute();
+        }catch(SQLException e){
+            assertTrue(e.getMessage().equalsIgnoreCase("Cannot declare cursor, internal SELECT statement contains bindings!"));
+            assertFalse(CursorUtil.cursorDeclared("testCursor"));
+            return;
+        } finally {
+            try {
+                cursor = "CLOSE testCursor";
+                conn.prepareStatement(cursor).execute();
+            } finally {
+                // Release the connection even if CLOSE itself fails.
+                conn.close();
+            }
+        }
+        // Reaching this point means the DECLARE was accepted, which is the failure case.
+        fail();
+    }
+
+    /**
+     * Declares a cursor over a query with a row-value-constructor equality filter
+     * {@code (a_integer, x_integer) = (7, 5)} and expects exactly one row, (7, 5).
+     */
+    @Test
+    public void testCursorsInWhereWithEqualsExpression() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+        ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+        String query = "SELECT a_integer, x_integer FROM aTable WHERE '"+tenantId+"'=organization_id AND (a_integer, x_integer) = (7, 5)";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at SCN ts + 2
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String cursor = "DECLARE testCursor CURSOR FOR "+query;
+        try {
+            conn.prepareStatement(cursor).execute();
+            cursor = "OPEN testCursor";
+            conn.prepareStatement(cursor).execute();
+            cursor = "FETCH NEXT FROM testCursor";
+            ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+            int count = 0;
+            while(rs.next()) {
+                // assertEquals reports the actual value on failure, unlike assertTrue(a == b).
+                assertEquals(7, rs.getInt(1));
+                assertEquals(5, rs.getInt(2));
+                count++;
+                rs = conn.prepareStatement(cursor).executeQuery();
+            }
+            assertEquals(1, count);
+        } finally {
+            try {
+                cursor = "CLOSE testCursor";
+                conn.prepareStatement(cursor).execute();
+            } finally {
+                // Release the connection even if CLOSE itself fails.
+                conn.close();
+            }
+        }
+    }
+
+    /**
+     * Declares a cursor over a query with the row-value-constructor filter
+     * {@code (a_integer, x_integer) >= (4, 4)} and expects five matching rows.
+     */
+    @Test
+    public void testCursorsInWhereWithGreaterThanExpression() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+        ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+        String query = "SELECT a_integer, x_integer FROM aTable WHERE '"+tenantId+"'=organization_id  AND (a_integer, x_integer) >= (4, 4)";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at SCN ts + 2
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String cursor = "DECLARE testCursor CURSOR FOR "+query;
+        try {
+            conn.prepareStatement(cursor).execute();
+            cursor = "OPEN testCursor";
+            conn.prepareStatement(cursor).execute();
+            cursor = "FETCH NEXT FROM testCursor";
+            ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+            int count = 0;
+            while(rs.next()) {
+                assertTrue(rs.getInt(1) >= 4);
+                // When the leading column ties at 4, the second column must satisfy the bound too.
+                assertTrue(rs.getInt(1) == 4 ? rs.getInt(2) >= 4 : rs.getInt(2) >= 0);
+                count++;
+                rs = conn.prepareStatement(cursor).executeQuery();
+            }
+            assertEquals(5, count);
+        } finally {
+            try {
+                cursor = "CLOSE testCursor";
+                conn.prepareStatement(cursor).execute();
+            } finally {
+                // Release the connection even if CLOSE itself fails.
+                conn.close();
+            }
+        }
+    }
+
+    /**
+     * Declares a cursor over a query whose row value constructor has more terms on
+     * the left, {@code (a_integer, x_integer, y_integer) >= (7, 5)}, than on the
+     * right, and expects three matching rows. The elapsed time is logged.
+     */
+    @Test
+    public void testCursorsInWhereWithUnEqualNumberArgs() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+        ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+        String query = "SELECT a_integer, x_integer FROM aTable WHERE '"+tenantId+"'=organization_id  AND (a_integer, x_integer, y_integer) >= (7, 5)";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at SCN ts + 2
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String cursor = "DECLARE testCursor CURSOR FOR "+query;
+        try {
+            // nanoTime() returns a long; keep it integral and convert only when reporting.
+            long startTime = System.nanoTime();
+            conn.prepareStatement(cursor).execute();
+            cursor = "OPEN testCursor";
+            conn.prepareStatement(cursor).execute();
+            cursor = "FETCH NEXT FROM testCursor";
+            ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+            int count = 0;
+            while(rs.next()) {
+                assertTrue(rs.getInt(1) >= 7);
+                assertTrue(rs.getInt(1) == 7 ? rs.getInt(2) >= 5 : rs.getInt(2) >= 0);
+                count++;
+                rs = conn.prepareStatement(cursor).executeQuery();
+            }
+            // we have key values (7,5) (8,4) and (9,3) present in aTable. So the query should return the 3 records.
+            assertEquals(3, count);
+            long endTime = System.nanoTime();
+            LOG.info("Method Time in milliseconds: "+Double.toString((endTime-startTime)/1000000.0));
+        } finally {
+            try {
+                cursor = "CLOSE testCursor";
+                conn.prepareStatement(cursor).execute();
+            } finally {
+                // Release the connection even if CLOSE itself fails.
+                conn.close();
+            }
+        }
+    }
+
+    /**
+     * Declares a cursor over a query comparing a row value constructor on the left
+     * with a single literal on the right, {@code (a_integer, x_integer) >= 7}, and
+     * expects three matching rows.
+     */
+    @Test
+    public void testCursorsOnLHSAndLiteralExpressionOnRHS() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+        ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+        String query = "SELECT a_integer, x_integer FROM aTable WHERE '"+tenantId+"'=organization_id  AND (a_integer, x_integer) >= 7";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at SCN ts + 2
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String cursor = "DECLARE testCursor CURSOR FOR "+query;
+        try {
+            conn.prepareStatement(cursor).execute();
+            cursor = "OPEN testCursor";
+            conn.prepareStatement(cursor).execute();
+            cursor = "FETCH NEXT FROM testCursor";
+            ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+            int count = 0;
+            while(rs.next()) {
+                count++;
+                rs = conn.prepareStatement(cursor).executeQuery();
+            }
+            // we have key values (7,5) (8,4) and (9,3) present in aTable. So the query should return the 3 records.
+            assertEquals(3, count);
+        } finally {
+            try {
+                cursor = "CLOSE testCursor";
+                conn.prepareStatement(cursor).execute();
+            } finally {
+                // Release the connection even if CLOSE itself fails.
+                conn.close();
+            }
+        }
+    }
+
+    /**
+     * Declares a cursor over a query comparing a single literal on the left with a
+     * row value constructor on the right, {@code 7 <= (a_integer, x_integer)}, and
+     * expects three matching rows.
+     */
+    @Test
+    public void testCursorsOnRHSLiteralExpressionOnLHS() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+        ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+        String query = "SELECT a_integer, x_integer FROM aTable WHERE '"+tenantId+"'=organization_id  AND 7 <= (a_integer, x_integer)";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at SCN ts + 2
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String cursor = "DECLARE testCursor CURSOR FOR "+query;
+        try {
+            conn.prepareStatement(cursor).execute();
+            cursor = "OPEN testCursor";
+            conn.prepareStatement(cursor).execute();
+            cursor = "FETCH NEXT FROM testCursor";
+            ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+            int count = 0;
+            while(rs.next()) {
+                count++;
+                rs = conn.prepareStatement(cursor).executeQuery();
+            }
+            // we have key values (7,5) (8,4) and (9,3) present in aTable. So the query should return the 3 records.
+            assertEquals(3, count);
+        } finally {
+            try {
+                cursor = "CLOSE testCursor";
+                conn.prepareStatement(cursor).execute();
+            } finally {
+                // Release the connection even if CLOSE itself fails.
+                conn.close();
+            }
+        }
+    }
+
+    /**
+     * Declares a cursor over a query whose row value constructor is compared with
+     * the result of a built-in function, {@code to_number('7')}, and counts the
+     * rows returned one FETCH NEXT at a time; three rows are expected.
+     */
+    @Test
+    public void testCursorsOnBuiltInFunctionOperatingOnIntegerLiteral() throws Exception {
+        final long timestamp = nextTimestamp();
+        final String orgId = getOrganizationId();
+        initATableValues(ATABLE_NAME, orgId, getDefaultSplits(orgId), null, timestamp-1, getUrl(), null);
+        ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, timestamp-1);
+        final String selectSql = "SELECT a_integer, x_integer FROM aTable WHERE '"+orgId+"'=organization_id  AND (a_integer, x_integer) >= to_number('7')";
+        Properties connProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        // Run the cursor statements at SCN timestamp + 2.
+        connProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timestamp + 2));
+        Connection connection = DriverManager.getConnection(getUrl(), connProps);
+        try {
+            connection.prepareStatement("DECLARE testCursor CURSOR FOR "+selectSql).execute();
+            connection.prepareStatement("OPEN testCursor").execute();
+            final String fetchSql = "FETCH NEXT FROM testCursor";
+            int fetched = 0;
+            // Each FETCH NEXT yields a fresh result set; stop at the first empty one.
+            for (ResultSet row = connection.prepareStatement(fetchSql).executeQuery();
+                 row.next();
+                 row = connection.prepareStatement(fetchSql).executeQuery()) {
+                fetched++;
+            }
+            // aTable holds key values (7,5), (8,4) and (9,3), so three records are expected.
+            assertEquals(3, fetched);
+        } finally {
+            connection.prepareStatement("CLOSE testCursor").execute();
+            connection.close();
+        }
+    }
+
+    /**
+     * Test for the precision of Date datatype when used as part of a filter within the internal Select statement.
+     * Rows are written with a created_date one millisecond after midnight; a cursor filtering on that exact
+     * date must return them, while a cursor filtering on midnight itself must return nothing.
+     */
+    @Test
+    public void testCursorsWithDateDatatypeFilter() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        long currentTime = System.currentTimeMillis();
+        java.sql.Date date = new java.sql.Date(currentTime);
+        String strCurrentDate = date.toString();
+
+        //Walks currentTime back to <yesterday's date> 23:59:59.999
+        while(date.toString().equals(strCurrentDate)){
+            currentTime -= 1;
+            date = new java.sql.Date(currentTime);
+        }
+        //Sets date to <today's date> 00:00:00.001 and midnight to <today's date> 00:00:00.000
+        date = new java.sql.Date(currentTime+2);
+        java.sql.Date midnight = new java.sql.Date(currentTime+1);
+
+        initEntityHistoryTableValues(tenantId, getDefaultSplits(tenantId), date, ts);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2));
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        // Track which cursors were declared so finally only closes those that exist.
+        boolean firstCursorDeclared = false;
+        boolean secondCursorDeclared = false;
+        try {
+            String cursor = "DECLARE testCursor CURSOR FOR "+buildEntityHistoryDateFilterQuery(tenantId, date);
+            conn.prepareStatement(cursor).execute();
+            firstCursorDeclared = true;
+            cursor = "OPEN testCursor";
+            conn.prepareStatement(cursor).execute();
+            cursor = "FETCH NEXT FROM testCursor";
+
+            ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+            assertTrue(rs.next());
+            assertEquals(PARENTID3, rs.getString(1));
+            rs = conn.prepareStatement(cursor).executeQuery();
+            assertTrue(rs.next());
+            assertEquals(PARENTID7, rs.getString(1));
+            assertFalse(rs.next());
+
+            //Test against the same table for the same records, but this time use the 'midnight' java.sql.Date instance.
+            //'midnight' is one millisecond earlier than 'date', so no row is expected to match.
+            cursor = "DECLARE testCursor2 CURSOR FOR "+buildEntityHistoryDateFilterQuery(tenantId, midnight);
+            conn.prepareStatement(cursor).execute();
+            secondCursorDeclared = true;
+            cursor = "OPEN testCursor2";
+            conn.prepareStatement(cursor).execute();
+            cursor = "FETCH NEXT FROM testCursor2";
+
+            rs = conn.prepareStatement(cursor).executeQuery();
+            assertFalse(rs.next());
+        } finally {
+            try {
+                if (firstCursorDeclared) {
+                    conn.prepareStatement("CLOSE testCursor").execute();
+                }
+                if (secondCursorDeclared) {
+                    conn.prepareStatement("CLOSE testCursor2").execute();
+                }
+            } finally {
+                // Release the connection even if a CLOSE statement fails.
+                conn.close();
+            }
+        }
+    }
+
+    /**
+     * Builds the entity-history SELECT used by the date-filter test: an IN list of two
+     * (organization_id, parent_id, created_date, entity_history_id) tuples, for PARENTID3/ENTITYHISTID3
+     * and PARENTID7/ENTITYHISTID7, both using the given created date rendered through TO_DATE.
+     */
+    private static String buildEntityHistoryDateFilterQuery(String tenantId, java.sql.Date createdDate) {
+        String dateLiteral = "TO_DATE('"+DateUtil.getDateFormatter(DateUtil.DEFAULT_DATE_FORMAT).format(createdDate)+"')";
+        String query = "select parent_id from " + ENTITY_HISTORY_TABLE_NAME +
+                " WHERE (organization_id, parent_id, created_date, entity_history_id) IN ((?,?,?,?),(?,?,?,?))";
+        // Placeholders are substituted textually, in order, so the cursor's SELECT contains no bind parameters.
+        query = query.replaceFirst("\\?", "'"+tenantId+"'");
+        query = query.replaceFirst("\\?", "'"+PARENTID3+"'");
+        query = query.replaceFirst("\\?", dateLiteral);
+        query = query.replaceFirst("\\?", "'"+ENTITYHISTID3+"'");
+        query = query.replaceFirst("\\?", "'"+tenantId+"'");
+        query = query.replaceFirst("\\?", "'"+PARENTID7+"'");
+        query = query.replaceFirst("\\?", dateLiteral);
+        query = query.replaceFirst("\\?", "'"+ENTITYHISTID7+"'");
+        return query;
+    }
+
+    /**
+     * Upserts a timestamp into ROW4 of aTable, then declares a cursor filtering on the
+     * row value constructor {@code (a_timestamp, a_string)} and expects exactly one row
+     * carrying that timestamp and the string 'a'.
+     */
+    @Test
+    public void testCursorsWithNonLeadingPkColsOfTypesTimeStampAndVarchar() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+        ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+        String updateStmt =
+                "upsert into " +
+                        "ATABLE(" +
+                        "    ORGANIZATION_ID, " +
+                        "    ENTITY_ID, " +
+                        "    A_TIMESTAMP) " +
+                        "VALUES (?, ?, ?)";
+        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 1);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        // Timestamp(long) expects milliseconds since the epoch; System.nanoTime() is an arbitrary
+        // nanosecond counter, so use the wall clock instead.
+        Timestamp tsValue = new Timestamp(System.currentTimeMillis());
+        Connection upsertConn = DriverManager.getConnection(url, props);
+        try {
+            upsertConn.setAutoCommit(true);
+            PreparedStatement stmt = upsertConn.prepareStatement(updateStmt);
+            stmt.setString(1, tenantId);
+            stmt.setString(2, ROW4);
+            stmt.setTimestamp(3, tsValue);
+            stmt.execute();
+        } finally {
+            upsertConn.close();
+        }
+
+        String query = "SELECT a_timestamp, a_string FROM aTable WHERE ?=organization_id  AND (a_timestamp, a_string) = (?, 'a')";
+        query = query.replaceFirst("\\?", "'"+tenantId+"'");
+        query = query.replaceFirst("\\?", "TO_DATE('"+DateUtil.getDateFormatter(DateUtil.DEFAULT_TIMESTAMP_FORMAT).format(tsValue)+"')");
+
+        props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at SCN ts + 2
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            String cursor = "DECLARE testCursor CURSOR FOR "+query;
+            conn.prepareStatement(cursor).execute();
+            cursor = "OPEN testCursor";
+            conn.prepareStatement(cursor).execute();
+            cursor = "FETCH NEXT FROM testCursor";
+
+            ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+            int count = 0;
+            while(rs.next()) {
+                assertEquals(tsValue, rs.getTimestamp(1));
+                assertEquals("a", rs.getString(2));
+                count++;
+                rs = conn.prepareStatement(cursor).executeQuery();
+            }
+            assertEquals(1, count);
+        } finally {
+            try {
+                String sql = "CLOSE testCursor";
+                conn.prepareStatement(sql).execute();
+            } finally {
+                // Release the connection even if CLOSE itself fails.
+                conn.close();
+            }
+        }
+    }
+
+    /**
+     * Upserts Y_INTEGER = 4 and X_INTEGER = 5 into ROW4, then declares a cursor filtering
+     * on {@code (x_integer) IN ((5))} and checks the first two fetched rows: one with
+     * y_integer 4 and one whose y_integer reads back as 0.
+     */
+    @Test
+    public void testCursorsQueryMoreWithInListClausePossibleNullValues() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+        ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+        String updateStmt =
+                "upsert into " +
+                        "ATABLE(ORGANIZATION_ID, ENTITY_ID, Y_INTEGER, X_INTEGER) VALUES (?, ?, ?, ?)";
+        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 1);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection upsertConn = DriverManager.getConnection(url, props);
+        try {
+            upsertConn.setAutoCommit(true);
+            PreparedStatement stmt = upsertConn.prepareStatement(updateStmt);
+            stmt.setString(1, tenantId);
+            stmt.setString(2, ROW4);
+            stmt.setInt(3, 4);
+            stmt.setInt(4, 5);
+            stmt.execute();
+        } finally {
+            upsertConn.close();
+        }
+
+        //we have a row present in aTable where x_integer = 5 and y_integer = NULL which gets translated to 0 when retrieving from HBase.
+        String query = "SELECT x_integer, y_integer FROM aTable WHERE ? = organization_id AND (x_integer) IN ((5))";
+
+        query = query.replaceFirst("\\?", "'"+tenantId+"'");
+
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at SCN ts + 2
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+
+        try {
+            String cursor = "DECLARE testCursor CURSOR FOR "+query;
+            conn.prepareStatement(cursor).execute();
+            cursor = "OPEN testCursor";
+            conn.prepareStatement(cursor).execute();
+            cursor = "FETCH NEXT FROM testCursor";
+
+            ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+            assertTrue(rs.next());
+            assertEquals(5, rs.getInt(1));
+            assertEquals(4, rs.getInt(2));
+            rs = conn.prepareStatement(cursor).executeQuery();
+            assertTrue(rs.next());
+            assertEquals(5, rs.getInt(1));
+            assertEquals(0, rs.getInt(2));
+        } finally {
+            try {
+                String sql = "CLOSE testCursor";
+                conn.prepareStatement(sql).execute();
+            } finally {
+                // Release the connection even if CLOSE itself fails.
+                conn.close();
+            }
+        }
+    }
+
+    /**
+     * Declares a cursor selecting x_decimal for three entity ids and fetches one row per
+     * FETCH NEXT. Every fetched value must be one of 0.1, 3.9 or 3.3 and exactly three rows
+     * must be read.
+     */
+    @Test
+    public void testCursorsWithColsOfTypesDecimal() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+        ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+        // DECLARE CURSOR rejects bind parameters, so each placeholder is replaced by a literal.
+        String query = "SELECT x_decimal FROM aTable WHERE ?=organization_id AND entity_id IN (?,?,?)";
+        query = query.replaceFirst("\\?", "'"+tenantId+"'");
+        query = query.replaceFirst("\\?", "'"+ROW7+"'");
+        query = query.replaceFirst("\\?", "'"+ROW8+"'");
+        query = query.replaceFirst("\\?", "'"+ROW9+"'");
+
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            String cursor = "DECLARE testCursor CURSOR FOR "+query;
+            conn.prepareStatement(cursor).execute();
+            cursor = "OPEN testCursor";
+            conn.prepareStatement(cursor).execute();
+            cursor = "FETCH NEXT FROM testCursor";
+
+            ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+            int count = 0;
+            while(rs.next()) {
+                BigDecimal value = rs.getBigDecimal(1);
+                assertTrue(BigDecimal.valueOf(0.1).equals(value) || BigDecimal.valueOf(3.9).equals(value) || BigDecimal.valueOf(3.3).equals(value));
+                count++;
+                if(count == 3) break;
+                // Each FETCH NEXT returns a single row, so issue a new fetch for the next one.
+                rs = conn.prepareStatement(cursor).executeQuery();
+            }
+            // assertEquals reports the actual count when the cursor returns too few rows.
+            assertEquals(3, count);
+        } finally {
+            String sql = "CLOSE testCursor";
+            conn.prepareStatement(sql).execute();
+            conn.close();
+        }
+    }
+
+    /**
+     * Declares a cursor selecting a_byte, a_short, a_float and a_double for three entity ids and
+     * fetches one row per FETCH NEXT. Every column of every fetched row must match one of the
+     * three expected values for that column, and exactly three rows must be read.
+     */
+    @Test
+    public void testCursorsWithColsOfTypesTinyintSmallintFloatDouble() throws Exception {
+        long ts = nextTimestamp();
+        String tenantId = getOrganizationId();
+        initATableValues(ATABLE_NAME, tenantId, getDefaultSplits(tenantId), null, ts-1, getUrl(), null);
+        ensureTableCreated(getUrl(), CUSTOM_ENTITY_DATA_FULL_NAME, CUSTOM_ENTITY_DATA_FULL_NAME, ts-1);
+        // DECLARE CURSOR rejects bind parameters, so each placeholder is replaced by a literal.
+        String query = "SELECT a_byte,a_short,a_float,a_double FROM aTable WHERE ?=organization_id AND entity_id IN (?,?,?)";
+        query = query.replaceFirst("\\?", "'"+tenantId+"'");
+        query = query.replaceFirst("\\?", "'"+ROW1+"'");
+        query = query.replaceFirst("\\?", "'"+ROW2+"'");
+        query = query.replaceFirst("\\?", "'"+ROW3+"'");
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            String cursor = "DECLARE testCursor CURSOR FOR "+query;
+            conn.prepareStatement(cursor).execute();
+            cursor = "OPEN testCursor";
+            conn.prepareStatement(cursor).execute();
+            cursor = "FETCH NEXT FROM testCursor";
+
+            ResultSet rs = conn.prepareStatement(cursor).executeQuery();
+            int count = 0;
+            while(rs.next()) {
+                byte byteValue = rs.getByte(1);
+                assertTrue((byte)1 == byteValue || (byte)2 == byteValue || (byte)3 == byteValue);
+                short shortValue = rs.getShort(2);
+                assertTrue((short)128 == shortValue || (short)129 == shortValue || (short)130 == shortValue);
+                float floatValue = rs.getFloat(3);
+                assertTrue(0.01f == floatValue || 0.02f == floatValue || 0.03f == floatValue);
+                double doubleValue = rs.getDouble(4);
+                assertTrue(0.0001 == doubleValue || 0.0002 == doubleValue || 0.0003 == doubleValue);
+                count++;
+                if(count == 3) break;
+                // Each FETCH NEXT returns a single row, so issue a new fetch for the next one.
+                rs = conn.prepareStatement(cursor).executeQuery();
+            }
+            // assertEquals reports the actual count when the cursor returns too few rows.
+            assertEquals(3, count);
+        } finally {
+            String sql = "CLOSE testCursor";
+            conn.prepareStatement(sql).execute();
+            conn.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42cc41c9/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index 6d991e1..51ba856 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -129,6 +129,10 @@ tokens
     USE='use';
     OFFSET ='offset';
     FETCH = 'fetch';
+    DECLARE = 'declare';
+    CURSOR = 'cursor';
+    OPEN = 'open';
+    CLOSE = 'close';
     ROW = 'row';
     ROWS = 'rows';
     ONLY = 'only';
@@ -408,6 +412,10 @@ oneStatement returns [BindableStatement ret]
     |   s=create_schema_node
     |   s=create_view_node
     |   s=create_index_node
+    |   s=cursor_open_node
+    |   s=cursor_close_node
+    |   s=cursor_fetch_node
+    |   s=declare_cursor_node
     |   s=drop_table_node
     |   s=drop_index_node
     |   s=alter_index_node
@@ -736,6 +744,25 @@ upsert_column_refs returns [Pair<List<ColumnDef>,List<ColumnName>> ret]
        (COMMA d=dyn_column_name_or_def { if (d.getDataType()!=null) { $ret.getFirst().add(d); } $ret.getSecond().add(d.getColumnDefName()); } )*
 ;
 	
+
+// Parse a full declare cursor expression structure:
+//     DECLARE <cursor_name> CURSOR FOR <select>
+// The cursor name and the parsed select are handed to factory.declareCursor.
+declare_cursor_node returns [DeclareCursorStatement ret]
+    :    DECLARE c=cursor_name CURSOR FOR s=select_node
+        {ret = factory.declareCursor(c, s); }
+    ;
+
+// Parse an open cursor statement: OPEN <cursor_name>
+cursor_open_node returns [OpenStatement ret]
+    :    OPEN c=cursor_name {ret = factory.open(c);}
+    ;
+ 
+// Parse a close cursor statement: CLOSE <cursor_name>
+cursor_close_node returns [CloseStatement ret]
+    :    CLOSE c=cursor_name {ret = factory.close(c);}
+    ;
+
+// Parse a fetch statement: FETCH NEXT [<number>] [ROW | ROWS] FROM <cursor_name>
+// When no number is given the row count passed to factory.fetch is 1. Only NEXT is
+// accepted by this rule, so the second factory argument is always true.
+cursor_fetch_node returns [FetchStatement ret]
+    :    FETCH NEXT (a=NUMBER)? (ROW|ROWS)? FROM c=cursor_name {ret = factory.fetch(c,true, a == null ? 1 :  Integer.parseInt( a.getText() )); }
+    ;
+
 // Parse a full delete expression structure.
 delete_node returns [DeleteStatement ret]
     :   DELETE (hint=hintClause)? FROM t=from_table_name
@@ -1025,6 +1052,10 @@ index_name returns [NamedNode ret]
     :   name=identifier {$ret = factory.indexName(name); }
     ;
 
+// Parse a cursor name: a single identifier wrapped by factory.cursorName.
+cursor_name returns [CursorName ret]
+    :   name=identifier {$ret = factory.cursorName(name);}
+    ;
+
 // TODO: figure out how not repeat this two times
 table_name returns [TableName ret]
     :   t=identifier {$ret = factory.table(null, t); }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42cc41c9/phoenix-core/src/main/java/org/apache/phoenix/compile/CloseStatementCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CloseStatementCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CloseStatementCompiler.java
new file mode 100644
index 0000000..cc53a9d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CloseStatementCompiler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.parse.CloseStatement;
+import org.apache.phoenix.parse.OpenStatement;
+import org.apache.phoenix.schema.MetaDataClient;
+
+import java.sql.SQLException;
+import java.util.Collections;
+
+/**
+ * Turns a parsed CLOSE statement into a {@link MutationPlan}. Executing the returned plan
+ * forwards the statement to {@code MetaDataClient.close}.
+ */
+public class CloseStatementCompiler {
+    private final PhoenixStatement statement;
+    private final Operation operation;
+
+    /**
+     * @param statement the statement the CLOSE is being compiled for
+     * @param operation the operation reported by the resulting plan
+     */
+    public CloseStatementCompiler(PhoenixStatement statement, Operation operation) {
+        this.statement = statement;
+        this.operation = operation;
+    }
+
+    /**
+     * Builds the plan for the given CLOSE statement. Nothing is closed until
+     * {@code execute()} is invoked on the returned plan.
+     */
+    public MutationPlan compile(final CloseStatement close) throws SQLException {
+        final PhoenixConnection conn = statement.getConnection();
+        final StatementContext stmtContext = new StatementContext(statement);
+        final MetaDataClient metaDataClient = new MetaDataClient(conn);
+
+        return new BaseMutationPlan(stmtContext, operation) {
+            @Override
+            public ExplainPlan getExplainPlan() throws SQLException {
+                return new ExplainPlan(Collections.singletonList("CLOSE CURSOR"));
+            }
+
+            @Override
+            public MutationState execute() throws SQLException {
+                return metaDataClient.close(close);
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42cc41c9/phoenix-core/src/main/java/org/apache/phoenix/compile/DeclareCursorCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeclareCursorCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeclareCursorCompiler.java
new file mode 100644
index 0000000..5280291
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeclareCursorCompiler.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.parse.CreateIndexStatement;
+import org.apache.phoenix.parse.DeclareCursorStatement;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.util.CursorUtil;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PTable.IndexType;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Compiles a parsed DECLARE CURSOR statement into a {@link MutationPlan}. The query plan for
+ * the cursor's SELECT is supplied by the caller and handed to
+ * {@code MetaDataClient.declareCursor} when the returned plan is executed.
+ */
+public class DeclareCursorCompiler {
+    private final PhoenixStatement statement;
+    private final Operation operation;
+    // Assigned once in the constructor; final like the other fields.
+    private final QueryPlan queryPlan;
+
+    /**
+     * @param statement the statement the DECLARE is being compiled for
+     * @param operation the operation reported by the resulting plan
+     * @param queryPlan the already compiled plan of the cursor's SELECT
+     */
+    public DeclareCursorCompiler(PhoenixStatement statement, Operation operation, QueryPlan queryPlan) {
+        this.statement = statement;
+        this.operation = operation;
+        this.queryPlan = queryPlan;
+    }
+
+    /**
+     * Builds the plan for the given DECLARE CURSOR statement.
+     *
+     * @throws SQLException if the statement reports a non-zero bind count; bind parameters
+     *         are not accepted in a cursor declaration
+     */
+    public MutationPlan compile(final DeclareCursorStatement declare) throws SQLException {
+        if(declare.getBindCount() != 0){
+            throw new SQLException("Cannot declare cursor, internal SELECT statement contains bindings!");
+        }
+
+        final PhoenixConnection connection = statement.getConnection();
+        final StatementContext context = new StatementContext(statement);
+        final MetaDataClient client = new MetaDataClient(connection);
+
+        return new BaseMutationPlan(context, operation) {
+            @Override
+            public MutationState execute() throws SQLException {
+                return client.declareCursor(declare, queryPlan);
+            }
+
+            @Override
+            public ExplainPlan getExplainPlan() throws SQLException {
+                return new ExplainPlan(Collections.singletonList("DECLARE CURSOR"));
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42cc41c9/phoenix-core/src/main/java/org/apache/phoenix/compile/OpenStatementCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/OpenStatementCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/OpenStatementCompiler.java
new file mode 100644
index 0000000..b6125fd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/OpenStatementCompiler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.parse.DeclareCursorStatement;
+import org.apache.phoenix.parse.OpenStatement;
+import org.apache.phoenix.schema.MetaDataClient;
+
+import java.sql.SQLException;
+import java.util.Collections;
+
+/**
+ * Turns a parsed OPEN statement into a {@link MutationPlan}. Executing the returned plan
+ * forwards the statement to {@code MetaDataClient.open}.
+ */
+public class OpenStatementCompiler {
+    private final PhoenixStatement statement;
+    private final Operation operation;
+
+    /**
+     * @param statement the statement the OPEN is being compiled for
+     * @param operation the operation reported by the resulting plan
+     */
+    public OpenStatementCompiler(PhoenixStatement statement, Operation operation) {
+        this.statement = statement;
+        this.operation = operation;
+    }
+
+    /**
+     * Builds the plan for the given OPEN statement. Nothing is opened until
+     * {@code execute()} is invoked on the returned plan.
+     */
+    public MutationPlan compile(final OpenStatement open) throws SQLException {
+        final PhoenixConnection conn = statement.getConnection();
+        final StatementContext stmtContext = new StatementContext(statement);
+        final MetaDataClient metaDataClient = new MetaDataClient(conn);
+
+        return new BaseMutationPlan(stmtContext, operation) {
+            @Override
+            public ExplainPlan getExplainPlan() throws SQLException {
+                return new ExplainPlan(Collections.singletonList("OPEN CURSOR"));
+            }
+
+            @Override
+            public MutationState execute() throws SQLException {
+                return metaDataClient.open(open);
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42cc41c9/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
new file mode 100644
index 0000000..aaea13e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
@@ -0,0 +1,53 @@
+package org.apache.phoenix.execute;
+
+import java.sql.SQLException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.iterate.CursorResultIterator;
+import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
+import org.apache.phoenix.iterate.ResultIterator;
+
+/**
+ * Query plan used for FETCH on a cursor. It delegates to the plan of the cursor's SELECT and
+ * wraps that plan's iterator in a single {@link CursorResultIterator} that is created on the
+ * first call to {@link #iterator(ParallelScanGrouper, Scan)} and reused afterwards.
+ */
+public class CursorFetchPlan extends DelegateQueryPlan {
+
+    // Created lazily by iterator(); later calls return this same instance.
+    private CursorResultIterator resultIterator;
+    // Only stored and returned by this class; see setFetchSize/getFetchSize.
+    private int fetchSize;
+    private final boolean isAggregate;
+    private final String cursorName;
+
+    /**
+     * @param cursorQueryPlan plan of the cursor's SELECT, used as the delegate
+     * @param cursorName name of the cursor, passed on to the result iterator
+     */
+    public CursorFetchPlan(QueryPlan cursorQueryPlan,String cursorName) {
+        super(cursorQueryPlan);
+        // DISTINCT selects are flagged as aggregate here as well.
+        this.isAggregate = delegate.getStatement().isAggregate() || delegate.getStatement().isDistinct();
+        this.cursorName = cursorName;
+    }
+
+    /**
+     * Returns the cursor's result iterator, creating it from the delegate plan on first use.
+     * The arguments are only used by that first call.
+     */
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+        StatementContext context = delegate.getContext();
+        if (resultIterator == null) {
+            context.getOverallQueryMetrics().startQuery();
+            resultIterator = new CursorResultIterator(LookAheadResultIterator.wrap(delegate.iterator(scanGrouper, scan)),cursorName);
+        }
+        return resultIterator;
+    }
+
+
+    /** Returns the explain plan of the delegate plan. */
+    @Override
+    public ExplainPlan getExplainPlan() throws SQLException {
+        return delegate.getExplainPlan();
+    }
+
+    public void setFetchSize(int fetchSize){
+        this.fetchSize = fetchSize;
+    }
+
+    public int getFetchSize() {
+        return fetchSize;
+    }
+
+    /** Returns true when the cursor's SELECT is an aggregate or DISTINCT query. */
+    public boolean isAggregate(){
+        return this.isAggregate;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42cc41c9/phoenix-core/src/main/java/org/apache/phoenix/iterate/CursorResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/CursorResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/CursorResultIterator.java
new file mode 100644
index 0000000..7ff2785
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/CursorResultIterator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.CursorUtil;
+
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * Result iterator backing a cursor. It hands out rows from a peeking delegate, at most
+ * {@code fetchSize} rows between two calls to {@link #setFetchSize(int)}, and reports every
+ * returned row together with the upcoming one to {@code CursorUtil.updateCursor}.
+ */
+public class CursorResultIterator implements ResultIterator {
+    private String cursorName;
+    private PeekingResultIterator delegate;
+    //TODO Configure fetch size from FETCH call
+    private int fetchSize = 0;
+    // Rows returned since the last setFetchSize call.
+    private int rowsRead = 0;
+
+    public CursorResultIterator(PeekingResultIterator delegate, String cursorName) {
+        this.cursorName = cursorName;
+        this.delegate = delegate;
+    }
+
+    /**
+     * Returns the next row, or null when {@code CursorUtil.moreValues} reports nothing left
+     * for this cursor or the current fetch size has been reached.
+     */
+    @Override
+    public Tuple next() throws SQLException {
+        boolean exhausted = !CursorUtil.moreValues(cursorName);
+        if (exhausted || rowsRead == fetchSize) {
+            return null;
+        }
+
+        Tuple current = delegate.next();
+        Tuple upcoming = delegate.peek();
+        CursorUtil.updateCursor(cursorName, current, upcoming);
+        rowsRead += 1;
+        return current;
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+        delegate.explain(planSteps);
+        planSteps.add("CLIENT CURSOR " + cursorName);
+    }
+
+    @Override
+    public String toString() {
+        return "CursorResultIterator [cursor=" + cursorName + "]";
+    }
+
+    /** Does nothing; the delegate is released through {@link #closeCursor()} instead. */
+    @Override
+    public void close() throws SQLException {
+        //NOP
+    }
+
+    /** Closes the underlying delegate iterator. */
+    public void closeCursor() throws SQLException {
+        delegate.close();
+    }
+
+    /** Sets the row limit for the upcoming reads and resets the count of rows read. */
+    public void setFetchSize(int fetchSize){
+        this.rowsRead = 0;
+        this.fetchSize = fetchSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42cc41c9/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 7239ff5..9a094ba 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.call.CallRunner;
 import org.apache.phoenix.compile.BaseMutationPlan;
+import org.apache.phoenix.compile.CloseStatementCompiler;
 import org.apache.phoenix.compile.ColumnProjector;
 import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.CreateFunctionCompiler;
@@ -54,6 +55,7 @@ import org.apache.phoenix.compile.CreateIndexCompiler;
 import org.apache.phoenix.compile.CreateSchemaCompiler;
 import org.apache.phoenix.compile.CreateSequenceCompiler;
 import org.apache.phoenix.compile.CreateTableCompiler;
+import org.apache.phoenix.compile.DeclareCursorCompiler;
 import org.apache.phoenix.compile.DeleteCompiler;
 import org.apache.phoenix.compile.DropSequenceCompiler;
 import org.apache.phoenix.compile.ExplainPlan;
@@ -62,6 +64,7 @@ import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.ListJarsQueryPlan;
 import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.compile.OpenStatementCompiler;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.QueryCompiler;
 import org.apache.phoenix.compile.QueryPlan;
@@ -89,13 +92,16 @@ import org.apache.phoenix.parse.AddJarsStatement;
 import org.apache.phoenix.parse.AliasedNode;
 import org.apache.phoenix.parse.AlterIndexStatement;
 import org.apache.phoenix.parse.BindableStatement;
+import org.apache.phoenix.parse.CloseStatement;
 import org.apache.phoenix.parse.ColumnDef;
 import org.apache.phoenix.parse.ColumnName;
+import org.apache.phoenix.parse.CursorName;
 import org.apache.phoenix.parse.CreateFunctionStatement;
 import org.apache.phoenix.parse.CreateIndexStatement;
 import org.apache.phoenix.parse.CreateSchemaStatement;
 import org.apache.phoenix.parse.CreateSequenceStatement;
 import org.apache.phoenix.parse.CreateTableStatement;
+import org.apache.phoenix.parse.DeclareCursorStatement;
 import org.apache.phoenix.parse.DeleteJarStatement;
 import org.apache.phoenix.parse.DeleteStatement;
 import org.apache.phoenix.parse.DropColumnStatement;
@@ -105,6 +111,7 @@ import org.apache.phoenix.parse.DropSchemaStatement;
 import org.apache.phoenix.parse.DropSequenceStatement;
 import org.apache.phoenix.parse.DropTableStatement;
 import org.apache.phoenix.parse.ExecuteUpgradeStatement;
+import org.apache.phoenix.parse.FetchStatement;
 import org.apache.phoenix.parse.ExplainStatement;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.HintNode;
@@ -115,6 +122,7 @@ import org.apache.phoenix.parse.LiteralParseNode;
 import org.apache.phoenix.parse.NamedNode;
 import org.apache.phoenix.parse.NamedTableNode;
 import org.apache.phoenix.parse.OffsetNode;
+import org.apache.phoenix.parse.OpenStatement;
 import org.apache.phoenix.parse.OrderByNode;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.ParseNode;
@@ -153,6 +161,7 @@ import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.CursorUtil;
 import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PhoenixContextExecutor;
@@ -291,7 +300,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                         }
                         StatementContext context = plan.getContext();
                         context.getOverallQueryMetrics().startQuery();
-                        PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector(), context);
+                        PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector(), plan.getContext());
                         resultSets.add(rs);
                         setLastQueryPlan(plan);
                         setLastResultSet(rs);
@@ -400,6 +409,13 @@ public class PhoenixStatement implements Statement, SQLCloseable {
             super(from, hint, isDistinct, select, where, groupBy, having, orderBy, limit, offset, bindCount, isAggregate, hasSequence, selects, udfParseNodes);
         }
         
+        /**
+         * Copy constructor: builds a new statement from the values returned by the given
+         * statement's getters, by delegating to the full constructor.
+         */
+        private ExecutableSelectStatement(ExecutableSelectStatement select) {
+            this(select.getFrom(), select.getHint(), select.isDistinct(), select.getSelect(), select.getWhere(),
+                    select.getGroupBy(), select.getHaving(), select.getOrderBy(), select.getLimit(), select.getOffset(), select.getBindCount(),
+                    select.isAggregate(), select.hasSequence(), select.getSelects(), select.getUdfParseNodes());
+        }
+		
+        
         @SuppressWarnings("unchecked")
         @Override
         public QueryPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
@@ -414,6 +430,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                 resolver = FromCompiler.getResolverForQuery(transformedSelect, stmt.getConnection());
                 select = StatementNormalizer.normalize(transformedSelect, resolver);
             }
+
             QueryPlan plan = new QueryCompiler(stmt, select, resolver, Collections.<PDatum>emptyList(), stmt.getConnection().getIteratorFactory(), new SequenceManager(stmt), true).compile();
             plan.getContext().getSequenceManager().validateSequences(seqAction);
             return plan;
@@ -762,6 +779,56 @@ public class PhoenixStatement implements Statement, SQLCloseable {
         }
     }
 
+    /**
+     * Compilable form of DECLARE CURSOR. Compiling it parses the cursor's query text, compiles
+     * that select into a query plan and passes the plan to a {@link DeclareCursorCompiler}.
+     */
+    private static class ExecutableDeclareCursorStatement extends DeclareCursorStatement implements CompilableStatement {
+        public ExecutableDeclareCursorStatement(CursorName cursor, SelectStatement select){
+            super(cursor, select);
+        }
+
+        @Override
+        public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
+            // Parse the query text of this declaration and copy the result into a fresh select.
+            ExecutableSelectStatement parsedSelect = (ExecutableSelectStatement) stmt.parseStatement(this.getQuerySQL());
+            ExecutableSelectStatement selectCopy = new ExecutableSelectStatement(parsedSelect);
+            return new DeclareCursorCompiler(stmt, this.getOperation(), selectCopy.compilePlan(stmt, seqAction))
+                    .compile(this);
+        }
+    }
+
+    /** Compilable form of OPEN; compilation is handed to {@link OpenStatementCompiler}. */
+    private static class ExecutableOpenStatement extends OpenStatement implements CompilableStatement {
+        public ExecutableOpenStatement(CursorName cursor){
+            super(cursor);
+        }
+
+        @Override
+        public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
+            return new OpenStatementCompiler(stmt, this.getOperation()).compile(this);
+        }
+    }
+
+    /** Compilable form of CLOSE; compilation is handed to {@link CloseStatementCompiler}. */
+    private static class ExecutableCloseStatement extends CloseStatement implements CompilableStatement {
+        public ExecutableCloseStatement(CursorName cursor){
+            super(cursor);
+        }
+
+        @Override
+        public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
+            return new CloseStatementCompiler(stmt, this.getOperation()).compile(this);
+        }
+    }
+
+    /**
+     * Compilable form of FETCH. Its plan is obtained from {@code CursorUtil.getFetchPlan} using
+     * the cursor name, the NEXT flag and the fetch size of this statement.
+     */
+    private static class ExecutableFetchStatement extends FetchStatement implements CompilableStatement {
+        public ExecutableFetchStatement(CursorName cursor, boolean isNext, int fetchLimit){
+            super(cursor, isNext, fetchLimit);
+        }
+
+        @Override
+        public QueryPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
+            String name = this.getCursorName().getName();
+            return CursorUtil.getFetchPlan(name, this.isNext(), this.getFetchSize());
+        }
+
+    }
+
     private static class ExecutableDeleteJarStatement extends DeleteJarStatement implements CompilableStatement {
 
         public ExecutableDeleteJarStatement(LiteralParseNode jarPath) {
@@ -1162,7 +1229,27 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                 Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName,ParseNode>> onDupKeyPairs) {
             return new ExecutableUpsertStatement(table, hintNode, columns, values, select, bindCount, udfParseNodes, onDupKeyPairs);
         }
-        
+
+        /** Creates the executable DECLARE CURSOR statement for the given cursor name and select. */
+        @Override
+        public ExecutableDeclareCursorStatement declareCursor(CursorName cursor, SelectStatement select){
+            return new ExecutableDeclareCursorStatement(cursor, select);
+        }
+
+        /** Creates the executable FETCH statement for the given cursor name, NEXT flag and row limit. */
+        @Override
+        public ExecutableFetchStatement fetch(CursorName cursor, boolean isNext, int fetchLimit){
+            return new ExecutableFetchStatement(cursor, isNext, fetchLimit);
+        }
+
+        /** Creates the executable OPEN statement for the given cursor name. */
+        @Override
+        public ExecutableOpenStatement open(CursorName cursor){
+            return new ExecutableOpenStatement(cursor);
+        }
+
+        /** Creates the executable CLOSE statement for the given cursor name. */
+        @Override
+        public ExecutableCloseStatement close(CursorName cursor){
+            return new ExecutableCloseStatement(cursor);
+        }
+
         @Override
         public ExecutableDeleteStatement delete(NamedTableNode table, HintNode hint, ParseNode whereNode, List<OrderByNode> orderBy, LimitNode limit, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
             return new ExecutableDeleteStatement(table, hint, whereNode, orderBy, limit, bindCount, udfParseNodes);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42cc41c9/phoenix-core/src/main/java/org/apache/phoenix/parse/CloseStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/CloseStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/CloseStatement.java
new file mode 100644
index 0000000..5d7af34
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/CloseStatement.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+
+/**
+ * Parse node for a statement that closes a named cursor.
+ */
+public class CloseStatement implements BindableStatement {
+    private final CursorName cursorName;
+
+    public CloseStatement(CursorName cursorName) {
+        this.cursorName = cursorName;
+    }
+
+    /** @return the name string held by the wrapped {@link CursorName} */
+    public String getCursorName() {
+        return cursorName.getName();
+    }
+
+    /** @return always 0 */
+    @Override
+    public int getBindCount() {
+        return 0;
+    }
+
+    /** @return {@link Operation#UPSERT} */
+    @Override
+    public Operation getOperation() {
+        return Operation.UPSERT;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42cc41c9/phoenix-core/src/main/java/org/apache/phoenix/parse/CursorName.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/CursorName.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/CursorName.java
new file mode 100644
index 0000000..5b9de76
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/CursorName.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.util.SchemaUtil;
+
+/**
+ * Name of a SQL cursor together with a flag recording whether the name is case sensitive.
+ */
+public class CursorName {
+    private final String name;
+    private final boolean isCaseSensitive;
+
+    public CursorName(String name, boolean isCaseSensitive) {
+        this.name = name;
+        this.isCaseSensitive = isCaseSensitive;
+    }
+
+    /**
+     * Creates a cursor name whose case sensitivity is decided by
+     * {@link SchemaUtil#isCaseSensitive(String)}; a {@code null} name is treated as
+     * not case sensitive.
+     */
+    public CursorName(String name) {
+        this(name, name != null && SchemaUtil.isCaseSensitive(name));
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public boolean isCaseSensitive() {
+        return isCaseSensitive;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42cc41c9/phoenix-core/src/main/java/org/apache/phoenix/parse/DeclareCursorStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/DeclareCursorStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/DeclareCursorStatement.java
new file mode 100644
index 0000000..68129ec
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/DeclareCursorStatement.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.util.List;
+
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+
+/**
+ * Parse node for a cursor declaration: a cursor name paired with the SELECT it is declared for.
+ */
+public class DeclareCursorStatement implements BindableStatement {
+    private final CursorName cursorName;
+    private final SelectStatement select;
+
+    public DeclareCursorStatement(CursorName cursorName, SelectStatement select) {
+        this.cursorName = cursorName;
+        this.select = select;
+    }
+
+    public String getCursorName() {
+        return cursorName.getName();
+    }
+
+    /**
+     * Returns the text of the wrapped SELECT, obtained from {@code select.toString()}.
+     * Nothing here substitutes bind parameters.
+     * TODO: verify the rendered SQL is usable when the SELECT has bind parameters.
+     */
+    public String getQuerySQL() {
+        return select.toString();
+    }
+
+    public SelectStatement getSelect() {
+        return select;
+    }
+
+    public List<OrderByNode> getSelectOrderBy() {
+        return select.getOrderBy();
+    }
+
+    /** @return the bind count of the wrapped SELECT */
+    @Override
+    public int getBindCount() {
+        return select.getBindCount();
+    }
+
+    /** @return {@link Operation#UPSERT} */
+    @Override
+    public Operation getOperation() {
+        return Operation.UPSERT;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42cc41c9/phoenix-core/src/main/java/org/apache/phoenix/parse/FetchStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/FetchStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/FetchStatement.java
new file mode 100644
index 0000000..08e9724
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/FetchStatement.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+
+/**
+ * Parse node for a cursor fetch: the cursor to read from, the {@code isNext} flag and the fetch size.
+ */
+public class FetchStatement implements BindableStatement {
+    private final CursorName cursorName;
+    private final boolean isNext;
+    private final int fetchSize;
+
+    public FetchStatement(CursorName cursorName, boolean isNext, int fetchSize) {
+        this.cursorName = cursorName;
+        this.isNext = isNext;
+        this.fetchSize = fetchSize;
+    }
+
+    public CursorName getCursorName() {
+        return cursorName;
+    }
+
+    public boolean isNext() {
+        return isNext;
+    }
+
+    public int getFetchSize() {
+        return fetchSize;
+    }
+
+    /** @return always 0 */
+    @Override
+    public int getBindCount() {
+        return 0;
+    }
+
+    /** @return {@link Operation#QUERY} */
+    @Override
+    public Operation getOperation() {
+        return Operation.QUERY;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42cc41c9/phoenix-core/src/main/java/org/apache/phoenix/parse/OpenStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/OpenStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/OpenStatement.java
new file mode 100644
index 0000000..ad905b0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/OpenStatement.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+
+/**
+ * Parse node for a statement that opens a named cursor.
+ */
+public class OpenStatement implements BindableStatement {
+    private final CursorName cursorName;
+
+    public OpenStatement(CursorName cursorName) {
+        this.cursorName = cursorName;
+    }
+
+    /** @return the name string held by the wrapped {@link CursorName} */
+    public String getCursorName() {
+        return cursorName.getName();
+    }
+
+    /** @return always 0 */
+    @Override
+    public int getBindCount() {
+        return 0;
+    }
+
+    /** @return {@link Operation#UPSERT} */
+    @Override
+    public Operation getOperation() {
+        return Operation.UPSERT;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42cc41c9/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 0091f10..08aa0bf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -727,6 +727,31 @@ public class ParseNodeFactory {
         return new UpsertStatement(table, hint, columns, values, select, bindCount, udfParseNodes, onDupKeyPairs);
     }
 
+    /** Wraps the given name in a {@link CursorName}. */
+    public CursorName cursorName(String name) {
+        return new CursorName(name);
+    }
+
+    /** Creates the parse node for declaring {@code cursor} over {@code select}. */
+    public DeclareCursorStatement declareCursor(CursorName cursor, SelectStatement select) {
+        return new DeclareCursorStatement(cursor, select);
+    }
+
+    /** Creates the parse node for fetching from {@code cursor}; {@code fetchLimit} becomes its fetch size. */
+    public FetchStatement fetch(CursorName cursor, boolean isNext, int fetchLimit) {
+        return new FetchStatement(cursor, isNext, fetchLimit);
+    }
+
+    /** Creates the parse node for opening {@code cursor}. */
+    public OpenStatement open(CursorName cursor) {
+        return new OpenStatement(cursor);
+    }
+
+    /** Creates the parse node for closing {@code cursor}. */
+    public CloseStatement close(CursorName cursor) {
+        return new CloseStatement(cursor);
+    }
+
     public DeleteStatement delete(NamedTableNode table, HintNode hint, ParseNode node, List<OrderByNode> orderBy, LimitNode limit, int bindCount, Map<String, UDFParseNode> udfParseNodes) {
         return new DeleteStatement(table, hint, node, orderBy, limit, bindCount, udfParseNodes);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42cc41c9/phoenix-core/src/main/java/org/apache/phoenix/parse/SQLParser.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SQLParser.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SQLParser.java
index 1a80991..b6b7de2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SQLParser.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SQLParser.java
@@ -139,6 +139,86 @@ public class SQLParser {
     }
 
     /**
+     * Parses the input as a SQL declare cursor statement.
+     * Used only in tests
+     * @return the statement produced by the declare cursor grammar rule
+     * @throws SQLException if parsing fails
+     */
+    public DeclareCursorStatement parseDeclareCursor() throws SQLException {
+        try {
+            return parser.declare_cursor_node();
+        } catch (RecognitionException e) {
+            throw PhoenixParserException.newException(e, parser.getTokenNames());
+        } catch (RuntimeException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof SQLException) {
+                throw (SQLException) cause;
+            }
+            throw PhoenixParserException.newException(e, parser.getTokenNames());
+        }
+    }
+
+    /**
+     * Parses the input as a SQL cursor open statement.
+     * Used only in tests
+     * @return the statement produced by the cursor open grammar rule
+     * @throws SQLException if parsing fails
+     */
+    public OpenStatement parseOpen() throws SQLException {
+        try {
+            return parser.cursor_open_node();
+        } catch (RecognitionException e) {
+            throw PhoenixParserException.newException(e, parser.getTokenNames());
+        } catch (RuntimeException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof SQLException) {
+                throw (SQLException) cause;
+            }
+            throw PhoenixParserException.newException(e, parser.getTokenNames());
+        }
+    }
+
+    /**
+     * Parses the input as a SQL cursor close statement.
+     * Used only in tests
+     * @return the statement produced by the cursor close grammar rule
+     * @throws SQLException if parsing fails
+     */
+    public CloseStatement parseClose() throws SQLException {
+        try {
+            return parser.cursor_close_node();
+        } catch (RecognitionException e) {
+            throw PhoenixParserException.newException(e, parser.getTokenNames());
+        } catch (RuntimeException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof SQLException) {
+                throw (SQLException) cause;
+            }
+            throw PhoenixParserException.newException(e, parser.getTokenNames());
+        }
+    }
+
+    /**
+     * Parses the input as a SQL cursor fetch statement.
+     * Used only in tests
+     * @return the statement produced by the cursor fetch grammar rule
+     * @throws SQLException if parsing fails
+     */
+    public FetchStatement parseFetch() throws SQLException {
+        try {
+            return parser.cursor_fetch_node();
+        } catch (RecognitionException e) {
+            throw PhoenixParserException.newException(e, parser.getTokenNames());
+        } catch (RuntimeException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof SQLException) {
+                throw (SQLException) cause;
+            }
+            throw PhoenixParserException.newException(e, parser.getTokenNames());
+        }
+    }
+
+    /**
      * Parses the input as a SQL select statement.
      * Used only in tests
      * @throws SQLException 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42cc41c9/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 042ab7f..fcb647d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -144,6 +144,7 @@ import org.apache.phoenix.compile.MutationPlan;
 import org.apache.phoenix.compile.PostDDLCompiler;
 import org.apache.phoenix.compile.PostIndexDDLCompiler;
 import org.apache.phoenix.compile.PostLocalIndexDDLCompiler;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.StatementNormalizer;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -165,6 +166,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.AddColumnStatement;
 import org.apache.phoenix.parse.AlterIndexStatement;
+import org.apache.phoenix.parse.CloseStatement;
 import org.apache.phoenix.parse.ColumnDef;
 import org.apache.phoenix.parse.ColumnDefInPkConstraint;
 import org.apache.phoenix.parse.ColumnName;
@@ -173,6 +175,7 @@ import org.apache.phoenix.parse.CreateIndexStatement;
 import org.apache.phoenix.parse.CreateSchemaStatement;
 import org.apache.phoenix.parse.CreateSequenceStatement;
 import org.apache.phoenix.parse.CreateTableStatement;
+import org.apache.phoenix.parse.DeclareCursorStatement;
 import org.apache.phoenix.parse.DropColumnStatement;
 import org.apache.phoenix.parse.DropFunctionStatement;
 import org.apache.phoenix.parse.DropIndexStatement;
@@ -181,6 +184,7 @@ import org.apache.phoenix.parse.DropSequenceStatement;
 import org.apache.phoenix.parse.DropTableStatement;
 import org.apache.phoenix.parse.IndexKeyConstraint;
 import org.apache.phoenix.parse.NamedTableNode;
+import org.apache.phoenix.parse.OpenStatement;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PFunction.FunctionArgument;
 import org.apache.phoenix.parse.PSchema;
@@ -214,6 +218,7 @@ import org.apache.phoenix.schema.types.PUnsignedLong;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.CursorUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
@@ -1365,6 +1370,33 @@ public class MetaDataClient {
         return fullName;
     }
 
+    /**
+     * Hands the cursor declaration and its query plan to {@code CursorUtil.declareCursor}.
+     * @return a new {@link MutationState} built with 0 and this client's connection
+     */
+    public MutationState declareCursor(DeclareCursorStatement statement, QueryPlan queryPlan) throws SQLException {
+        CursorUtil.declareCursor(statement, queryPlan);
+        return new MutationState(0, connection);
+    }
+
+    /**
+     * Hands the open statement and this client's connection to {@code CursorUtil.openCursor}.
+     * @return a new {@link MutationState} built with 0 and this client's connection
+     */
+    public MutationState open(OpenStatement statement) throws SQLException {
+        CursorUtil.openCursor(statement, connection);
+        return new MutationState(0, connection);
+    }
+
+    /**
+     * Hands the close statement to {@code CursorUtil.closeCursor}.
+     * @return a new {@link MutationState} built with 0 and this client's connection
+     */
+    public MutationState close(CloseStatement statement) throws SQLException {
+        CursorUtil.closeCursor(statement);
+        return new MutationState(0, connection);
+    }
+
     /**
      * Create an index table by morphing the CreateIndexStatement into a CreateTableStatement and calling
      * MetaDataClient.createTable. In doing so, we perform the following translations:


Mime
View raw message