phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From anoopsamj...@apache.org
Subject git commit: PHOENIX-1000 Support FIRST_VALUE, LAST_VALUE, and NTH_VALUE aggregate functions. (Vaclav)
Date Wed, 02 Jul 2014 13:58:01 GMT
Repository: phoenix
Updated Branches:
  refs/heads/master 25adb4088 -> 858dd40b6


PHOENIX-1000 Support FIRST_VALUE, LAST_VALUE, and NTH_VALUE aggregate functions. (Vaclav)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/858dd40b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/858dd40b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/858dd40b

Branch: refs/heads/master
Commit: 858dd40b6c8c19652f0c4861c5ea173b8be326a2
Parents: 25adb40
Author: anoopsjohn <anoopsamjohn@gmail.com>
Authored: Wed Jul 2 19:27:20 2014 +0530
Committer: anoopsjohn <anoopsamjohn@gmail.com>
Committed: Wed Jul 2 19:27:20 2014 +0530

----------------------------------------------------------------------
 .../phoenix/end2end/FirstValueFunctionIT.java   | 196 +++++++++++
 .../phoenix/end2end/LastValueFunctionIT.java    | 326 +++++++++++++++++++
 .../phoenix/end2end/NthValueFunctionIT.java     | 148 +++++++++
 .../phoenix/expression/ExpressionType.java      |   7 +
 .../FirstLastValueBaseClientAggregator.java     | 141 ++++++++
 .../FirstLastValueServerAggregator.java         | 205 ++++++++++++
 .../function/FirstLastValueBaseFunction.java    |  62 ++++
 .../expression/function/FirstValueFunction.java |  68 ++++
 .../expression/function/LastValueFunction.java  |  71 ++++
 .../expression/function/NthValueFunction.java   |  78 +++++
 .../parse/FirstValueAggregateParseNode.java     |  38 +++
 .../parse/LastValueAggregateParseNode.java      |  38 +++
 .../parse/NthValueAggregateParseNode.java       |  38 +++
 .../apache/phoenix/parse/ParseNodeFactory.java  |  13 +-
 .../util/FirstLastNthValueDataContainer.java    | 168 ++++++++++
 15 files changed, 1589 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/858dd40b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FirstValueFunctionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FirstValueFunctionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FirstValueFunctionIT.java
new file mode 100644
index 0000000..cd81b66
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FirstValueFunctionIT.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.*;
+import java.sql.*;
+import static org.apache.hadoop.hbase.util.VersionInfo.getUrl;
+import org.junit.Test;
+
+public class FirstValueFunctionIT extends BaseHBaseManagedTimeIT {
+    // Class overview: end-to-end tests for FIRST_VALUE(col) WITHIN GROUP (ORDER BY ...). NOTE(review): connections are never closed.
+    @Test
+    public void signedLongAsBigInt() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        // BIGINT sort and value columns; ids 5 and 4 are upserted out of date order.
+        String ddl = "CREATE TABLE IF NOT EXISTS first_value_table "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                + " date BIGINT, \"value\" BIGINT)";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, date, \"value\") VALUES (1, 8, 1, 3)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, date, \"value\") VALUES (2, 8, 2, 7)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, date, \"value\") VALUES (3, 8, 3, 9)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, date, \"value\") VALUES (5, 8, 5, 158)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, date, \"value\") VALUES (4, 8, 4, 5)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT FIRST_VALUE(\"value\") WITHIN GROUP (ORDER BY date ASC) FROM first_value_table GROUP BY page_id");
+        // Smallest date is 1, and that row holds value 3; page_id 8 is the only group.
+        assertTrue(rs.next());
+        assertEquals(3, rs.getLong(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void testSortOrderInSortCol() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        // The sort column (dates) is part of the primary key and is declared DESC there.
+        String ddl = "CREATE TABLE IF NOT EXISTS first_value_table "
+                + "(id INTEGER NOT NULL, page_id UNSIGNED_LONG NOT NULL,"
+                + " dates BIGINT NOT NULL, \"value\" BIGINT CONSTRAINT pk PRIMARY KEY (id, dates DESC))";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, dates, \"value\") VALUES (1, 8, 1, 3)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, dates, \"value\") VALUES (2, 8, 2, 7)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, dates, \"value\") VALUES (3, 8, 3, 9)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, dates, \"value\") VALUES (5, 8, 5, 158)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, dates, \"value\") VALUES (4, 8, 4, 5)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT FIRST_VALUE(\"value\") WITHIN GROUP (ORDER BY dates ASC) FROM first_value_table GROUP BY page_id");
+        // ORDER BY dates ASC must still select date 1 (value 3) despite the DESC key column.
+        assertTrue(rs.next());
+        assertEquals(3, rs.getLong(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void testSortOrderInDataCol() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        // Here the aggregated "value" column is the primary-key member declared DESC.
+        String ddl = "CREATE TABLE IF NOT EXISTS first_value_table "
+                + "(id INTEGER NOT NULL, page_id UNSIGNED_LONG NOT NULL,"
+                + " dates BIGINT NOT NULL, \"value\" BIGINT NOT NULL CONSTRAINT pk PRIMARY KEY (id, dates, \"value\" DESC))";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, dates, \"value\") VALUES (1, 8, 1, 3)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, dates, \"value\") VALUES (2, 8, 2, 7)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, dates, \"value\") VALUES (3, 8, 3, 9)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, dates, \"value\") VALUES (5, 8, 5, 158)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, dates, \"value\") VALUES (4, 8, 4, 5)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT FIRST_VALUE(\"value\") WITHIN GROUP (ORDER BY dates ASC) FROM first_value_table GROUP BY page_id");
+        // The row with the smallest date (1) holds value 3.
+        assertTrue(rs.next());
+        assertEquals(3, rs.getLong(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void doubleDataType() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        // DOUBLE sort and value columns, populated from integer literals.
+        String ddl = "CREATE TABLE IF NOT EXISTS first_value_table "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG, "
+                + "date DOUBLE, \"value\" DOUBLE)";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, date, \"value\") VALUES (1, 8, 1, 300)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, date, \"value\") VALUES (2, 8, 2, 7)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, date, \"value\") VALUES (3, 8, 3, 9)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, date, \"value\") VALUES (5, 8, 4, 2)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, date, \"value\") VALUES (4, 8, 5, 400)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT FIRST_VALUE(\"value\") WITHIN GROUP (ORDER BY date ASC) FROM first_value_table GROUP BY page_id");
+        // Date 1 is the smallest sort key; its value is 300.
+        assertTrue(rs.next());
+        assertEquals("doubles", 300, rs.getDouble(1), 0.00001);
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void varcharFixedLenghtDatatype() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        // VARCHAR(3) sort and value columns holding single-character strings.
+        String ddl = "CREATE TABLE IF NOT EXISTS first_value_table "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG, "
+                + "date VARCHAR(3), \"value\" VARCHAR(3))";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, date, \"value\") VALUES (1, 8, '1', '3')");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, date, \"value\") VALUES (2, 8, '2', '7')");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, date, \"value\") VALUES (3, 8, '3', '9')");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, date, \"value\") VALUES (5, 8, '4', '2')");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, date, \"value\") VALUES (4, 8, '5', '4')");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT FIRST_VALUE(\"value\") WITHIN GROUP (ORDER BY date ASC) FROM first_value_table GROUP BY page_id");
+        // '1' is the smallest date string; its row holds '3'.
+        assertTrue(rs.next());
+        assertEquals("3", rs.getString(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void floatDataType() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        // FLOAT sort and value columns, populated from integer literals.
+        String ddl = "CREATE TABLE IF NOT EXISTS first_value_table "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                + " date FLOAT, \"value\" FLOAT)";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, date, \"value\") VALUES (1, 8, 1, 300)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, date, \"value\") VALUES (2, 8, 2, 7)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, date, \"value\") VALUES (3, 8, 3, 9)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, date, \"value\") VALUES (5, 8, 4, 2)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id, date, \"value\") VALUES (4, 8, 5, 400)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT FIRST_VALUE(\"value\") WITHIN GROUP (ORDER BY date ASC) FROM first_value_table GROUP BY page_id");
+        // Date 1 is the smallest sort key; its value is 300.
+        assertTrue(rs.next());
+        assertEquals(300.0, rs.getFloat(1), 0.000001);
+        assertFalse(rs.next());
+
+    }
+
+    @Test
+    public void allColumnsNull() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        // Only id and page_id are written, so date and "value" are never set for any row.
+        String ddl = "CREATE TABLE IF NOT EXISTS first_value_table "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                + " date FLOAT, \"value\" FLOAT)";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id) VALUES (1, 8)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id) VALUES (2, 8)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id) VALUES (3, 8)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id) VALUES (5, 8)");
+        conn.createStatement().execute("UPSERT INTO first_value_table (id, page_id) VALUES (4, 8)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT FIRST_VALUE(\"value\") WITHIN GROUP (ORDER BY date ASC) FROM first_value_table GROUP BY page_id");
+        // With no value to pick, the single page_id group (8) must yield one row with a null result.
+        assertTrue(rs.next());
+        assertNull(rs.getBytes(1));
+        assertFalse(rs.next());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/858dd40b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LastValueFunctionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/LastValueFunctionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LastValueFunctionIT.java
new file mode 100644
index 0000000..4886b4d
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/LastValueFunctionIT.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.*;
+import java.sql.*;
+import static org.apache.hadoop.hbase.util.VersionInfo.getUrl;
+import org.junit.Test;
+
+public class LastValueFunctionIT extends BaseHBaseManagedTimeIT {
+    // Class overview: end-to-end tests for LAST_VALUE(col) WITHIN GROUP (ORDER BY ...). NOTE(review): connections are never closed.
+    @Test
+    public void unsignedLong() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        // DATE sort column and UNSIGNED_LONG value column; timestamps are one minute apart.
+        String ddl = "CREATE TABLE IF NOT EXISTS last_value_table "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                + " date DATE, \"value\" UNSIGNED_LONG)";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") "
+                + "VALUES (1, 8, TO_DATE('2013-01-01 00:00:00'), 300)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") "
+                + "VALUES (2, 8, TO_DATE('2013-01-01 00:01:00'), 7)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") "
+                + "VALUES (3, 8, TO_DATE('2013-01-01 00:02:00'), 9)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") "
+                + "VALUES (4, 8, TO_DATE('2013-01-01 00:03:00'), 4)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") "
+                + "VALUES (5, 8, TO_DATE('2013-01-01 00:04:00'), 2)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") "
+                + "VALUES (6, 8, TO_DATE('2013-01-01 00:05:00'), 150)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT LAST_VALUE(\"value\") WITHIN GROUP (ORDER BY date ASC) FROM last_value_table GROUP BY page_id");
+        // The latest timestamp (00:05:00) belongs to the row with value 150.
+        assertTrue(rs.next());
+        assertEquals(150, rs.getLong(1));
+
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void signedInteger() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        // INTEGER columns with a negative value; the row with the largest date is upserted first.
+        String ddl = "CREATE TABLE IF NOT EXISTS last_test_table "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG, date INTEGER, \"value\" INTEGER)";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO last_test_table (id, page_id, date, \"value\") VALUES (5, 8, 5, -255)");
+        conn.createStatement().execute("UPSERT INTO last_test_table (id, page_id, date, \"value\") VALUES (1, 8, 1, 3)");
+        conn.createStatement().execute("UPSERT INTO last_test_table (id, page_id, date, \"value\") VALUES (2, 8, 2, 7)");
+        conn.createStatement().execute("UPSERT INTO last_test_table (id, page_id, date, \"value\") VALUES (3, 8, 3, 9)");
+        conn.createStatement().execute("UPSERT INTO last_test_table (id, page_id, date, \"value\") VALUES (4, 8, 4, 4)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT LAST_VALUE(\"value\") WITHIN GROUP (ORDER BY date ASC) FROM last_test_table GROUP BY page_id"
+        );
+        // Largest date is 5, whose row holds -255.
+        assertTrue(rs.next());
+        assertEquals(-255, rs.getInt(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void unsignedInteger() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        // UNSIGNED_INT columns; the largest date (5) belongs to id 4, so the expected value is 4.
+        String ddl = "CREATE TABLE IF NOT EXISTS last_test_table "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                + " date UNSIGNED_INT, \"value\" UNSIGNED_INT)";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO last_test_table (id, page_id, date, \"value\") VALUES (1, 8, 1, 3)");
+        conn.createStatement().execute("UPSERT INTO last_test_table (id, page_id, date, \"value\") VALUES (2, 8, 2, 7)");
+        conn.createStatement().execute("UPSERT INTO last_test_table (id, page_id, date, \"value\") VALUES (3, 8, 3, 9)");
+        conn.createStatement().execute("UPSERT INTO last_test_table (id, page_id, date, \"value\") VALUES (5, 8, 4, 2)");
+        conn.createStatement().execute("UPSERT INTO last_test_table (id, page_id, date, \"value\") VALUES (4, 8, 5, 4)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT LAST_VALUE(\"value\") WITHIN GROUP (ORDER BY date ASC) FROM last_test_table GROUP BY page_id"
+        );
+        assertTrue(rs.next());
+        assertEquals(4, rs.getInt(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void simpleTestDescOrder() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        // INTEGER columns; with ORDER BY dates DESC the last row is the one with the smallest date.
+        String ddl = "CREATE TABLE IF NOT EXISTS last_value_table "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                + " dates INTEGER, val INTEGER)";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, dates, val) VALUES (1, 8, 0, 300)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, dates, val) VALUES (2, 8, 1, 7)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, dates, val) VALUES (3, 8, 2, 9)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, dates, val) VALUES (4, 8, 3, 4)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, dates, val) VALUES (5, 8, 4, 2)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, dates, val) VALUES (6, 8, 5, 150)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT LAST_VALUE(val) WITHIN GROUP (ORDER BY dates DESC) FROM last_value_table GROUP BY page_id");
+        // Smallest date is 0, whose row holds 300.
+        assertTrue(rs.next());
+        assertEquals(300, rs.getInt(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void simpleTestAscOrder() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        // Same rows as simpleTestDescOrder, but the aggregate sorts ascending.
+        String ddl = "CREATE TABLE IF NOT EXISTS last_value_table "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                + " dates INTEGER, val INTEGER)";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, dates, val) VALUES (1, 8, 0, 300)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, dates, val) VALUES (2, 8, 1, 7)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, dates, val) VALUES (3, 8, 2, 9)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, dates, val) VALUES (4, 8, 3, 4)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, dates, val) VALUES (5, 8, 4, 2)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, dates, val) VALUES (6, 8, 5, 150)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT LAST_VALUE(val) WITHIN GROUP (ORDER BY dates ASC) FROM last_value_table GROUP BY page_id");
+        // Largest date is 5, whose row holds 150.
+        assertTrue(rs.next());
+        assertEquals(150, rs.getInt(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void charDatatype() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        // CHAR(3) sort and value columns; some literals are shorter than the declared length.
+        String ddl = "CREATE TABLE IF NOT EXISTS last_value_table "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG, "
+                + "date CHAR(3), \"value\" CHAR(3))";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (1, 8, '1', '300')");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (2, 8, '2', '7')");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (3, 8, '3', '9')");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (5, 8, '4', '2')");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (4, 8, '5', '400')");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT LAST_VALUE(\"value\") WITHIN GROUP (ORDER BY date ASC) FROM last_value_table GROUP BY page_id");
+        // '5' is the largest date string; its row holds '400'.
+        assertTrue(rs.next());
+        assertEquals("400", rs.getString(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void varcharVariableLenghtDatatype() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        // VARCHAR sort and value columns without a declared length.
+        String ddl = "CREATE TABLE IF NOT EXISTS last_value_table "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                + " date VARCHAR, \"value\" VARCHAR)";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (1, 8, '1', '3')");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (2, 8, '2', '7')");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (3, 8, '3', '9')");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (5, 8, '4', '2')");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (4, 8, '5', '4')");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT LAST_VALUE(\"value\") WITHIN GROUP (ORDER BY date ASC) FROM last_value_table GROUP BY page_id");
+        // '5' is the largest date string; its row holds '4'.
+        assertTrue(rs.next());
+        assertEquals("4", rs.getString(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void groupMultipleValues() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        // Two page_id groups (8 and 9) whose values at the largest date differ (4 vs. 40).
+        String ddl = "CREATE TABLE IF NOT EXISTS last_value_table "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                + " date UNSIGNED_INT, \"value\" UNSIGNED_INT)";
+        conn.createStatement().execute(ddl);
+
+        //first page_id
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (1, 8, 1, 3)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (2, 8, 2, 7)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (3, 8, 3, 9)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (5, 8, 4, 2)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (4, 8, 5, 4)");
+
+        //second page_id
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (11, 9, 1, 3)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (12, 9, 2, 7)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (13, 9, 3, 9)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (15, 9, 4, 2)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (14, 9, 5, 40)");
+
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT LAST_VALUE(\"value\") WITHIN GROUP (ORDER BY date ASC) FROM last_value_table GROUP BY page_id");
+        // NOTE(review): assumes the page_id 8 group is returned before page_id 9 without an outer ORDER BY - confirm.
+        assertTrue(rs.next());
+        assertEquals(4, rs.getInt(1));
+
+        assertTrue(rs.next());
+        assertEquals(40, rs.getInt(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void nullValuesInAggregatingColumns() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        // The "value" column is never written, so LAST_VALUE has nothing to return.
+        String ddl = "CREATE TABLE IF NOT EXISTS last_value_table "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                + " date UNSIGNED_INT, \"value\" UNSIGNED_INT)";
+        conn.createStatement().execute(ddl);
+
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date) VALUES (1, 8, 1)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date) VALUES (2, 8, 2)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date) VALUES (3, 8, 3)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date) VALUES (5, 8, 4)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date) VALUES (4, 8, 5)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT LAST_VALUE(\"value\") WITHIN GROUP (ORDER BY date ASC) FROM last_value_table GROUP BY page_id");
+        // The single page_id group (8) must yield one row with a null result.
+        assertTrue(rs.next());
+        assertNull(rs.getBytes(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void nullValuesInAggregatingColumnsSecond() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        // NOTE(review): same rows and query as nullValuesInAggregatingColumns - presumably a second group was intended.
+        String ddl = "CREATE TABLE IF NOT EXISTS last_value_table "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                + " date UNSIGNED_INT, \"value\" UNSIGNED_INT)";
+        conn.createStatement().execute(ddl);
+
+        //first page_id
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date) VALUES (1, 8, 1)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date) VALUES (2, 8, 2)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date) VALUES (3, 8, 3)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date) VALUES (5, 8, 4)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date) VALUES (4, 8, 5)");
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT LAST_VALUE(\"value\") WITHIN GROUP (ORDER BY date ASC) FROM last_value_table GROUP BY page_id");
+        // The single page_id group (8) must yield one row with a null result.
+        assertTrue(rs.next());
+        assertNull(rs.getBytes(1));
+        assertFalse(rs.next());
+    }
+
+    @Test
+    public void inOrderByClausule() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        // The aggregate is aliased as val and that alias drives the outer ORDER BY.
+        String ddl = "CREATE TABLE IF NOT EXISTS last_value_table "
+                + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_INT,"
+                + " date UNSIGNED_INT, \"value\" UNSIGNED_INT)";
+        conn.createStatement().execute(ddl);
+
+        //first page
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (1, 8, 1, 3)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (2, 8, 2, 7)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (3, 8, 3, 9)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (5, 8, 4, 2)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (4, 8, 5, 5)");
+
+        //second page (ids must not collide with the first page: id is the sole primary key, so a reused id overwrites)
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (10, 2, 1, 3)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (6, 2, 2, 7)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (7, 2, 3, 9)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (8, 2, 4, 2)");
+        conn.createStatement().execute("UPSERT INTO last_value_table (id, page_id, date, \"value\") VALUES (9, 2, 5, 4)");
+
+        conn.commit();
+
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT LAST_VALUE(\"value\") WITHIN GROUP (ORDER BY date ASC) AS val "
+                + "FROM last_value_table GROUP BY page_id ORDER BY val DESC");
+        // Last values are 5 (page 8) and 4 (page 2); ORDER BY val DESC puts 5 first.
+        assertTrue(rs.next());
+        assertEquals(5, rs.getInt(1));
+
+        assertTrue(rs.next());
+        assertEquals(4, rs.getInt(1));
+        assertFalse(rs.next());
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/858dd40b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NthValueFunctionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NthValueFunctionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NthValueFunctionIT.java
new file mode 100644
index 0000000..61704b0
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NthValueFunctionIT.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.*;
+import java.sql.*;
+import static org.apache.hadoop.hbase.util.VersionInfo.getUrl;
+import org.junit.Test;
+
+public class NthValueFunctionIT extends BaseHBaseManagedTimeIT {
+
+    /**
+     * NTH_VALUE(val, 2) ordered by dates DESC over a single group. Sorted by
+     * dates descending the rows carry val 2, 4, 9, 7, so the second value is 4.
+     */
+    @Test
+    public void simpleTest() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            String ddl = "CREATE TABLE IF NOT EXISTS nthValue "
+                    + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                    + " dates INTEGER, val INTEGER)";
+            conn.createStatement().execute(ddl);
+
+            conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (2, 8, 1, 7)");
+            conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (3, 8, 2, 9)");
+            conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (4, 8, 3, 4)");
+            conn.createStatement().execute("UPSERT INTO nthValue (id, page_id, dates, val) VALUES (5, 8, 4, 2)");
+            conn.commit();
+
+            ResultSet rs = conn.createStatement().executeQuery("SELECT NTH_VALUE(val, 2) WITHIN GROUP (ORDER BY dates DESC) FROM nthValue GROUP BY page_id");
+
+            assertTrue(rs.next());
+            // JUnit's contract is assertEquals(expected, actual)
+            assertEquals(4, rs.getInt(1));
+            assertFalse(rs.next());
+        } finally {
+            // release the connection even when an assertion fails
+            conn.close();
+        }
+    }
+
+    /**
+     * NTH_VALUE("value", 2) ordered by date ASC: the row with the second lowest
+     * date (1) carries the value 7.
+     */
+    @Test
+    public void offsetValueAscOrder() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            String ddl = "CREATE TABLE IF NOT EXISTS nth_test_table "
+                    + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                    + " date INTEGER, \"value\" UNSIGNED_LONG)";
+            conn.createStatement().execute(ddl);
+
+            conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, date, \"value\") VALUES (1, 8, 0, 300)");
+            conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, date, \"value\") VALUES (2, 8, 1, 7)");
+            conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, date, \"value\") VALUES (3, 8, 2, 9)");
+            conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, date, \"value\") VALUES (4, 8, 3, 4)");
+            conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, date, \"value\") VALUES (5, 8, 4, 2)");
+            conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, date, \"value\") VALUES (6, 8, 5, 150)");
+            conn.commit();
+
+            ResultSet rs = conn.createStatement().executeQuery(
+                    "SELECT NTH_VALUE(\"value\", 2)  WITHIN GROUP (ORDER BY date ASC) FROM nth_test_table GROUP BY page_id");
+
+            assertTrue(rs.next());
+            assertEquals(7L, rs.getLong(1));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    /**
+     * NTH_VALUE("value", 2) ordered by date DESC: the row with the second highest
+     * date (4) carries the value 2.
+     */
+    @Test
+    public void offsetValueDescOrder() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            String ddl = "CREATE TABLE IF NOT EXISTS nth_test_table "
+                    + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                    + " date INTEGER, \"value\" UNSIGNED_LONG)";
+            conn.createStatement().execute(ddl);
+
+            conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, date, \"value\") VALUES (1, 8, 0, 300)");
+            conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, date, \"value\") VALUES (2, 8, 1, 7)");
+            conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, date, \"value\") VALUES (3, 8, 2, 9)");
+            conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, date, \"value\") VALUES (4, 8, 3, 4)");
+            conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, date, \"value\") VALUES (5, 8, 4, 2)");
+            conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, date, \"value\") VALUES (6, 8, 5, 150)");
+            conn.commit();
+
+            ResultSet rs = conn.createStatement().executeQuery(
+                    "SELECT NTH_VALUE(\"value\", 2)  WITHIN GROUP (ORDER BY date DESC) FROM nth_test_table GROUP BY page_id");
+
+            assertTrue(rs.next());
+            assertEquals(2L, rs.getLong(1));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    /**
+     * Rows are inserted so that the ordering column (date) does not follow the
+     * row key (id) order; the second highest date (4) carries the value 4.
+     */
+    @Test
+    public void offsetValueLastMismatchByColumn() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            String ddl = "CREATE TABLE IF NOT EXISTS nth_test_table "
+                    + "(id INTEGER NOT NULL PRIMARY KEY, page_id UNSIGNED_LONG,"
+                    + " date INTEGER, \"value\" UNSIGNED_LONG)";
+            conn.createStatement().execute(ddl);
+
+            conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, date, \"value\") VALUES (1, 8, 5, 8)");
+            conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, date, \"value\") VALUES (2, 8, 2, 7)");
+            conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, date, \"value\") VALUES (3, 8, 1, 9)");
+            conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, date, \"value\") VALUES (4, 8, 4, 4)");
+            conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, date, \"value\") VALUES (5, 8, 3, 2)");
+            conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, date, \"value\") VALUES (6, 8, 0, 1)");
+            conn.commit();
+
+            ResultSet rs = conn.createStatement().executeQuery(
+                    "SELECT NTH_VALUE(\"value\", 2)  WITHIN GROUP (ORDER BY date DESC) FROM nth_test_table GROUP BY page_id");
+
+            assertTrue(rs.next());
+            assertEquals(4L, rs.getLong(1));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    /**
+     * The data column is part of the primary key and declared DESC; the
+     * aggregate is still expected to return the plain value (7) for the second
+     * lowest dates entry.
+     */
+    @Test
+    public void testSortOrderInDataColWithOffset() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            String ddl = "CREATE TABLE IF NOT EXISTS nth_test_table "
+                    + "(id INTEGER NOT NULL, page_id UNSIGNED_LONG NOT NULL,"
+                    + " dates BIGINT NOT NULL, \"value\" BIGINT NOT NULL CONSTRAINT pk PRIMARY KEY (id, dates, \"value\" DESC))";
+            conn.createStatement().execute(ddl);
+
+            conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, dates, \"value\") VALUES (1, 8, 1, 3)");
+            conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, dates, \"value\") VALUES (2, 8, 2, 7)");
+            conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, dates, \"value\") VALUES (3, 8, 3, 9)");
+            conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, dates, \"value\") VALUES (5, 8, 5, 158)");
+            conn.createStatement().execute("UPSERT INTO nth_test_table (id, page_id, dates, \"value\") VALUES (4, 8, 4, 5)");
+            conn.commit();
+
+            ResultSet rs = conn.createStatement().executeQuery(
+                    "SELECT NTH_VALUE(\"value\", 2)  WITHIN GROUP (ORDER BY dates ASC) FROM nth_test_table GROUP BY page_id");
+
+            assertTrue(rs.next());
+            assertEquals(7L, rs.getLong(1));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/858dd40b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
index 9e55610..b1fd83b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
@@ -68,6 +68,9 @@ import org.apache.phoenix.expression.function.ToNumberFunction;
 import org.apache.phoenix.expression.function.TrimFunction;
 import org.apache.phoenix.expression.function.TruncFunction;
 import org.apache.phoenix.expression.function.UpperFunction;
+import org.apache.phoenix.expression.function.NthValueFunction;
+import org.apache.phoenix.expression.function.FirstValueFunction;
+import org.apache.phoenix.expression.function.LastValueFunction;
 import org.apache.phoenix.expression.function.ConvertTimezoneFunction;
 
 import com.google.common.collect.Maps;
@@ -79,6 +82,7 @@ import com.google.common.collect.Maps;
  * and server.
  *  
  *
+ *
  * @since 0.1
  */
 public enum ExpressionType {
@@ -159,6 +163,9 @@ public enum ExpressionType {
     ArrayConstructorExpression(ArrayConstructorExpression.class),
     SQLViewTypeFunction(SQLViewTypeFunction.class),
     ExternalSqlTypeIdFunction(ExternalSqlTypeIdFunction.class),
+    NthValueFunction(NthValueFunction.class),
+    FirstValueFunction(FirstValueFunction.class),
+    LastValueFunction(LastValueFunction.class),
     ConvertTimezoneFunction(ConvertTimezoneFunction.class),
     DecodeFunction(DecodeFunction.class),
     TimezoneOffsetFunction(TimezoneOffsetFunction.class),

http://git-wip-us.apache.org/repos/asf/phoenix/blob/858dd40b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueBaseClientAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueBaseClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueBaseClientAggregator.java
new file mode 100644
index 0000000..cf6b0f9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueBaseClientAggregator.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
+import org.apache.phoenix.util.FirstLastNthValueDataContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base client aggregator for (FIRST|LAST|NTH)_VALUE functions.
+ *
+ * Each call to {@link #aggregate(Tuple, ImmutableBytesWritable)} receives one
+ * serialized {@link FirstLastNthValueDataContainer} payload and merges it into
+ * the state kept here; {@link #evaluate(Tuple, ImmutableBytesWritable)} then
+ * picks the winning value out of the merged state.
+ */
+public class FirstLastValueBaseClientAggregator extends BaseAggregator {
+
+    private static final Logger logger = LoggerFactory.getLogger(FirstLastValueBaseClientAggregator.class);
+    protected boolean useOffset = false;
+    protected int offset = -1;
+    // order key of the best value seen so far; an empty array means "nothing seen yet"
+    protected BinaryComparator topOrder = new BinaryComparator(ByteUtil.EMPTY_BYTE_ARRAY);
+    protected byte[] topValue = null;
+    // order key -> value, only used in offset (NTH_VALUE) mode
+    protected TreeMap<byte[], byte[]> topValues = new TreeMap<byte[], byte[]>(new ByteArrayComparator());
+    protected boolean isAscending;
+
+    public FirstLastValueBaseClientAggregator() {
+        super(SortOrder.getDefault());
+    }
+
+    /**
+     * Drops the accumulated values. The offset configuration set by
+     * {@link #init(int)} is kept.
+     */
+    @Override
+    public void reset() {
+        topOrder = new BinaryComparator(ByteUtil.EMPTY_BYTE_ARRAY);
+        topValue = null;
+        topValues.clear();
+    }
+
+    /**
+     * Points {@code ptr} at the aggregated result.
+     *
+     * In offset mode the merged entries are walked in the requested direction
+     * and the entry at position {@code offset} (1-based) is returned.
+     *
+     * @return false when no value was aggregated, or when fewer than
+     *         {@code offset} entries are available
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (useOffset) {
+            if (topValues.size() == 0) {
+                return false;
+            }
+
+            Set<Map.Entry<byte[], byte[]>> entrySet;
+            if (isAscending) {
+                entrySet = topValues.entrySet();
+            } else {
+                entrySet = topValues.descendingMap().entrySet();
+            }
+
+            int counter = offset;
+            for (Map.Entry<byte[], byte[]> entry : entrySet) {
+                if (--counter == 0) {
+                    ptr.set(entry.getValue());
+                    return true;
+                }
+            }
+
+            // not enough values to return the Nth one
+            return false;
+        }
+
+        if (topValue == null) {
+            return false;
+        }
+
+        ptr.set(topValue);
+        return true;
+    }
+
+    /**
+     * Merges one serialized payload into the client-side state.
+     *
+     * @param tuple the tuple being aggregated; a {@link SingleKeyValueTuple}
+     *              short-circuits the merge (see below)
+     * @param ptr   points at the serialized {@link FirstLastNthValueDataContainer}
+     */
+    @Override
+    public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+
+        // Per the original author's note this path is taken when the aggregation
+        // is triggered by an ORDER BY clause; the pointer content is then taken
+        // as the value as-is.
+        if (tuple instanceof SingleKeyValueTuple) {
+            topValue = ptr.copyBytes();
+            return;
+        }
+
+        FirstLastNthValueDataContainer payload = new FirstLastNthValueDataContainer();
+
+        payload.setPayload(ptr.copyBytes());
+        isAscending = payload.getIsAscending();
+        TreeMap<byte[], byte[]> serverAggregatorResult = payload.getData();
+
+        if (useOffset) {
+            // NOTE(review): the effect of setOffset on an already decoded payload
+            // is not visible from this class; the call is kept as in the original.
+            payload.setOffset(offset);
+            topValues.putAll(serverAggregatorResult);
+        } else {
+            Entry<byte[], byte[]> valueEntry = serverAggregatorResult.firstEntry();
+            if (valueEntry == null) {
+                // empty payload: nothing to merge
+                return;
+            }
+            byte[] currentOrder = valueEntry.getKey();
+
+            boolean isBetter;
+            if (isAscending) {
+                isBetter = topOrder.compareTo(currentOrder) > 0;
+            } else {
+                isBetter = topOrder.compareTo(currentOrder) < 0; //desc
+            }
+            // the first payload always wins; later ones only when their order key is better
+            if (topOrder.getValue().length < 1 || isBetter) {
+                topOrder = new BinaryComparator(currentOrder);
+                topValue = valueEntry.getValue();
+            }
+        }
+    }
+
+    @Override
+    public PDataType getDataType() {
+        return PDataType.VARBINARY;
+    }
+
+    /**
+     * Configures the aggregator.
+     *
+     * @param offset position of the value to return; any non-zero value
+     *               switches the aggregator into offset mode, 0 leaves it in
+     *               single-value mode
+     */
+    public void init(int offset) {
+        if (offset != 0) {
+            useOffset = true;
+            this.offset = offset;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/858dd40b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueServerAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueServerAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueServerAggregator.java
new file mode 100644
index 0000000..90b7826
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/FirstLastValueServerAggregator.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.SizedUtil;
+import java.io.IOException;
+import java.util.List;
+import java.util.TreeMap;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.util.FirstLastNthValueDataContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base server aggregator for (FIRST|LAST|NTH)_VALUE functions.
+ *
+ * For every aggregated row the order-by expression and the data expression are
+ * evaluated. In single-value mode only the value with the best order key is
+ * kept; in offset mode at most {@code offset} entries with the best order keys
+ * are kept. {@link #evaluate(Tuple, ImmutableBytesWritable)} serializes the
+ * kept entries through a {@link FirstLastNthValueDataContainer}.
+ */
+public class FirstLastValueServerAggregator extends BaseAggregator {
+
+    private static final Logger logger = LoggerFactory.getLogger(FirstLastValueServerAggregator.class);
+    protected List<Expression> children;
+    // order key of the best value seen so far; an empty array means "nothing seen yet"
+    protected BinaryComparator topOrder = new BinaryComparator(ByteUtil.EMPTY_BYTE_ARRAY);
+    protected byte[] topValue;
+    protected boolean useOffset = false;
+    protected int offset = -1;
+    // order key -> value; holds the candidates in offset mode
+    protected TreeMap<byte[], byte[]> topValues = new TreeMap<byte[], byte[]>(new Bytes.ByteArrayComparator());
+    protected boolean isAscending;
+    protected boolean hasValueDescSortOrder;
+    protected Expression orderByColumn;
+    protected Expression dataColumn;
+
+    public FirstLastValueServerAggregator() {
+        super(SortOrder.getDefault());
+    }
+
+    /**
+     * Drops the accumulated values only. {@code offset} and {@code useOffset}
+     * are configuration established by {@link #init(List, boolean, int)} and
+     * must survive a reset; clearing them would silently turn a reused
+     * NTH_VALUE aggregator into a first-value aggregator.
+     */
+    @Override
+    public void reset() {
+        topOrder = new BinaryComparator(ByteUtil.EMPTY_BYTE_ARRAY);
+        topValue = null;
+        topValues.clear();
+    }
+
+    @Override
+    public int getSize() {
+        return super.getSize() + SizedUtil.IMMUTABLE_BYTES_WRITABLE_SIZE;
+    }
+
+    /**
+     * Feeds one row into the aggregator. Rows whose data expression cannot be
+     * evaluated are ignored.
+     */
+    @Override
+    public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+        //set pointer to ordering by field
+        // NOTE(review): the result of this evaluation is not checked, as in the
+        // original; on failure ptr keeps whatever it pointed at before.
+        orderByColumn.evaluate(tuple, ptr);
+        byte[] currentOrder = ptr.copyBytes();
+
+        if (!dataColumn.evaluate(tuple, ptr)) {
+            return;
+        }
+
+        if (useOffset) {
+            aggregateWithOffset(currentOrder, ptr);
+        } else {
+            aggregateTopValue(currentOrder, ptr);
+        }
+    }
+
+    /** Offset mode: keeps at most {@code offset} entries with the best order keys. */
+    private void aggregateWithOffset(byte[] currentOrder, ImmutableBytesWritable ptr) {
+        boolean addFlag = false;
+        if (topValues.size() < offset) {
+            addFlag = true;
+        } else if (isAscending) {
+            // ascending: evict the highest kept key when the new key is lower
+            byte[] highestKey = topValues.lastKey();
+            if (Bytes.compareTo(currentOrder, highestKey) < 0) {
+                topValues.remove(highestKey);
+                addFlag = true;
+            }
+        } else {
+            // descending: evict the lowest kept key when the new key is higher
+            byte[] lowestKey = topValues.firstKey();
+            if (Bytes.compareTo(currentOrder, lowestKey) > 0) {
+                topValues.remove(lowestKey);
+                addFlag = true;
+            }
+        }
+        if (addFlag) {
+            topValues.put(currentOrder, copyDataValue(ptr));
+        }
+    }
+
+    /** Single-value mode: keeps the value whose order key is best so far. */
+    private void aggregateTopValue(byte[] currentOrder, ImmutableBytesWritable ptr) {
+        boolean isHigher;
+        if (isAscending) {
+            isHigher = topOrder.compareTo(currentOrder) > 0;
+        } else {
+            isHigher = topOrder.compareTo(currentOrder) < 0;//desc
+        }
+        // the first row always wins; later rows only with a better order key
+        if (topOrder.getValue().length < 1 || isHigher) {
+            topValue = copyDataValue(ptr);
+            topOrder = new BinaryComparator(currentOrder);
+        }
+    }
+
+    /** Copies the bytes under {@code ptr}, inverting them when the data column is stored DESC. */
+    private byte[] copyDataValue(ImmutableBytesWritable ptr) {
+        if (hasValueDescSortOrder) {
+            return SortOrder.invert(ptr.get(), ptr.getOffset(), ptr.getLength());
+        }
+        return ptr.copyBytes();
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder out = new StringBuilder("FirstLastValueServerAggregator"
+                + " is ascending: " + isAscending + " value=");
+        if (useOffset) {
+            // print the bytes themselves; appending a byte[] would only print its identity
+            for (byte[] value : topValues.values()) {
+                out.append(Bytes.toStringBinary(value));
+            }
+            out.append(" offset = ").append(offset);
+        } else {
+            out.append(Bytes.toStringBinary(topValue));
+        }
+
+        return out.toString();
+    }
+
+    /**
+     * Serializes the kept entries into {@code ptr}.
+     *
+     * @return false when nothing was aggregated or the payload could not be
+     *         serialized
+     */
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+
+        FirstLastNthValueDataContainer payload = new FirstLastNthValueDataContainer();
+        payload.setIsAscending(isAscending);
+
+        payload.setFixedWidthOrderValues(orderByColumn.getDataType().isFixedWidth());
+        payload.setFixedWidthDataValues(dataColumn.getDataType().isFixedWidth());
+
+        if (useOffset) {
+            payload.setOffset(offset);
+
+            if (topValues.size() == 0) {
+                return false;
+            }
+        } else {
+            if (topValue == null) {
+                return false;
+            }
+            // single-value mode ships its one entry through the same map
+            topValues.put(topOrder.getValue(), topValue);
+        }
+        payload.setData(topValues);
+
+        try {
+            ptr.set(payload.getPayload());
+        } catch (IOException ex) {
+            // keep the stack trace in the log, not just the message
+            logger.error(ex.getMessage(), ex);
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public PDataType getDataType() {
+        return PDataType.VARBINARY;
+    }
+
+    /**
+     * Configures the aggregator.
+     *
+     * @param children    function arguments; index 0 is used as the order-by
+     *                    expression and index 2 as the data expression
+     * @param isAscending requested direction; flipped when the order-by
+     *                    expression itself has DESC sort order
+     * @param offset      position of the value to return; values greater than 0
+     *                    enable offset mode
+     */
+    public void init(List<Expression> children, boolean isAscending, int offset) {
+        this.children = children;
+        this.offset = offset;
+        if (offset > 0) {
+            useOffset = true;
+        }
+
+        orderByColumn = children.get(0);
+        dataColumn = children.get(2);
+
+        //set order if modified
+        hasValueDescSortOrder = (dataColumn.getSortOrder() == SortOrder.DESC);
+
+        if (orderByColumn.getSortOrder() == SortOrder.DESC) {
+            this.isAscending = !isAscending;
+        } else {
+            this.isAscending = isAscending;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/858dd40b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/FirstLastValueBaseFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/FirstLastValueBaseFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/FirstLastValueBaseFunction.java
new file mode 100644
index 0000000..0c14c18
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/FirstLastValueBaseFunction.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.util.List;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * (FIRST|LAST|NTH)_VALUE build in function interface
+ *
+ */
+abstract public class FirstLastValueBaseFunction extends DelegateConstantToCountAggregateFunction {
+
+    public static String NAME = null;
+
+    public FirstLastValueBaseFunction() {
+    }
+
+    public FirstLastValueBaseFunction(List<Expression> childExpressions, CountAggregateFunction delegate) {
+        super(childExpressions, delegate);
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        boolean wasEvaluated = super.evaluate(tuple, ptr);
+        if (!wasEvaluated) {
+            return false;
+        }
+        if (isConstantExpression()) {
+            getAggregatorExpression().evaluate(tuple, ptr);
+        }
+        return true;
+    }
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    @Override
+    public PDataType getDataType() {
+        return children.get(2).getDataType();
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/858dd40b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/FirstValueFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/FirstValueFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/FirstValueFunction.java
new file mode 100644
index 0000000..ef01534
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/FirstValueFunction.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.FirstLastValueBaseClientAggregator;
+import org.apache.phoenix.expression.aggregator.FirstLastValueServerAggregator;
+import org.apache.phoenix.parse.FirstValueAggregateParseNode;
+import org.apache.phoenix.parse.FunctionParseNode;
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ * Built-in function for
+ * {@code FIRST_VALUE(<expression>) WITHIN GROUP (ORDER BY <expression> ASC/DESC)}
+ * aggregate function.
+ *
+ * Children: index 0 and index 2 are expressions, index 1 is a constant BOOLEAN
+ * that is passed to the server aggregator as the ordering direction.
+ */
+@FunctionParseNode.BuiltInFunction(name = FirstValueFunction.NAME, nodeClass = FirstValueAggregateParseNode.class, args = {
+    @FunctionParseNode.Argument(),
+    @FunctionParseNode.Argument(allowedTypes = {PDataType.BOOLEAN}, isConstant = true),
+    @FunctionParseNode.Argument()})
+public class FirstValueFunction extends FirstLastValueBaseFunction {
+
+    public static final String NAME = "FIRST_VALUE";
+
+    public FirstValueFunction() {
+    }
+
+    public FirstValueFunction(List<Expression> childExpressions, CountAggregateFunction delegate) {
+        super(childExpressions, delegate);
+    }
+
+    /**
+     * Creates a server aggregator in single-value mode (offset 0) using the
+     * direction found in the constant child at index 1.
+     */
+    @Override
+    public Aggregator newServerAggregator(Configuration conf) {
+        FirstLastValueServerAggregator aggregator = new FirstLastValueServerAggregator();
+
+        boolean order = (Boolean) ((LiteralExpression) children.get(1)).getValue();
+        aggregator.init(children, order, 0);
+
+        return aggregator;
+    }
+
+    /** Creates a client aggregator in single-value mode (offset 0). */
+    @Override
+    public Aggregator newClientAggregator() {
+        FirstLastValueBaseClientAggregator aggregator = new FirstLastValueBaseClientAggregator();
+        aggregator.init(0);
+
+        return aggregator;
+    }
+
+    /**
+     * Returns this function's own name. Static fields are hidden, not
+     * overridden, so the inherited implementation would return the base class
+     * NAME (null) instead of {@link #NAME}.
+     */
+    @Override
+    public String getName() {
+        return NAME;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/858dd40b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/LastValueFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/LastValueFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/LastValueFunction.java
new file mode 100644
index 0000000..9c5f133
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/LastValueFunction.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.FirstLastValueBaseClientAggregator;
+import org.apache.phoenix.expression.aggregator.FirstLastValueServerAggregator;
+import org.apache.phoenix.parse.FunctionParseNode;
+import org.apache.phoenix.parse.LastValueAggregateParseNode;
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ * Built-in function for
+ * {@code LAST_VALUE(<expression>) WITHIN GROUP (ORDER BY <expression> ASC/DESC)}
+ * aggregate function.
+ *
+ * Children: index 0 and index 2 are expressions, index 1 is a constant BOOLEAN
+ * whose negation is passed to the server aggregator as the ordering direction.
+ */
+@FunctionParseNode.BuiltInFunction(name = LastValueFunction.NAME, nodeClass = LastValueAggregateParseNode.class, args = {
+    @FunctionParseNode.Argument(),
+    @FunctionParseNode.Argument(allowedTypes = {PDataType.BOOLEAN}, isConstant = true),
+    @FunctionParseNode.Argument()})
+public class LastValueFunction extends FirstLastValueBaseFunction {
+
+    public static final String NAME = "LAST_VALUE";
+
+    public LastValueFunction() {
+    }
+
+    public LastValueFunction(List<Expression> childExpressions, CountAggregateFunction delegate) {
+        super(childExpressions, delegate);
+    }
+
+    /**
+     * Creates a server aggregator in single-value mode (offset 0) with the
+     * direction from the constant child at index 1 inverted.
+     */
+    @Override
+    public Aggregator newServerAggregator(Configuration conf) {
+        FirstLastValueServerAggregator aggregator = new FirstLastValueServerAggregator();
+
+        // LAST_VALUE is FIRST_VALUE evaluated in the opposite direction, so invert the order
+        boolean order = !(Boolean) ((LiteralExpression) children.get(1)).getValue();
+        aggregator.init(children, order, 0);
+
+        return aggregator;
+    }
+
+    /** Creates a client aggregator in single-value mode (offset 0). */
+    @Override
+    public Aggregator newClientAggregator() {
+
+        FirstLastValueBaseClientAggregator aggregator = new FirstLastValueBaseClientAggregator();
+        aggregator.init(0);
+
+        return aggregator;
+    }
+
+    /**
+     * Returns this function's own name. Static fields are hidden, not
+     * overridden, so the inherited implementation would return the base class
+     * NAME (null) instead of {@link #NAME}.
+     */
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/858dd40b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/NthValueFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/NthValueFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/NthValueFunction.java
new file mode 100644
index 0000000..5614088
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/NthValueFunction.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.FirstLastValueBaseClientAggregator;
+import org.apache.phoenix.expression.aggregator.FirstLastValueServerAggregator;
+import org.apache.phoenix.parse.NthValueAggregateParseNode;
+import org.apache.phoenix.parse.FunctionParseNode;
+import org.apache.phoenix.schema.PDataType;
+
+/**
+ * Built-in aggregate function
+ * {@code NTH_VALUE(<expression>, <expression>) WITHIN GROUP (ORDER BY <expression> ASC/DESC)}.
+ *
+ * <p>The declared arguments are: an expression, a constant BOOLEAN literal (read below as the
+ * sort direction), a further expression, and a constant INTEGER literal (read below as the
+ * offset, i.e. the "n" of NTH_VALUE).
+ */
+@FunctionParseNode.BuiltInFunction(name = NthValueFunction.NAME, nodeClass = NthValueAggregateParseNode.class, args = {
+    @FunctionParseNode.Argument(),
+    @FunctionParseNode.Argument(allowedTypes = {PDataType.BOOLEAN}, isConstant = true),
+    @FunctionParseNode.Argument(),
+    @FunctionParseNode.Argument(allowedTypes = {PDataType.INTEGER}, isConstant = true)})
+public class NthValueFunction extends FirstLastValueBaseFunction {
+
+    public static final String NAME = "NTH_VALUE";
+
+    /** Index of the child holding the sort-direction literal. */
+    private static final int ORDER_CHILD_INDEX = 1;
+
+    /** Index of the child holding the constant offset literal. */
+    private static final int OFFSET_CHILD_INDEX = 3;
+
+    /**
+     * Offset last read by {@link #newServerAggregator(Configuration)}; used by
+     * {@link #newClientAggregator()} when the offset child is not available.
+     */
+    private int offset;
+
+    /** No-argument constructor; presumably required for deserialization - confirm against the base class. */
+    public NthValueFunction() {
+    }
+
+    /**
+     * @param childExpressions the function's child expressions, forwarded to the base class
+     * @param delegate count aggregate delegate, forwarded to the base class
+     */
+    public NthValueFunction(List<Expression> childExpressions, CountAggregateFunction delegate) {
+        super(childExpressions, delegate);
+    }
+
+    /**
+     * Creates the server-side aggregator, initialised with this function's children, the sort
+     * direction from child 1 and the offset from child 3. The offset is also cached in
+     * {@link #offset}.
+     *
+     * @param conf not used by this implementation
+     * @return a freshly initialised {@link FirstLastValueServerAggregator}
+     */
+    @Override
+    public Aggregator newServerAggregator(Configuration conf) {
+        FirstLastValueServerAggregator aggregator = new FirstLastValueServerAggregator();
+
+        offset = readOffsetFromChildren();
+        boolean order = (Boolean) ((LiteralExpression) children.get(ORDER_CHILD_INDEX)).getValue();
+
+        aggregator.init(children, order, offset);
+
+        return aggregator;
+    }
+
+    /**
+     * Creates the client-side aggregator. The offset is taken from child 3 when that child
+     * exists; otherwise the cached {@link #offset} is used.
+     *
+     * @return a freshly initialised {@link FirstLastValueBaseClientAggregator}
+     */
+    @Override
+    public Aggregator newClientAggregator() {
+        FirstLastValueBaseClientAggregator aggregator = new FirstLastValueBaseClientAggregator();
+
+        // The guard must cover the index that is actually read: the previous "size() < 3" check
+        // let a three-element child list reach children.get(3) and fail with
+        // IndexOutOfBoundsException instead of falling back to the cached offset.
+        if (children.size() <= OFFSET_CHILD_INDEX) {
+            aggregator.init(offset);
+        } else {
+            aggregator.init(readOffsetFromChildren());
+        }
+
+        return aggregator;
+    }
+
+    /** Reads the constant offset literal stored in child 3 as an int. */
+    private int readOffsetFromChildren() {
+        return ((Number) ((LiteralExpression) children.get(OFFSET_CHILD_INDEX)).getValue()).intValue();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/858dd40b/phoenix-core/src/main/java/org/apache/phoenix/parse/FirstValueAggregateParseNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/FirstValueAggregateParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/FirstValueAggregateParseNode.java
new file mode 100644
index 0000000..6eeaf3e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/FirstValueAggregateParseNode.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.function.FirstValueFunction;
+import org.apache.phoenix.expression.function.FunctionExpression;
+
+public class FirstValueAggregateParseNode extends DelegateConstantToCountParseNode {
+
+    public FirstValueAggregateParseNode(String name, List<ParseNode> children, FunctionParseNode.BuiltInFunctionInfo info) {
+        super(name, children, info);
+    }
+
+    @Override
+    public FunctionExpression create(List<Expression> children, StatementContext context) throws SQLException {
+        return new FirstValueFunction(children, getDelegateFunction(children, context));
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/858dd40b/phoenix-core/src/main/java/org/apache/phoenix/parse/LastValueAggregateParseNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/LastValueAggregateParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/LastValueAggregateParseNode.java
new file mode 100644
index 0000000..333bb13
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/LastValueAggregateParseNode.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.function.FunctionExpression;
+import org.apache.phoenix.expression.function.LastValueFunction;
+
+public class LastValueAggregateParseNode extends DelegateConstantToCountParseNode {
+
+    public LastValueAggregateParseNode(String name, List<ParseNode> children, FunctionParseNode.BuiltInFunctionInfo info) {
+        super(name, children, info);
+    }
+
+    @Override
+    public FunctionExpression create(List<Expression> children, StatementContext context) throws SQLException {
+        return new LastValueFunction(children, getDelegateFunction(children, context));
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/858dd40b/phoenix-core/src/main/java/org/apache/phoenix/parse/NthValueAggregateParseNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/NthValueAggregateParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/NthValueAggregateParseNode.java
new file mode 100644
index 0000000..a0495d4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/NthValueAggregateParseNode.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.function.FunctionExpression;
+import org.apache.phoenix.expression.function.NthValueFunction;
+
+public class NthValueAggregateParseNode extends DelegateConstantToCountParseNode {
+
+    public NthValueAggregateParseNode(String name, List<ParseNode> children, FunctionParseNode.BuiltInFunctionInfo info) {
+        super(name, children, info);
+    }
+
+    @Override
+    public FunctionExpression create(List<Expression> children, StatementContext context) throws SQLException {
+        return new NthValueFunction(children, getDelegateFunction(children, context));
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/858dd40b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 41f4c6c..e01e4f9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -365,17 +365,14 @@ public class ParseNodeFactory {
 
     public FunctionParseNode function(String name, List<ParseNode> valueNodes,
             List<ParseNode> columnNodes, boolean isAscending) {
-        // Right now we support PERCENT functions on only one column
-        if (valueNodes.size() != 1 || columnNodes.size() != 1) {
-            throw new UnsupportedOperationException(name + " not supported on multiple columns");
-        }
-        List<ParseNode> children = new ArrayList<ParseNode>(3);
-        children.add(columnNodes.get(0));
+
+        List<ParseNode> children = new ArrayList<ParseNode>();
+        children.addAll(columnNodes);
         children.add(new LiteralParseNode(Boolean.valueOf(isAscending)));
-        children.add(valueNodes.get(0));
+        children.addAll(valueNodes);
+
         return function(name, children);
     }
-    
 
     public HintNode hint(String hint) {
         return new HintNode(hint);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/858dd40b/phoenix-core/src/main/java/org/apache/phoenix/util/FirstLastNthValueDataContainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/FirstLastNthValueDataContainer.java b/phoenix-core/src/main/java/org/apache/phoenix/util/FirstLastNthValueDataContainer.java
new file mode 100644
index 0000000..562f189
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/FirstLastNthValueDataContainer.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Container for data transfer between server and client aggregation (FIRST|LAST|NTH)_VALUE functions.
+ *
+ * <p>Holds a sorted map of order-by bytes to data bytes plus a sort-direction flag, and converts
+ * them to and from a compact byte payload (see {@link #getPayload()} for the layout). The
+ * {@code offset} field is stored by {@link #setOffset(int)} but is not part of the payload.
+ */
+public class FirstLastNthValueDataContainer {
+
+    /** Number of bytes used by every serialized int (length fields and the entry count). */
+    private static final int INT_SIZE = 4;
+
+    /** Position of the first entry in the payload: 1 flag byte followed by three ints. */
+    private static final int HEADER_SIZE = 1 + 3 * INT_SIZE;
+
+    protected boolean isAscending = false;
+    protected int offset;
+    protected TreeMap<byte[], byte[]> data;
+    protected boolean isOrderValuesFixedLength = false;
+    protected boolean isDataValuesFixedLength = false;
+
+    public void setIsAscending(boolean ascending) {
+        isAscending = ascending;
+    }
+
+    public void setData(TreeMap<byte[], byte[]> topValues) {
+        data = topValues;
+    }
+
+    public void setFixedWidthOrderValues(boolean fixedSize) {
+        isOrderValuesFixedLength = fixedSize;
+    }
+
+    public void setFixedWidthDataValues(boolean fixedSize) {
+        isDataValuesFixedLength = fixedSize;
+    }
+
+    public void setOffset(int offset) {
+        this.offset = offset;
+    }
+
+    /**
+     * Replaces this container's sort direction and data with the content decoded from a payload
+     * produced by {@link #getPayload()}.
+     *
+     * @param payload serialized container; must hold at least the 13 header bytes
+     */
+    public void setPayload(byte[] payload) {
+        // Assign unconditionally so that a reused container cannot keep a stale "ascending" flag.
+        isAscending = payload[0] == (byte) 1;
+
+        int lengthOfOrderValues = Bytes.toInt(payload, 1);
+        int lengthOfDataValues = Bytes.toInt(payload, 1 + INT_SIZE);
+        int sizeOfMap = Bytes.toInt(payload, 1 + 2 * INT_SIZE);
+
+        data = new TreeMap<byte[], byte[]>(new Bytes.ByteArrayComparator());
+
+        int payloadOffset = HEADER_SIZE;
+
+        for (int i = 0; i < sizeOfMap; i++) {
+            byte[] key = readSegment(payload, payloadOffset, lengthOfOrderValues);
+            payloadOffset += encodedSize(key, lengthOfOrderValues);
+
+            byte[] value = readSegment(payload, payloadOffset, lengthOfDataValues);
+            payloadOffset += encodedSize(value, lengthOfDataValues);
+
+            data.put(key, value);
+        }
+    }
+
+    /**
+     * Serializes the sort direction and the data map.
+     *
+     * <pre>
+     * PAYLOAD STRUCTURE
+     *
+     * what                     | size (bytes) | info
+     * is ascending             | 1            | 1 = asc, 0 = desc
+     * length of order by vals  | 4            | 0 if dynamic length, size otherwise
+     * length of data vals      | 4            | 0 if dynamic length, size otherwise
+     * number of entries        | 4            | size of the map
+     * [ length of first order  | 4            | only if order values are dynamic length ]
+     * first order value        | n            | order by val
+     * [ length of first value  | 4            | only if data values are dynamic length ]
+     * first data value         | n            | data val
+     * ... and so on, repeating order by values and data values
+     *
+     * example with fixed length (4) for order by and data values, one entry
+     * 00           | 0000 0004         | 0000 0004        | 0000 0001 | 0000 0001       | 0000 00FF
+     * is ascending | length order vals | length data vals | entries   | first order val | first data val
+     *
+     * example with dynamic length for order by and data values, one entry
+     * 00 | 0000 0000 | 0000 0000 | 0000 0001 | 0000 0004    | 0000 0001 | 0000 0004   | 0000 00FF
+     *    | (dynamic) | (dynamic) | entries   | order length | order val | data length | data val
+     * </pre>
+     *
+     * <p>A fixed length is taken from the first map entry. If the map is empty, or that first
+     * length is zero, the dynamic-length encoding is used instead, because a header length of 0
+     * already means "dynamic" to {@link #setPayload(byte[])}.
+     *
+     * @return the serialized payload
+     * @throws IOException if writing to the in-memory buffer fails
+     */
+    public byte[] getPayload() throws IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+        bos.write(isAscending ? (byte) 1 : (byte) 0);
+
+        // firstEntry() is null for an empty map; fall back to the dynamic encoding in that case.
+        Entry<byte[], byte[]> firstEntry = data.firstEntry();
+        int lengthOfOrderValues = 0;
+        int lengthOfDataValues = 0;
+        if (firstEntry != null) {
+            if (isOrderValuesFixedLength) {
+                lengthOfOrderValues = firstEntry.getKey().length;
+            }
+            if (isDataValuesFixedLength) {
+                lengthOfDataValues = firstEntry.getValue().length;
+            }
+        }
+
+        bos.write(Bytes.toBytes(lengthOfOrderValues));
+        bos.write(Bytes.toBytes(lengthOfDataValues));
+        bos.write(Bytes.toBytes(data.size()));
+
+        for (Map.Entry<byte[], byte[]> entry : data.entrySet()) {
+            writeSegment(bos, entry.getKey(), lengthOfOrderValues);
+            writeSegment(bos, entry.getValue(), lengthOfDataValues);
+        }
+
+        return bos.toByteArray();
+    }
+
+    public boolean getIsAscending() {
+        return isAscending;
+    }
+
+    public TreeMap<byte[], byte[]> getData() {
+        return data;
+    }
+
+    /** Reads one key or value at {@code position}; a {@code fixedLength} of 0 means length-prefixed. */
+    private static byte[] readSegment(byte[] payload, int position, int fixedLength) {
+        if (fixedLength != 0) {
+            return Bytes.copy(payload, position, fixedLength);
+        }
+        int length = Bytes.toInt(payload, position);
+        return Bytes.copy(payload, position + INT_SIZE, length);
+    }
+
+    /** Returns how many payload bytes {@code segment} occupies, including any length prefix. */
+    private static int encodedSize(byte[] segment, int fixedLength) {
+        return fixedLength != 0 ? segment.length : INT_SIZE + segment.length;
+    }
+
+    /** Writes one key or value, preceded by its length when {@code fixedLength} is 0 (dynamic). */
+    private static void writeSegment(ByteArrayOutputStream out, byte[] segment, int fixedLength)
+            throws IOException {
+        if (fixedLength == 0) {
+            out.write(Bytes.toBytes(segment.length));
+        }
+        out.write(segment);
+    }
+}


Mime
View raw message