phoenix-commits mailing list archives

From ramkris...@apache.org
Subject [4/6] Phoenix-933 Local index support to Phoenix (Rajesh)
Date Wed, 16 Jul 2014 04:33:02 GMT
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9a5529c8/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/balancer/TestIndexLoadBalancer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/balancer/TestIndexLoadBalancer.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/balancer/TestIndexLoadBalancer.java
new file mode 100644
index 0000000..9020e73
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/balancer/TestIndexLoadBalancer.java
@@ -0,0 +1,488 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.balancer;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.LoadBalancer;
+import org.apache.hadoop.hbase.master.RegionStates;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.TestSplitTransactionOnCluster.MockedRegionObserver;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.phoenix.hbase.index.IndexTestingUtils;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.master.IndexMasterObserver;
+import org.apache.phoenix.util.ConfigUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestIndexLoadBalancer {
+
+    private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+    private static HBaseAdmin admin = null;
+
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        final int NUM_RS = 4;
+        Configuration conf = UTIL.getConfiguration();
+        conf.setBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, true);
+        conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, IndexMasterObserver.class.getName());
+        conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, IndexLoadBalancer.class,
+            LoadBalancer.class);
+        IndexTestingUtils.setupConfig(conf);
+        // disable version checking, so we can test against whatever version of HBase happens to be
+        // installed (right now, it's generally going to be SNAPSHOT versions).
+        conf.setBoolean(Indexer.CHECK_VERSION_CONF_KEY, false);
+        // set replication required parameter
+        ConfigUtil.setReplicationConfigIfAbsent(conf);
+        UTIL.startMiniCluster(NUM_RS);
+        admin = UTIL.getHBaseAdmin();
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        if (admin != null) {
+            admin.disableTables(".*");
+            admin.deleteTables(".*");
+            admin.close();
+        }
+        UTIL.shutdownMiniCluster();
+    }
+
+    @Test(timeout = 180000)
+    public void testRoundRobinAssignmentDuringIndexTableCreation() throws Exception {
+        MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+        HMaster master = cluster.getMaster();
+        TableName tableName = TableName.valueOf("testRoundRobinAssignmentDuringIndexTableCreation");
+        TableName indexTableName =
+                TableName.valueOf("testRoundRobinAssignmentDuringIndexTableCreation_index");
+        createUserAndIndexTable(tableName, indexTableName);
+        boolean isRegionColocated =
+                checkForColocation(master, tableName.getNameAsString(), indexTableName
+                        .getNameAsString());
+        assertTrue("User regions and index regions should colocate.", isRegionColocated);
+    }
+
+    @Test(timeout = 180000)
+    public void testColocationAfterSplit() throws Exception {
+        MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+        HMaster master = cluster.getMaster();
+        // Table names chosen to trigger the split hooks in MockedRegionObserver.
+        TableName tableName = TableName.valueOf("testSplitHooksBeforeAndAfterPONR_1");
+        TableName indexTableName = TableName.valueOf("testSplitHooksBeforeAndAfterPONR_2");
+        HTableDescriptor htd = new HTableDescriptor(tableName);
+        htd.addCoprocessor(MockedRegionObserver.class.getName());
+        htd.addFamily(new HColumnDescriptor("cf"));
+        char c = 'A';
+        byte[][] split = new byte[20][];
+        for (int i = 0; i < 20; i++) {
+            byte[] b = { (byte) c };
+            split[i] = b;
+            c++;
+        }
+        admin.createTable(htd, split);
+        HTableDescriptor iHtd = new HTableDescriptor(indexTableName);
+        iHtd.addFamily(new HColumnDescriptor("cf"));
+        iHtd.setValue(IndexLoadBalancer.PARENT_TABLE_KEY, tableName.toBytes());
+        admin.createTable(iHtd, split);
+
+        // insert data into both the user table and the index table before splitting
+
+        insertData(tableName);
+        insertData(indexTableName);
+
+        admin.split(tableName.getNameAsString(), "c");
+        List<HRegionInfo> regionsOfUserTable =
+                master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName);
+
+        while (regionsOfUserTable.size() != 22) {
+            Thread.sleep(100);
+            regionsOfUserTable =
+                    master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName);
+        }
+
+        List<HRegionInfo> regionsOfIndexTable =
+                master.getAssignmentManager().getRegionStates().getRegionsOfTable(indexTableName);
+
+        while (regionsOfIndexTable.size() != 22) {
+            Thread.sleep(100);
+            regionsOfIndexTable =
+                    master.getAssignmentManager().getRegionStates().getRegionsOfTable(
+                        indexTableName);
+        }
+        boolean isRegionColocated =
+                checkForColocation(master, tableName.getNameAsString(), indexTableName
+                        .getNameAsString());
+        assertTrue("User regions and index regions should colocate.", isRegionColocated);
+    }
+
+    @Test(timeout = 180000)
+    public void testColocationAfterRegionsMerge() throws Exception {
+        MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+        HMaster master = cluster.getMaster();
+        RegionStates regionStates = master.getAssignmentManager().getRegionStates();
+        // Table and index table names for the merge test.
+        TableName tableName = TableName.valueOf("testColocationAfterRegionsMerge");
+        TableName indexTableName = TableName.valueOf("testColocationAfterRegionsMerge_index");
+        createUserAndIndexTable(tableName, indexTableName);
+        ServerName server = cluster.getRegionServer(0).getServerName();
+        List<HRegionInfo> regionsOfUserTable = regionStates.getRegionsOfTable(tableName);
+        Pair<HRegionInfo, HRegionInfo> regionsToMerge = new Pair<HRegionInfo, HRegionInfo>();
+        byte[] startKey1 = { (byte) 'C' };
+        byte[] startKey2 = { (byte) 'D' };
+        for (HRegionInfo region : regionsOfUserTable) {
+            if (Bytes.compareTo(startKey1, region.getStartKey()) == 0) {
+                regionsToMerge.setFirst(region);
+            } else if (Bytes.compareTo(startKey2, region.getStartKey()) == 0) {
+                regionsToMerge.setSecond(region);
+            }
+        }
+        admin.move(regionsToMerge.getFirst().getEncodedNameAsBytes(), Bytes.toBytes(server
+                .toString()));
+        admin.move(regionsToMerge.getSecond().getEncodedNameAsBytes(), Bytes.toBytes(server
+                .toString()));
+
+        List<HRegionInfo> regionsOfIndexTable = regionStates.getRegionsOfTable(indexTableName);
+        Pair<HRegionInfo, HRegionInfo> indexRegionsToMerge = new Pair<HRegionInfo, HRegionInfo>();
+        for (HRegionInfo region : regionsOfIndexTable) {
+            if (Bytes.compareTo(startKey1, region.getStartKey()) == 0) {
+                indexRegionsToMerge.setFirst(region);
+            } else if (Bytes.compareTo(startKey2, region.getStartKey()) == 0) {
+                indexRegionsToMerge.setSecond(region);
+            }
+        }
+        admin.move(indexRegionsToMerge.getFirst().getEncodedNameAsBytes(), Bytes.toBytes(server
+                .toString()));
+        admin.move(indexRegionsToMerge.getSecond().getEncodedNameAsBytes(), Bytes.toBytes(server
+                .toString()));
+        while (!regionStates.getRegionServerOfRegion(regionsToMerge.getFirst()).equals(server)
+                || !regionStates.getRegionServerOfRegion(regionsToMerge.getSecond()).equals(server)
+                || !regionStates.getRegionServerOfRegion(indexRegionsToMerge.getFirst()).equals(
+                    server)
+                || !regionStates.getRegionServerOfRegion(indexRegionsToMerge.getSecond()).equals(
+                    server)) {
+            Threads.sleep(1000);
+        }
+        admin.mergeRegions(regionsToMerge.getFirst().getEncodedNameAsBytes(), regionsToMerge
+                .getSecond().getEncodedNameAsBytes(), true);
+        admin.mergeRegions(indexRegionsToMerge.getFirst().getEncodedNameAsBytes(),
+            indexRegionsToMerge.getSecond().getEncodedNameAsBytes(), true);
+
+        while (regionsOfUserTable.size() != 20 || regionsOfIndexTable.size() != 20) {
+            Thread.sleep(100);
+            regionsOfUserTable = regionStates.getRegionsOfTable(tableName);
+            regionsOfIndexTable = regionStates.getRegionsOfTable(indexTableName);
+        }
+        boolean isRegionColocated =
+                checkForColocation(master, tableName.getNameAsString(), indexTableName
+                        .getNameAsString());
+        assertTrue("User regions and index regions should colocate.", isRegionColocated);
+    }
+
+    private void insertData(TableName tableName) throws IOException, InterruptedException {
+        HTable table = new HTable(admin.getConfiguration(), tableName);
+        Put p = new Put("a".getBytes());
+        p.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("Val"));
+        p.add("cf".getBytes(), "q2".getBytes(), Bytes.toBytes("ValForCF2"));
+        table.put(p);
+
+        Put p1 = new Put("b".getBytes());
+        p1.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("Val"));
+        p1.add("cf".getBytes(), "q2".getBytes(), Bytes.toBytes("ValForCF2"));
+        table.put(p1);
+
+        Put p2 = new Put("c".getBytes());
+        p2.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("Val"));
+        p2.add("cf".getBytes(), "q2".getBytes(), Bytes.toBytes("ValForCF2"));
+        table.put(p2);
+
+        Put p3 = new Put("c1".getBytes());
+        p3.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("Val"));
+        p3.add("cf".getBytes(), "q2".getBytes(), Bytes.toBytes("ValForCF2"));
+        table.put(p3);
+
+        Put p4 = new Put("d".getBytes());
+        p4.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("Val"));
+        p4.add("cf".getBytes(), "q2".getBytes(), Bytes.toBytes("ValForCF2"));
+        table.put(p4);
+        admin.flush(tableName.getNameAsString());
+    }
+
+    @Test(timeout = 180000)
+    public void testRandomAssignmentDuringIndexTableEnable() throws Exception {
+        MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+        HMaster master = cluster.getMaster();
+        master.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", false);
+        TableName tableName = TableName.valueOf("testRandomAssignmentDuringIndexTableEnable");
+        TableName indexTableName =
+                TableName.valueOf("testRandomAssignmentDuringIndexTableEnable_index");
+        createUserAndIndexTable(tableName, indexTableName);
+        admin.disableTable(tableName);
+        admin.disableTable(indexTableName);
+        admin.enableTable(tableName);
+        admin.enableTable(indexTableName);
+        boolean isRegionColocated =
+                checkForColocation(master, tableName.getNameAsString(), indexTableName
+                        .getNameAsString());
+        assertTrue("User regions and index regions should colocate.", isRegionColocated);
+
+    }
+
+    @Test(timeout = 180000)
+    public void testBalanceCluster() throws Exception {
+        MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+        HMaster master = cluster.getMaster();
+        master.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", false);
+        master.getConfiguration().setBoolean("hbase.master.startup.retainassign", false);
+        master.getConfiguration().setBoolean("hbase.master.loadbalance.bytable", false);
+
+        TableName tableName = TableName.valueOf("testBalanceCluster");
+        TableName indexTableName = TableName.valueOf("testBalanceCluster_index");
+        createUserAndIndexTable(tableName, indexTableName);
+        HTableDescriptor htd1 = new HTableDescriptor(TableName.valueOf("testBalanceCluster1"));
+        htd1.addFamily(new HColumnDescriptor("fam1"));
+        char c = 'A';
+        byte[][] split1 = new byte[12][];
+        for (int i = 0; i < 12; i++) {
+            byte[] b = { (byte) c };
+            split1[i] = b;
+            c++;
+        }
+        admin.setBalancerRunning(false, false);
+        admin.createTable(htd1, split1);
+        admin.disableTable(tableName);
+        admin.enableTable(tableName);
+        admin.setBalancerRunning(true, false);
+        admin.balancer();
+        boolean isRegionsColocated =
+                checkForColocation(master, tableName.getNameAsString(), indexTableName
+                        .getNameAsString());
+        assertTrue("User regions and index regions should colocate.", isRegionsColocated);
+    }
+
+    @Test(timeout = 180000)
+    public void testBalanceByTable() throws Exception {
+        ZooKeeperWatcher zkw = UTIL.getZooKeeperWatcher(UTIL);
+        MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+        HMaster master = cluster.getMaster();
+        master.getConfiguration().setBoolean("hbase.master.loadbalance.bytable", true);
+        TableName tableName = TableName.valueOf("testBalanceByTable");
+        TableName indexTableName = TableName.valueOf("testBalanceByTable_index");
+        createUserAndIndexTable(tableName, indexTableName);
+        HTableDescriptor htd1 = new HTableDescriptor(TableName.valueOf("testBalanceByTable1"));
+        htd1.addFamily(new HColumnDescriptor("fam1"));
+        char c = 'A';
+        byte[][] split1 = new byte[12][];
+        for (int i = 0; i < 12; i++) {
+            byte[] b = { (byte) c };
+            split1[i] = b;
+            c++;
+        }
+        admin.disableTable(tableName);
+        admin.enableTable(tableName);
+        admin.setBalancerRunning(true, false);
+        boolean isRegionColocated =
+                checkForColocation(master, tableName.getNameAsString(), indexTableName
+                        .getNameAsString());
+        assertTrue("User regions and index regions should colocate.", isRegionColocated);
+        admin.balancer();
+        Thread.sleep(10000);
+        ZKAssign.blockUntilNoRIT(zkw);
+        while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+            Threads.sleep(1000);
+        }
+        isRegionColocated =
+                checkForColocation(master, tableName.getNameAsString(), indexTableName
+                        .getNameAsString());
+        assertTrue("User regions and index regions should colocate.", isRegionColocated);
+    }
+
+    @Test(timeout = 180000)
+    public void testRoundRobinAssignmentAfterRegionServerDown() throws Exception {
+        ZooKeeperWatcher zkw = UTIL.getZooKeeperWatcher(UTIL);
+        MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+        HMaster master = cluster.getMaster();
+        TableName tableName = TableName.valueOf("testRoundRobinAssignmentAfterRegionServerDown");
+        TableName indexTableName =
+                TableName.valueOf("testRoundRobinAssignmentAfterRegionServerDown_index");
+        createUserAndIndexTable(tableName, indexTableName);
+        HRegionServer regionServer = cluster.getRegionServer(1);
+        regionServer.abort("Aborting to test random assignment after region server down");
+        while (master.getServerManager().areDeadServersInProgress()) {
+            Thread.sleep(1000);
+        }
+        ZKAssign.blockUntilNoRIT(zkw);
+        while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+            Threads.sleep(1000);
+        }
+        boolean isRegionColocated =
+                checkForColocation(master, tableName.getNameAsString(), indexTableName
+                        .getNameAsString());
+        assertTrue("User regions and index regions should colocate.", isRegionColocated);
+
+    }
+
+    @Test(timeout = 180000)
+    public void testRetainAssignmentDuringMasterStartUp() throws Exception {
+        ZooKeeperWatcher zkw = UTIL.getZooKeeperWatcher(UTIL);
+        MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+        HMaster master = cluster.getMaster();
+        master.getConfiguration().setBoolean("hbase.master.startup.retainassign", true);
+        TableName tableName = TableName.valueOf("testRetainAssignmentDuringMasterStartUp");
+        TableName indexTableName =
+                TableName.valueOf("testRetainAssignmentDuringMasterStartUp_index");
+        createUserAndIndexTable(tableName, indexTableName);
+        UTIL.shutdownMiniHBaseCluster();
+        UTIL.startMiniHBaseCluster(1, 4);
+        cluster = UTIL.getHBaseCluster();
+        master = cluster.getMaster();
+        if (admin != null) {
+            admin.close();
+            admin = new HBaseAdmin(master.getConfiguration());
+        }
+        ZKAssign.blockUntilNoRIT(zkw);
+        while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+            Threads.sleep(1000);
+        }
+        boolean isRegionColocated =
+                checkForColocation(master, tableName.getNameAsString(), indexTableName
+                        .getNameAsString());
+        assertTrue("User regions and index regions should colocate.", isRegionColocated);
+
+    }
+
+    @Test(timeout = 300000)
+    public void testRoundRobinAssignmentDuringMasterStartUp() throws Exception {
+        MiniHBaseCluster cluster = UTIL.getHBaseCluster();
+        HMaster master = cluster.getMaster();
+        UTIL.getConfiguration().setBoolean("hbase.master.startup.retainassign", false);
+
+        TableName tableName = TableName.valueOf("testRoundRobinAssignmentDuringMasterStartUp");
+        TableName indexTableName =
+                TableName.valueOf("testRoundRobinAssignmentDuringMasterStartUp_index");
+        createUserAndIndexTable(tableName, indexTableName);
+        UTIL.shutdownMiniHBaseCluster();
+        cluster.waitUntilShutDown();
+        UTIL.startMiniHBaseCluster(1, 4);
+        cluster = UTIL.getHBaseCluster();
+        if (admin != null) {
+            admin.close();
+            admin = new HBaseAdmin(cluster.getMaster().getConfiguration());
+        }
+        master = cluster.getMaster();
+        while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
+            Threads.sleep(1000);
+        }
+        boolean isRegionColocated =
+                checkForColocation(master, tableName.getNameAsString(), indexTableName
+                        .getNameAsString());
+        assertTrue("User regions and index regions should colocate.", isRegionColocated);
+    }
+
+    private void createUserAndIndexTable(TableName tableName, TableName indexTableName)
+            throws IOException {
+        HTableDescriptor htd = new HTableDescriptor(tableName);
+        htd.addFamily(new HColumnDescriptor("cf"));
+        char c = 'A';
+        byte[][] split = new byte[20][];
+        for (int i = 0; i < 20; i++) {
+            byte[] b = { (byte) c };
+            split[i] = b;
+            c++;
+        }
+        admin.createTable(htd, split);
+        HTableDescriptor iHtd = new HTableDescriptor(indexTableName);
+        iHtd.addFamily(new HColumnDescriptor("cf"));
+        iHtd.setValue(IndexLoadBalancer.PARENT_TABLE_KEY, tableName.toBytes());
+        admin.createTable(iHtd, split);
+    }
+
+    private List<Pair<byte[], ServerName>> getStartKeysAndLocations(HMaster master, String tableName)
+            throws IOException, InterruptedException {
+
+        List<Pair<HRegionInfo, ServerName>> tableRegionsAndLocations =
+                MetaReader.getTableRegionsAndLocations(master.getCatalogTracker(), TableName
+                        .valueOf(tableName));
+        List<Pair<byte[], ServerName>> startKeyAndLocationPairs =
+                new ArrayList<Pair<byte[], ServerName>>(tableRegionsAndLocations.size());
+        Pair<byte[], ServerName> startKeyAndLocation = null;
+        for (Pair<HRegionInfo, ServerName> regionAndLocation : tableRegionsAndLocations) {
+            startKeyAndLocation =
+                    new Pair<byte[], ServerName>(regionAndLocation.getFirst().getStartKey(),
+                            regionAndLocation.getSecond());
+            startKeyAndLocationPairs.add(startKeyAndLocation);
+        }
+        return startKeyAndLocationPairs;
+
+    }
+
+    public boolean checkForColocation(HMaster master, String tableName, String indexTableName)
+            throws IOException, InterruptedException {
+        List<Pair<byte[], ServerName>> uTableStartKeysAndLocations =
+                getStartKeysAndLocations(master, tableName);
+        List<Pair<byte[], ServerName>> iTableStartKeysAndLocations =
+                getStartKeysAndLocations(master, indexTableName);
+
+        boolean regionsColocated = true;
+        if (uTableStartKeysAndLocations.size() != iTableStartKeysAndLocations.size()) {
+            regionsColocated = false;
+        } else {
+            for (int i = 0; i < uTableStartKeysAndLocations.size(); i++) {
+                Pair<byte[], ServerName> uStartKeyAndLocation = uTableStartKeysAndLocations.get(i);
+                Pair<byte[], ServerName> iStartKeyAndLocation = iTableStartKeysAndLocations.get(i);
+
+                if (Bytes.compareTo(uStartKeyAndLocation.getFirst(), iStartKeyAndLocation
+                        .getFirst()) == 0) {
+                    if (uStartKeyAndLocation.getSecond().equals(iStartKeyAndLocation.getSecond())) {
+                        continue;
+                    }
+                }
+                regionsColocated = false;
+            }
+        }
+        return regionsColocated;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9a5529c8/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index dbcca4f..5f52da4 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -98,7 +98,7 @@ tokens
     VALUE='value';
     FOR='for';
     CACHE='cache';
-    DERIVE='derive';
+    LOCAL='local';
     ANY='any';
     SOME='some';
 }
@@ -144,6 +144,7 @@ import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.util.SchemaUtil;
 }
 
@@ -381,12 +382,12 @@ create_view_node returns [CreateTableStatement ret]
 
 // Parse a create index statement.
 create_index_node returns [CreateIndexStatement ret]
-    :   CREATE INDEX (IF NOT ex=EXISTS)? i=index_name ON t=from_table_name
+    :   CREATE l=LOCAL? INDEX (IF NOT ex=EXISTS)? i=index_name ON t=from_table_name
         (LPAREN pk=index_pk_constraint RPAREN)
         (INCLUDE (LPAREN icrefs=column_names RPAREN))?
         (p=fam_properties)?
         (SPLIT ON v=value_expression_list)?
-        {ret = factory.createIndex(i, factory.namedTable(null,t), pk, icrefs, v, p, ex!=null, getBindCount()); }
+        {ret = factory.createIndex(i, factory.namedTable(null,t), pk, icrefs, v, p, ex!=null, l==null ? IndexType.getDefault() : IndexType.LOCAL, getBindCount()); }
     ;
 
 // Parse a create sequence statement.
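
The grammar change above adds an optional LOCAL keyword to CREATE INDEX and passes
IndexType.LOCAL to factory.createIndex() when it is present. A minimal JDBC sketch of
the new syntax; the connection URL, table, and column names are hypothetical:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class CreateLocalIndexExample {
        public static void main(String[] args) throws Exception {
            // Hypothetical ZooKeeper quorum in the Phoenix JDBC URL.
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181");
            Statement stmt = conn.createStatement();
            stmt.execute("CREATE TABLE t (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
            // Without LOCAL this would create a global index; with it, the grammar
            // action above passes IndexType.LOCAL instead of IndexType.getDefault().
            stmt.execute("CREATE LOCAL INDEX local_idx ON t (v1)");
            conn.close();
        }
    }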

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9a5529c8/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
index bbd7154..2a687c6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
@@ -23,15 +23,19 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.CreateIndexStatement;
 import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.PropertyName;
 import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PTable.IndexType;
 
 public class CreateIndexCompiler {
     private final PhoenixStatement statement;
@@ -47,6 +51,21 @@ public class CreateIndexCompiler {
         final StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
         ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
         List<ParseNode> splitNodes = create.getSplitNodes();
+        if (create.getIndexType() == IndexType.LOCAL) {
+            if (!splitNodes.isEmpty()) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SPLIT_LOCAL_INDEX)
+                .build().buildException();
+            } 
+            List<Pair<String, Object>> list = create.getProps() != null ? create.getProps().get("") : null;
+            if (list != null) {
+                for (Pair<String, Object> pair : list) {
+                    if (pair.getFirst().equals(PhoenixDatabaseMetaData.SALT_BUCKETS)) {
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SALT_LOCAL_INDEX)
+                        .build().buildException();
+                    }
+                }
+            }
+        }
         final byte[][] splits = new byte[splitNodes.size()][];
         for (int i = 0; i < splits.length; i++) {
             ParseNode node = splitNodes.get(i);
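
As the checks above show, a local index inherits its region boundaries and salting from
the data table, so SPLIT ON and SALT_BUCKETS are rejected at compile time. A sketch of
the statements that now fail, assuming the hypothetical conn and table t from the
previous example:

    import java.sql.Connection;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class LocalIndexRestrictionsExample {
        static void tryRejectedOptions(Connection conn) throws SQLException {
            Statement stmt = conn.createStatement();
            try {
                stmt.execute("CREATE LOCAL INDEX idx1 ON t (v1) SPLIT ON ('m')");
            } catch (SQLException e) {
                // Expected: SQLExceptionCode.CANNOT_SPLIT_LOCAL_INDEX
            }
            try {
                stmt.execute("CREATE LOCAL INDEX idx2 ON t (v1) SALT_BUCKETS=4");
            } catch (SQLException e) {
                // Expected: SQLExceptionCode.CANNOT_SALT_LOCAL_INDEX
            }
        }
    }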

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9a5529c8/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 59a7ce7..81438ff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -28,7 +28,6 @@ import java.util.Map;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9a5529c8/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
index 5650ed5..e7883a0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
@@ -94,14 +94,17 @@ import org.apache.phoenix.parse.SequenceValueParseNode;
 import org.apache.phoenix.parse.StringConcatParseNode;
 import org.apache.phoenix.parse.SubtractParseNode;
 import org.apache.phoenix.parse.UnsupportedAllParseNodeVisitor;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.DelegateDatum;
+import org.apache.phoenix.schema.LocalIndexDataColumnRef;
 import org.apache.phoenix.schema.PArrayDataType;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeyValueAccessor;
 import org.apache.phoenix.schema.SortOrder;
@@ -322,7 +325,23 @@ public class ExpressionCompiler extends UnsupportedAllParseNodeVisitor<Expressio
      * @throws SQLException if the column expression node does not refer to a known/unambiguous column
      */
     protected ColumnRef resolveColumn(ColumnParseNode node) throws SQLException {
-        ColumnRef ref = context.getResolver().resolveColumn(node.getSchemaName(), node.getTableName(), node.getName());
+        ColumnRef ref = null;
+        try {
+            ref = context.getResolver().resolveColumn(node.getSchemaName(), node.getTableName(), node.getName());
+        } catch (ColumnNotFoundException e) {
+            // Rather than not use a local index when a column not contained by it is referenced, we
+            // join back to the data table in our coprocessor since this is a relatively cheap
+            // operation given that we know the join is local.
+            if (context.getCurrentTable().getTable().getIndexType() == IndexType.LOCAL) {
+                try {
+                    return new LocalIndexDataColumnRef(context, node.getName());
+                } catch (ColumnFamilyNotFoundException c) {
+                    throw e;
+                }
+            } else {
+                throw e;
+            }
+        }
         PTable table = ref.getTable();
         int pkPosition = ref.getPKSlotPosition();
         // Disallow explicit reference to salting column, tenant ID column, and index ID column
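
The fallback above lets a query use a local index even when it references a column the
index does not cover: LocalIndexDataColumnRef resolves the column against the data
table, and the coprocessor joins back to the data row, which is cheap because the index
and data regions are colocated. A hypothetical query hitting that path, reusing the
table and index from the earlier sketches:

    import java.sql.Connection;
    import java.sql.ResultSet;

    public class UncoveredColumnExample {
        static void query(Connection conn) throws Exception {
            // v1 is indexed by local_idx; v2 is not covered. The optimizer can still
            // pick the local index for the v1 filter and fetch v2 server-side.
            ResultSet rs = conn.createStatement().executeQuery(
                    "SELECT v2 FROM t WHERE v1 = 'foo'");
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }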

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9a5529c8/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index b9e0949..5d3e64e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -419,7 +419,7 @@ public class FromCompiler {
             PTable t = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME, 
                     PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, 
                     null, null, columns, null, Collections.<PTable>emptyList(), false, 
-                    Collections.<PName>emptyList(), null, null, false, false, null, null);
+                    Collections.<PName>emptyList(), null, null, false, false, null, null, null);
             
             String alias = subselectNode.getAlias();
             TableRef tableRef = new TableRef(alias, t, MetaDataProtocol.MIN_TABLE_TIMESTAMP, false);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9a5529c8/phoenix-core/src/main/java/org/apache/phoenix/compile/IndexStatementRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/IndexStatementRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/IndexStatementRewriter.java
index 68a1218..c645799 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/IndexStatementRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/IndexStatementRewriter.java
@@ -95,7 +95,6 @@ public class IndexStatementRewriter extends ParseNodeRewriter {
             return node;
 
         String indexColName = IndexUtil.getIndexColumnName(dataCol);
-        // Same alias as before, but use the index column name instead of the data column name
         ParseNode indexColNode = new ColumnParseNode(tName, node.isCaseSensitive() ? '"' + indexColName + '"' : indexColName, node.getAlias());
         PDataType indexColType = IndexUtil.getIndexColumnDataType(dataCol);
         PDataType dataColType = dataColRef.getColumn().getDataType();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9a5529c8/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index c433fc5..6a7f24c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -83,6 +83,7 @@ import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableRef;
@@ -721,7 +722,7 @@ public class JoinCompiler {
             PTable t = PTableImpl.makePTable(table.getTenantId(), PNameFactory.newName(PROJECTED_TABLE_SCHEMA), table.getName(), PTableType.JOIN,
                         table.getIndexState(), table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(),
                         retainPKColumns ? table.getBucketNum() : null, projectedColumns, table.getParentTableName(),
-                        table.getIndexes(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null, table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId());
+                        table.getIndexes(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null, table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType());
             return new ProjectedPTableWrapper(t, columnNameMap, sourceExpressions);
         }
         
@@ -771,7 +772,7 @@ public class JoinCompiler {
             PTable t = PTableImpl.makePTable(table.getTenantId(), PNameFactory.newName(PROJECTED_TABLE_SCHEMA), table.getName(), PTableType.JOIN,
                         table.getIndexState(), table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(),
                         null, projectedColumns, table.getParentTableName(),
-                        table.getIndexes(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null, table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId());
+                        table.getIndexes(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null, table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId(), table.getIndexType());
             return new ProjectedPTableWrapper(t, columnNameMap, sourceExpressions);
         }
     }
@@ -1135,7 +1136,7 @@ public class JoinCompiler {
             SelectStatement stmt = getSubqueryForOptimizedPlan(select.getHint(), table.getDynamicColumns(), tableRef, join.getColumnRefs(), table.getPreFiltersCombined(), groupBy, orderBy, table.isWildCardSelect());
             QueryPlan plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, stmt);
             if (!plan.getTableRef().equals(tableRef)) {
-                replacement.put(tableRef, plan.getTableRef());            
+                replacement.put(tableRef, plan.getTableRef());
             }            
         }
         
@@ -1272,7 +1273,7 @@ public class JoinCompiler {
             }
             PTable t = PTableImpl.makePTable(left.getTenantId(), left.getSchemaName(),
                     PNameFactory.newName(SchemaUtil.getTableName(left.getName().getString(), right.getName().getString())), left.getType(), left.getIndexState(), left.getTimeStamp(), left.getSequenceNumber(), left.getPKName(), left.getBucketNum(), merged,
-                    left.getParentTableName(), left.getIndexes(), left.isImmutableRows(), Collections.<PName>emptyList(), null, null, PTable.DEFAULT_DISABLE_WAL, left.isMultiTenant(), left.getViewType(), left.getViewIndexId());
+                    left.getParentTableName(), left.getIndexes(), left.isImmutableRows(), Collections.<PName>emptyList(), null, null, PTable.DEFAULT_DISABLE_WAL, left.isMultiTenant(), left.getViewType(), left.getViewIndexId(), left.getIndexType());
 
             ListMultimap<String, String> mergedMap = ArrayListMultimap.<String, String>create();
             mergedMap.putAll(this.getColumnNameMap());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9a5529c8/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index 06d5f89..b27447c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -20,7 +20,10 @@ package org.apache.phoenix.compile;
 import java.sql.SQLException;
 import java.text.Format;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -32,12 +35,15 @@ import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.NumberUtil;
 import org.apache.phoenix.util.ScanUtil;
 
+import com.google.common.collect.Maps;
+
 
 /**
  *
@@ -59,6 +65,7 @@ public class StatementContext {
     private final String numberFormat;
     private final ImmutableBytesWritable tempPtr;
     private final PhoenixStatement statement;
+    private final Map<PColumn, Integer> dataColumns;
     
     private long currentTime = QueryConstants.UNSET_TIMESTAMP;
     private ScanRanges scanRanges = ScanRanges.EVERYTHING;
@@ -89,6 +96,7 @@ public class StatementContext {
         this.tempPtr = new ImmutableBytesWritable();
         this.currentTable = resolver != null && !resolver.getTables().isEmpty() ? resolver.getTables().get(0) : null;
         this.whereConditionColumns = new ArrayList<Pair<byte[],byte[]>>();
+        this.dataColumns = this.currentTable == null ? Collections.<PColumn, Integer>emptyMap() : Maps.<PColumn, Integer>newLinkedHashMap();
     }
 
     /**
@@ -112,6 +120,36 @@ public class StatementContext {
         this.tempPtr = new ImmutableBytesWritable();
         this.currentTable = stmtContext.currentTable;
         this.whereConditionColumns = stmtContext.whereConditionColumns;
+        this.dataColumns = stmtContext.getDataColumnsMap();
+    }
+
+    /**
+     * Returns the position the given data column will occupy in the single KeyValue
+     * value bytes returned from the coprocessor that joins from the index row back
+     * to the data row, adding the column to the map on first reference.
+     * @param column the data table column being referenced
+     * @return the column's position in the joined value bytes
+     */
+    public int getDataColumnPosition(PColumn column) {
+        Integer pos = dataColumns.get(column);
+        if (pos == null) {
+            pos = dataColumns.size();
+            dataColumns.put(column, pos);
+        }
+        return pos;
+    }
+
+    /**
+     * @return the set of data columns referenced so far.
+     */
+    public Set<PColumn> getDataColumns() {
+        return dataColumns.keySet();
+    }
+
+    /**
+     * @return the map of data columns to their positions.
+     */
+    public Map<PColumn, Integer> getDataColumnsMap() {
+        return dataColumns;
     }
 
     public String getDateFormat() {
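
The dataColumns map above hands out a stable position for each data column on first
reference, in insertion order, which is why a LinkedHashMap is used. A self-contained
sketch of the same idiom in generic Java (not the Phoenix API):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class PositionMapExample {
        private final Map<String, Integer> positions = new LinkedHashMap<String, Integer>();

        // Returns the existing position for the key, or assigns the next free one,
        // mirroring StatementContext.getDataColumnPosition().
        public int positionOf(String key) {
            Integer pos = positions.get(key);
            if (pos == null) {
                pos = positions.size();
                positions.put(key, pos);
            }
            return pos;
        }
    }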

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9a5529c8/phoenix-core/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java
index ebf117d..44f9527 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java
@@ -36,6 +36,7 @@ import org.apache.phoenix.parse.SubtractParseNode;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.util.SchemaUtil;
 
 import com.google.common.collect.Lists;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9a5529c8/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
index 98e4517..b9a53f8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
@@ -44,8 +44,10 @@ import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.LocalIndexDataColumnRef;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.TypeMismatchException;
@@ -135,6 +137,15 @@ public class WhereCompiler {
         protected ColumnRef resolveColumn(ColumnParseNode node) throws SQLException {
             ColumnRef ref = super.resolveColumn(node);
             PTable table = ref.getTable();
+            // If the current table in the context is a local index but the column reference
+            // resolves to the global (data) table, the column is not covered by the index.
+            // Evaluating a WHERE condition on such a column would require fetching it from
+            // the data table for every index row, effectively a full scan of both the index
+            // and the data table, so we skip this plan by throwing ColumnNotFoundException.
+            if (context.getCurrentTable().getTable().getIndexType() == IndexType.LOCAL
+                    && (table.getIndexType() == null || table.getIndexType() == IndexType.GLOBAL)) {
+                throw new ColumnNotFoundException(ref.getColumn().getName().getString());
+            }
             // Track if we need to compare KeyValue during filter evaluation
             // using column family. If the column qualifier is enough, we
             // just use that.
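
In contrast to the select-list case handled in ExpressionCompiler, the override above
rejects a local-index plan outright when the WHERE clause itself references an uncovered
column, since the join back to the data table would be needed for every candidate row.
A hypothetical query that therefore falls back to the data table:

    import java.sql.Connection;
    import java.sql.ResultSet;

    public class UncoveredFilterExample {
        static void query(Connection conn) throws Exception {
            // v2 is not covered by local_idx, so the ColumnNotFoundException thrown
            // above makes the optimizer discard the local-index plan for this query.
            ResultSet rs = conn.createStatement().executeQuery(
                    "SELECT k FROM t WHERE v2 = 'bar'");
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }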

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9a5529c8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index b94bf0a..3ca0ce3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -44,6 +44,11 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String EMPTY_CF = "_EmptyCF";
     public static final String SPECIFIC_ARRAY_INDEX = "_SpecificArrayIndex";
     public static final String GROUP_BY_LIMIT = "_GroupByLimit";
+    public static final String LOCAL_INDEX = "_LocalIndex";
+    public static final String LOCAL_INDEX_BUILD = "_LocalIndexBuild";
+    public static final String LOCAL_INDEX_JOIN_SCHEMA = "_LocalIndexJoinSchema";
+    public static final String DATA_TABLE_COLUMNS_TO_JOIN = "_DataTableColumnsToJoin";
+    public static final String VIEW_CONSTANTS = "_ViewConstants";
 
     /**
      * Used by logger to identify coprocessor

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9a5529c8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index a74745d..2322eb3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -56,7 +56,9 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
 import org.apache.phoenix.expression.aggregator.Aggregator;
 import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
@@ -64,6 +66,7 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SizedUtil;
@@ -71,6 +74,7 @@ import org.apache.phoenix.util.TupleUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Closeables;
 
@@ -105,7 +109,18 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
             }
             keyOrdered = true;
         }
-        List<Expression> expressions = deserializeGroupByExpressions(expressionBytes);
+        int offset = 0;
+        if (ScanUtil.isLocalIndex(scan)) {
+            /*
+             * For local indexes, we need to set an offset on row key expressions to skip
+             * the region start key.
+             */
+            HRegion region = c.getEnvironment().getRegion();
+            offset = region.getStartKey().length != 0 ? region.getStartKey().length:region.getEndKey().length;
+            ScanUtil.setRowKeyOffset(scan, offset);
+        }
+        
+        List<Expression> expressions = deserializeGroupByExpressions(expressionBytes, 0);
         ServerAggregators aggregators =
                 ServerAggregators.deserialize(scan
                         .getAttribute(BaseScannerRegionObserver.AGGREGATORS), c
@@ -125,12 +140,30 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
                     new HashJoinRegionScanner(s, p, j, ScanUtil.getTenantId(scan),
                             c.getEnvironment());
         }
+        byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
+        List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
+        boolean localIndexScan = ScanUtil.isLocalIndex(scan);
+        TupleProjector tupleProjector = null;
+        HRegion dataRegion = null;
+        byte[][] viewConstants = null;
+        ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
+        if (ScanUtil.isLocalIndex(scan)) {
+            if (dataColumns != null) {
+                tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
+                dataRegion = IndexUtil.getDataRegion(c.getEnvironment());
+                viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
+            }
+        } 
 
         if (keyOrdered) { // Optimize by taking advantage that the rows are
                           // already in the required group by key order
-            return scanOrdered(c, scan, innerScanner, expressions, aggregators, limit);
+            return scanOrdered(c, scan, innerScanner, expressions, aggregators, limit, offset,
+                localIndexScan, dataColumns, tupleProjector, indexMaintainers, dataRegion,
+                viewConstants);
         } else { // Otherwise, collect them all up in an in-memory map
-            return scanUnordered(c, scan, innerScanner, expressions, aggregators, limit);
+            return scanUnordered(c, scan, innerScanner, expressions, aggregators, limit, offset,
+                localIndexScan, dataColumns, tupleProjector, indexMaintainers, dataRegion,
+                viewConstants);
         }
     }
 
@@ -165,7 +198,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
 
     }
 
-    private List<Expression> deserializeGroupByExpressions(byte[] expressionBytes)
+    private List<Expression> deserializeGroupByExpressions(byte[] expressionBytes, int offset)
             throws IOException {
         List<Expression> expressions = new ArrayList<Expression>(3);
         ByteArrayInputStream stream = new ByteArrayInputStream(expressionBytes);
@@ -177,6 +210,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
                     Expression expression =
                             ExpressionType.values()[expressionOrdinal].newInstance();
                     expression.readFields(input);
+                    if (offset != 0) {
+                        IndexUtil.setRowKeyExpressionOffset(expression, offset);
+                    }
                     expressions.add(expression);
                 } catch (EOFException e) {
                     break;
@@ -341,7 +377,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
      */
     private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
             final RegionScanner s, final List<Expression> expressions,
-            final ServerAggregators aggregators, long limit) throws IOException {
+            final ServerAggregators aggregators, long limit, int offset, boolean localIndexScan,
+            ColumnReference[] dataColumns, TupleProjector tupleProjector,
+            List<IndexMaintainer> indexMaintainers, HRegion dataRegion, byte[][] viewConstants)
+            throws IOException {
         if (logger.isDebugEnabled()) {
             logger.debug("Grouped aggregation over unordered rows with scan " + scan
                     + ", group by " + expressions + ", aggregators " + aggregators);
@@ -363,7 +402,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
                 GroupByCacheFactory.INSTANCE.newCache(
                         env, ScanUtil.getTenantId(scan), 
                         aggregators, estDistVals);
-
+        ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
         boolean success = false;
         try {
             boolean hasMore;
@@ -385,6 +424,11 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
                     // ones returned
                     hasMore = s.nextRaw(results);
                     if (!results.isEmpty()) {
+                        if (localIndexScan) {
+                            IndexUtil.wrapResultUsingOffset(results, offset, dataColumns, tupleProjector,
+                                dataRegion, indexMaintainers == null ? null : indexMaintainers.get(0),
+                                viewConstants, tempPtr);
+                        }
                         result.setKeyValues(results);
                         ImmutableBytesWritable key =
                                 TupleUtil.getConcatenatedValue(result, expressions);
@@ -416,15 +460,20 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
      * Used for an aggregate query in which the key order matches the group by key order. In this
      * case, we can do the aggregation as we scan, by detecting when the group by key changes.
      * @param limit TODO
+     * @throws IOException 
      */
     private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c,
             Scan scan, final RegionScanner s, final List<Expression> expressions,
-            final ServerAggregators aggregators, final long limit) {
+            final ServerAggregators aggregators, final long limit, final int offset,
+            final boolean localIndexScan, final ColumnReference[] dataColumns,
+            final TupleProjector tupleProjector, final List<IndexMaintainer> indexMaintainers,
+            final HRegion dataRegion, final byte[][] viewConstants) throws IOException {
 
         if (logger.isDebugEnabled()) {
             logger.debug("Grouped aggregation over ordered rows with scan " + scan + ", group by "
                     + expressions + ", aggregators " + aggregators);
         }
+        final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
         return new BaseRegionScanner() {
             private long rowCount = 0;
             private ImmutableBytesWritable currentKey = null;
@@ -462,6 +511,11 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
                         // ones returned
                         hasMore = s.nextRaw(kvs);
                         if (!kvs.isEmpty()) {
+                            if (localIndexScan) {
+                                IndexUtil.wrapResultUsingOffset(kvs, offset, dataColumns, tupleProjector,
+                                    dataRegion, indexMaintainers == null ? null : indexMaintainers.get(0),
+                                    viewConstants, tempPtr);
+                            }
                             result.setKeyValues(kvs);
                             key = TupleUtil.getConcatenatedValue(result, expressions);
                             aggBoundary = currentKey != null && currentKey.compareTo(key) != 0;
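
Both grouped-aggregation paths above, scanUnordered() and scanOrdered(), now take the local-index state (offset, data columns, TupleProjector, IndexMaintainer, data region, view constants). When the scan runs over a local index, every row key is prefixed with the region start key, so deserialized row key expressions are shifted by that offset and IndexUtil.wrapResultUsingOffset() rewrites the cells returned by nextRaw() before they are aggregated. A minimal sketch of the row-key layout and what the offset skips, using illustrative names rather than Phoenix's API:

    import java.util.Arrays;

    public class RowKeyOffsetSketch {
        public static void main(String[] args) {
            byte[] regionStartKey = "regionA|".getBytes(); // hypothetical region boundary
            byte[] indexedValue = "value42".getBytes();    // hypothetical indexed column value

            // A local index row key is laid out as <region start key><index data>,
            // which keeps index rows inside the same region as their data rows.
            byte[] indexRowKey = new byte[regionStartKey.length + indexedValue.length];
            System.arraycopy(regionStartKey, 0, indexRowKey, 0, regionStartKey.length);
            System.arraycopy(indexedValue, 0, indexRowKey, regionStartKey.length, indexedValue.length);

            // The offset handed to the coprocessor is the prefix length, so
            // expressions evaluate the row key from that position onward.
            int offset = regionStartKey.length;
            byte[] logicalKey = Arrays.copyOfRange(indexRowKey, offset, indexRowKey.length);
            System.out.println(new String(logicalKey)); // prints "value42"
        }
    }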

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9a5529c8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index dcda21e..3f4892b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -31,6 +31,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FAMILY_NAME_INDEX;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT_BYTES;
@@ -115,6 +116,7 @@ import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.LinkType;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.PTableImpl;
@@ -169,6 +171,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final KeyValue MULTI_TENANT_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, MULTI_TENANT_BYTES);
     private static final KeyValue VIEW_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_TYPE_BYTES);
     private static final KeyValue VIEW_INDEX_ID_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, VIEW_INDEX_ID_BYTES);
+    private static final KeyValue INDEX_TYPE_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, INDEX_TYPE_BYTES);
     private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList(
             TABLE_TYPE_KV,
             TABLE_SEQ_NUM_KV,
@@ -183,7 +186,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             DISABLE_WAL_KV,
             MULTI_TENANT_KV,
             VIEW_TYPE_KV,
-            VIEW_INDEX_ID_KV
+            VIEW_INDEX_ID_KV,
+            INDEX_TYPE_KV
             );
     static {
         Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR);
@@ -202,6 +206,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final int MULTI_TENANT_INDEX = TABLE_KV_COLUMNS.indexOf(MULTI_TENANT_KV);
     private static final int VIEW_TYPE_INDEX = TABLE_KV_COLUMNS.indexOf(VIEW_TYPE_KV);
     private static final int VIEW_INDEX_ID_INDEX = TABLE_KV_COLUMNS.indexOf(VIEW_INDEX_ID_KV);
+    private static final int INDEX_TYPE_INDEX = TABLE_KV_COLUMNS.indexOf(INDEX_TYPE_KV);
     
     // KeyValues for Column
     private static final KeyValue DECIMAL_DIGITS_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
@@ -625,6 +630,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         ViewType viewType = viewTypeKv == null ? null : ViewType.fromSerializedValue(viewTypeKv.getValueArray()[viewTypeKv.getValueOffset()]);
         Cell viewIndexIdKv = tableKeyValues[VIEW_INDEX_ID_INDEX];
         Short viewIndexId = viewIndexIdKv == null ? null : (Short)MetaDataUtil.getViewIndexIdDataType().getCodec().decodeShort(viewIndexIdKv.getValueArray(), viewIndexIdKv.getValueOffset(), SortOrder.getDefault());
+        Cell indexTypeKv = tableKeyValues[INDEX_TYPE_INDEX];
+        IndexType indexType = indexTypeKv == null ? null : IndexType.fromSerializedValue(indexTypeKv.getValueArray()[indexTypeKv.getValueOffset()]);
         
         List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount);
         List<PTable> indexes = new ArrayList<PTable>();
@@ -657,7 +664,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp, 
             tableSeqNum, pkName, saltBucketNum, columns, tableType == INDEX ? dataTableName : null, 
             indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement, disableWAL, 
-            multiTenant, viewType, viewIndexId);
+            multiTenant, viewType, viewIndexId, indexType);
     }
 
     private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region,
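
The MetaDataEndpointImpl hunks register an INDEX_TYPE KeyValue in the sorted TABLE_KV_COLUMNS list so that buildTable() can look it up positionally, decode it, and pass it into PTableImpl.makePTable(). As with VIEW_TYPE, the cell holds a single serialized byte, and a missing cell decodes to null (the table is not an index). A sketch of that one-byte enum pattern; the enum and its byte values here are assumptions for illustration, not necessarily Phoenix's actual IndexType:

    public class IndexTypeSketch {
        // Hypothetical stand-in for PTable.IndexType and its serialized form.
        enum IndexType {
            GLOBAL((byte) 1),
            LOCAL((byte) 2);

            private final byte serialized;
            IndexType(byte serialized) { this.serialized = serialized; }
            byte getSerializedValue() { return serialized; }

            static IndexType fromSerializedValue(byte b) {
                for (IndexType t : values()) {
                    if (t.serialized == b) return t;
                }
                throw new IllegalArgumentException("Unknown index type: " + b);
            }
        }

        public static void main(String[] args) {
            byte[] cellValue = { IndexType.LOCAL.getSerializedValue() };
            // Mirrors the null-guarded decode in buildTable(): no cell, no index type.
            IndexType indexType = cellValue.length == 0
                    ? null : IndexType.fromSerializedValue(cellValue[0]);
            System.out.println(indexType); // LOCAL
        }
    }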

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9a5529c8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 7131993..ac93804 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -59,7 +59,9 @@ public abstract class MetaDataProtocol extends MetaDataService {
             VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER);
     
     public static final long MIN_TABLE_TIMESTAMP = 0;
-    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP + 1;
+    // Each time a column is added to the SYSTEM.CATALOG, this should be increased.
+    // Adding INDEX_TYPE column for local indexing
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP + 2;
     public static final int DEFAULT_MAX_META_DATA_VERSIONS = 1000;
 
     // TODO: pare this down to minimum, as we don't need duplicates for both table and column errors, nor should we need
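
Bumping MIN_SYSTEM_TABLE_TIMESTAMP is the upgrade signal: a server whose SYSTEM.CATALOG carries an older timestamp still lacks the INDEX_TYPE column added above. A minimal sketch of the comparison, illustrative only rather than Phoenix's actual upgrade check:

    public class SystemTableVersionSketch {
        static final long MIN_TABLE_TIMESTAMP = 0;
        // Bumped from +1 to +2 when the INDEX_TYPE column was added.
        static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_TABLE_TIMESTAMP + 2;

        public static void main(String[] args) {
            long serverCatalogTimestamp = 1; // e.g. a catalog created before this change
            if (serverCatalogTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP) {
                System.out.println("SYSTEM.CATALOG must be upgraded to add INDEX_TYPE");
            }
        }
    }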

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9a5529c8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 51d9033..6fe4598 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -25,8 +25,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
@@ -44,6 +44,8 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.expression.function.ArrayIndexFunction;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.iterate.OrderedResultIterator;
 import org.apache.phoenix.iterate.RegionScannerResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
@@ -57,6 +59,7 @@ import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 
@@ -176,6 +179,16 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
         if (isScanQuery == null || Bytes.compareTo(PDataType.FALSE_BYTES, isScanQuery) == 0) {
             return s;
         }
+        int offset = 0;
+        if (ScanUtil.isLocalIndex(scan)) {
+            /*
+             * For local indexes, we need to set an offset on row key expressions to skip
+             * the region start key.
+             */
+            HRegion region = c.getEnvironment().getRegion();
+            offset = region.getStartKey().length != 0 ? region.getStartKey().length:region.getEndKey().length;
+            ScanUtil.setRowKeyOffset(scan, offset);
+        }
         
         final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
         final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
@@ -186,18 +199,33 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
             innerScanner = new HashJoinRegionScanner(s, p, j, tenantId, c.getEnvironment());
         }
         
-        final OrderedResultIterator iterator = deserializeFromScan(scan,innerScanner);
         List<KeyValueColumnExpression> arrayKVRefs = new ArrayList<KeyValueColumnExpression>();
         Expression[] arrayFuncRefs = deserializeArrayPostionalExpressionInfoFromScan(
                 scan, innerScanner, arrayKVRefs);
-        innerScanner = getWrappedScanner(c, innerScanner, arrayKVRefs, arrayFuncRefs);
+        TupleProjector tupleProjector = null;
+        HRegion dataRegion = null;
+        IndexMaintainer indexMaintainer = null;
+        byte[][] viewConstants = null;
+        ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
+        if (dataColumns != null) {
+            tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
+            dataRegion = IndexUtil.getDataRegion(c.getEnvironment());
+            byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
+            List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
+            indexMaintainer = indexMaintainers.get(0);
+            viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
+        }
+        innerScanner =
+                getWrappedScanner(c, innerScanner, arrayKVRefs, arrayFuncRefs, offset, scan,
+                    dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants);
+        final OrderedResultIterator iterator = deserializeFromScan(scan,innerScanner);  
         if (iterator == null) {
             return innerScanner;
         }
         // TODO:the above wrapped scanner should be used here also
         return getTopNScanner(c, innerScanner, iterator, tenantId);
     }
-    
+
     /**
      *  Return region scanner that does TopN.
      *  We only need to call startRegionOperation and closeRegionOperation when
@@ -278,9 +306,19 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
      * the same from a custom filter.
      * @param arrayFuncRefs 
      * @param arrayKVRefs 
+     * @param offset starting position in the rowkey.
+     * @param scan
+     * @param tupleProjector
+     * @param dataRegion
+     * @param indexMaintainer
+     * @param viewConstants
      */
-    private RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s, 
-           final List<KeyValueColumnExpression> arrayKVRefs, final Expression[] arrayFuncRefs) {
+    private RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
+            final RegionScanner s, final List<KeyValueColumnExpression> arrayKVRefs,
+            final Expression[] arrayFuncRefs, final int offset, final Scan scan,
+            final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
+            final HRegion dataRegion, final IndexMaintainer indexMaintainer,
+            final byte[][] viewConstants) {
         return new RegionScanner() {
 
             @Override
@@ -332,12 +370,16 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
             public boolean nextRaw(List<Cell> result) throws IOException {
                 try {
                     boolean next = s.nextRaw(result);
-                    if(result.size() == 0) {
-                        return next;
-                    } else if((arrayFuncRefs != null && arrayFuncRefs.length == 0) || arrayKVRefs.size() == 0) {
+                    if (result.size() == 0) {
                         return next;
+                    } 
+                    if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) {
+                        replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
                     }
-                    replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
+                    if (ScanUtil.isLocalIndex(scan)) {
+                        IndexUtil.wrapResultUsingOffset(result, offset, dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
+                    }
+                    // There is a scan attribute set to retrieve the specific array element
                     return next;
                 } catch (Throwable t) {
                     ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
@@ -351,11 +393,14 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
                     boolean next = s.nextRaw(result, limit);
                     if (result.size() == 0) {
                         return next;
-                    } else if ((arrayFuncRefs != null && arrayFuncRefs.length == 0) || arrayKVRefs.size() == 0) { 
-                        return next; 
+                    } 
+                    if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) { 
+                        replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
+                    }
+                    if (offset > 0 || ScanUtil.isLocalIndex(scan)) {
+                        IndexUtil.wrapResultUsingOffset(result, offset, dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);
                     }
                     // There is a scanattribute set to retrieve the specific array element
-                    replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
                     return next;
                 } catch (Throwable t) {
                     ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
@@ -395,13 +440,11 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
                         QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER.length, HConstants.LATEST_TIMESTAMP,
                         Type.codeToType(rowKv.getTypeByte()), value, 0, value.length));
             }
-            
+
             @Override
             public long getMaxResultSize() {
                 return s.getMaxResultSize();
             }
         };
     }
-    
-    
 }
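
In ScanRegionObserver the wrapped scanner now receives the local-index join state, and its nextRaw() methods call IndexUtil.wrapResultUsingOffset() to strip the region-start-key prefix and join covered data columns back in. The offset itself is the region start key length, falling back to the end key length for the table's first region, whose start key is empty. A sketch of that computation with hypothetical split keys (plain Java rather than the HBase API):

    public class LocalIndexOffsetSketch {
        // Mirrors: startKey.length != 0 ? startKey.length : endKey.length
        static int rowKeyOffset(byte[] regionStartKey, byte[] regionEndKey) {
            return regionStartKey.length != 0 ? regionStartKey.length : regionEndKey.length;
        }

        public static void main(String[] args) {
            byte[] empty = new byte[0];
            byte[] splitB = "bbbb".getBytes();
            byte[] splitD = "dddd".getBytes();

            // First region [-inf, "bbbb"): empty start key, so use the end key length.
            System.out.println(rowKeyOffset(empty, splitB)); // 4
            // Middle region ["bbbb", "dddd"): use the start key length directly.
            System.out.println(rowKeyOffset(splitB, splitD)); // 4
        }
    }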

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9a5529c8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 995889d..d8961c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -56,8 +57,11 @@ import org.apache.phoenix.expression.ExpressionType;
 import org.apache.phoenix.expression.aggregator.Aggregator;
 import org.apache.phoenix.expression.aggregator.Aggregators;
 import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.join.TupleProjector;
@@ -72,7 +76,9 @@ import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.slf4j.Logger;
@@ -89,7 +95,7 @@ import com.google.common.collect.Sets;
  * @since 0.1
  */
 public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver {
-
+    private ImmutableBytesWritable ptr = new ImmutableBytesWritable();
     // TODO: move all constants into a single class
     public static final String UNGROUPED_AGG = "UngroupedAgg";
     public static final String DELETE_AGG = "DeleteAgg";
@@ -130,6 +136,21 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         if (isUngroupedAgg == null) {
             return s;
         }
+        int offset = 0;
+        if (ScanUtil.isLocalIndex(scan)) {
+            /*
+             * For local indexes, we need to set an offset on row key expressions to skip
+             * the region start key.
+             */
+            HRegion region = c.getEnvironment().getRegion();
+            offset = region.getStartKey().length != 0 ? region.getStartKey().length:region.getEndKey().length;
+            ScanUtil.setRowKeyOffset(scan, offset);
+        }
+
+        byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
+        List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
+        List<Mutation> indexMutations = localIndexBytes == null ? Collections.<Mutation>emptyList() : Lists.<Mutation>newArrayListWithExpectedSize(1024);
+        boolean localIndexScan = ScanUtil.isLocalIndex(scan);
         
         final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
         final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
@@ -137,7 +158,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         if (p != null || j != null)  {
             theScanner = new HashJoinRegionScanner(s, p, j, ScanUtil.getTenantId(scan), c.getEnvironment());
         }
-        final RegionScanner innerScanner = theScanner;
         
         byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
         PTable projectedTable = null;
@@ -165,6 +185,22 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             }
             emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_CF);
         }
+        if(localIndexBytes != null) {
+            ptr = new ImmutableBytesWritable();
+        }
+        TupleProjector scanProjector = null;
+        HRegion dataRegion = null;
+        byte[][] viewConstants = null;
+        ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
+        final RegionScanner innerScanner;
+        if (ScanUtil.isLocalIndex(scan) && !isDelete) {
+            if (dataColumns != null) {
+                scanProjector = IndexUtil.getTupleProjector(scan, dataColumns);
+                dataRegion = IndexUtil.getDataRegion(c.getEnvironment());
+                viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
+            }
+        } 
+        innerScanner = theScanner;
         
         int batchSize = 0;
         long ts = scan.getTimeRange().getMax();
@@ -186,6 +222,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         }
         long rowCount = 0;
         region.startRegionOperation();
+        ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
         try {
             do {
                 List<Cell> results = new ArrayList<Cell>();
@@ -193,11 +230,28 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 // since this is an indication of whether or not there are more values after the
                 // ones returned
                 hasMore = innerScanner.nextRaw(results);
+                
                 if (!results.isEmpty()) {
+                    if (localIndexScan && !isDelete) {
+                        IndexUtil.wrapResultUsingOffset(results, offset, dataColumns, scanProjector,
+                            dataRegion, indexMaintainers == null ? null : indexMaintainers.get(0),
+                            viewConstants, tempPtr);
+                    }
                     rowCount++;
                     result.setKeyValues(results);
                     try {
-                        if (isDelete) {
+                        if (indexMaintainers != null && dataColumns==null && !localIndexScan) {
+                            // TODO: join back to data row here if scan attribute set
+                            for (IndexMaintainer maintainer : indexMaintainers) {
+                                if (!results.isEmpty()) {
+                                    result.getKey(ptr);
+                                    ValueGetter valueGetter = maintainer.createGetterFromKeyValues(results);
+                                    Put put = maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, ts, c.getEnvironment().getRegion().getStartKey(), c.getEnvironment().getRegion().getEndKey());
+                                    indexMutations.add(put);
+                                }
+                            }
+                            result.setKeyValues(results);
+                        } else if (isDelete) {
                             // FIXME: the version of the Delete constructor without the lock args was introduced
                             // in 0.94.4, thus if we try to use it here we can no longer use the 0.94.2 version
                             // of the client.
@@ -278,6 +332,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             commitBatch(region, mutations, indexUUID);
                             mutations.clear();
                         }
+                        // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
+                        if (!indexMutations.isEmpty() && batchSize > 0 && indexMutations.size() % batchSize == 0) {
+                            HRegion indexRegion = getIndexRegion(c.getEnvironment());
+                            // Get indexRegion corresponding to data region
+                            commitBatch(indexRegion, indexMutations, null);
+                            indexMutations.clear();
+                        }
+
                     } catch (ConstraintViolationException e) {
                         // Log and ignore in count
                         logger.error("Failed to create row in " + region.getRegionNameAsString() + " with values " + SchemaUtil.toString(values), e);
@@ -300,6 +362,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             commitBatch(region,mutations, indexUUID);
         }
 
+        if (!indexMutations.isEmpty()) {
+            HRegion indexRegion = getIndexRegion(c.getEnvironment());
+            // Get indexRegion corresponding to data region
+            commitBatch(indexRegion, indexMutations, null);
+            indexMutations.clear();
+        }
+
         final boolean hadAny = hasAny;
         KeyValue keyValue = null;
         if (hadAny) {
@@ -341,7 +410,19 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         };
         return scanner;
     }
-    
+
+    private HRegion getIndexRegion(RegionCoprocessorEnvironment environment) throws IOException {
+        HRegion userRegion = environment.getRegion();
+        TableName indexTableName = TableName.valueOf(MetaDataUtil.getLocalIndexPhysicalName(userRegion.getTableDesc().getName()));
+        List<HRegion> onlineRegions = environment.getRegionServerServices().getOnlineRegions(indexTableName);
+        for(HRegion indexRegion : onlineRegions) {
+            if (Bytes.compareTo(userRegion.getStartKey(), indexRegion.getStartKey()) == 0) {
+                return indexRegion;
+            }
+        }
+        return null;
+    }
+
     private static PTable deserializeTable(byte[] b) {
         try {
             PTableProtos.PTable ptableProto = PTableProtos.PTable.parseFrom(b);
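
UngroupedAggregateRegionObserver picks up the server-side local index build: when the scan carries LOCAL_INDEX_BUILD, each data row scanned is turned into an index Put via IndexMaintainer.buildUpdateMutation() and committed in batches to the index region co-located with the data region. getIndexRegion() finds that region by matching start keys, which works because the local index table is split at exactly the data table's boundaries. A sketch of that lookup, with plain stand-ins for HBase's HRegion:

    import java.util.Arrays;
    import java.util.List;

    public class ColocatedRegionLookupSketch {
        // Hypothetical stand-in for HRegion, carrying just what the lookup needs.
        static class Region {
            final String name;
            final byte[] startKey;
            Region(String name, byte[] startKey) { this.name = name; this.startKey = startKey; }
        }

        static Region findIndexRegion(Region dataRegion, List<Region> onlineIndexRegions) {
            for (Region indexRegion : onlineIndexRegions) {
                // Same start key means the same key range, since the local index
                // table shares the data table's split points.
                if (Arrays.equals(dataRegion.startKey, indexRegion.startKey)) {
                    return indexRegion;
                }
            }
            return null; // callers must handle the index region not being online
        }

        public static void main(String[] args) {
            Region data = new Region("data,bbbb", "bbbb".getBytes());
            List<Region> index = Arrays.asList(
                    new Region("idx,", new byte[0]),
                    new Region("idx,bbbb", "bbbb".getBytes()));
            System.out.println(findIndexRegion(data, index).name); // prints "idx,bbbb"
        }
    }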

