phoenix-commits mailing list archives

From rajeshb...@apache.org
Subject [3/5] phoenix git commit: PHOENIX-1734 Local index improvements(Rajeshbabu)
Date Thu, 16 Jun 2016 05:27:26 GMT
http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
deleted file mode 100644
index d835ce9..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
+++ /dev/null
@@ -1,986 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_SPLIT;
-import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLIT;
-import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLITTING;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.RegionTransition;
-import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.executor.EventType;
-import org.apache.hadoop.hbase.io.Reference;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.CancelableProgressable;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.HasThread;
-import org.apache.hadoop.hbase.util.PairOfSameType;
-import org.apache.hadoop.hbase.zookeeper.ZKAssign;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.data.Stat;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-@InterfaceAudience.Private
-public class IndexSplitTransaction extends SplitTransactionImpl { // FIXME: Extends private type
-  private static final Log LOG = LogFactory.getLog(IndexSplitTransaction.class);
-
-  /*
-   * Region to split
-   */
-  private final HRegion parent;
-  private HRegionInfo hri_a;
-  private HRegionInfo hri_b;
-  private long fileSplitTimeout = 30000;
-  private int znodeVersion = -1;
-
-  /*
-   * Row to split around
-   */
-  private final byte [] splitrow;
-
-  /**
-   * Types to add to the transaction journal.
-   * Each enum is a step in the split transaction. Used to figure out how
-   * much we need to roll back.
-   */
-  enum JournalEntry {
-    /**
-     * Set region as in transition, set it into SPLITTING state.
-     */
-    SET_SPLITTING_IN_ZK,
-    /**
-     * We created the temporary split data directory.
-     */
-    CREATE_SPLIT_DIR,
-    /**
-     * Closed the parent region.
-     */
-    CLOSED_PARENT_REGION,
-    /**
-     * The parent has been taken out of the server's online regions list.
-     */
-    OFFLINED_PARENT,
-    /**
-     * Started in on creation of the first daughter region.
-     */
-    STARTED_REGION_A_CREATION,
-    /**
-     * Started in on the creation of the second daughter region.
-     */
-    STARTED_REGION_B_CREATION,
-    /**
-     * Point of no return.
-     * If we got here, then transaction is not recoverable other than by
-     * crashing out the regionserver.
-     */
-    PONR
-  }
-
-  /*
-   * Journal of how far the split transaction has progressed.
-   */
-  private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
-
-  /**
-   * Constructor
-   * @param r Region to split
-   * @param splitrow Row to split around
-   */
-  public IndexSplitTransaction(final Region r, final byte [] splitrow) {
-    super(r, splitrow);
-    this.parent = (HRegion)r;
-    this.splitrow = splitrow;
-  }
-
-  /**
-   * Does checks on split inputs.
-   * @return <code>true</code> if the region is splittable else
-   * <code>false</code> if it is not (e.g. it's already closed, etc.).
-   */
-  @Override
-  public boolean prepare() {
-    if (!this.parent.isSplittable()) return false;
-    // Split key can be null if this region is unsplittable; i.e. has refs.
-    if (this.splitrow == null) return false;
-    HRegionInfo hri = this.parent.getRegionInfo();
-    parent.prepareToSplit();
-    // Check splitrow.
-    byte [] startKey = hri.getStartKey();
-    byte [] endKey = hri.getEndKey();
-    if (Bytes.equals(startKey, splitrow) ||
-        !this.parent.getRegionInfo().containsRow(splitrow)) {
-      LOG.info("Split row is not inside region key range or is equal to " +
-          "startkey: " + Bytes.toStringBinary(this.splitrow));
-      return false;
-    }
-    long rid = getDaughterRegionIdTimestamp(hri);
-    this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
-    this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
-    return true;
-  }
-
-  /**
-   * Calculate daughter regionid to use.
-   * @param hri Parent {@link HRegionInfo}
-   * @return Daughter region id (timestamp) to use.
-   */
-  private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
-    long rid = EnvironmentEdgeManager.currentTimeMillis();
-    // Regionid is timestamp.  Can't be less than that of parent else will insert
-    // at wrong location in hbase:meta (See HBASE-710).
-    if (rid < hri.getRegionId()) {
-      LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
-        " but current time here is " + rid);
-      rid = hri.getRegionId() + 1;
-    }
-    return rid;
-  }
-
-  private static IOException closedByOtherException = new IOException(
-      "Failed to close region: already closed by another thread");
-
-  /**
-   * Prepare the regions and region files.
-   * @param server Hosting server instance.  Can be null when testing (won't try
-   * and update in zk if a null server)
-   * @param services Used to online/offline regions.
-   * @throws IOException If thrown, transaction failed.
-   *    Call {@link #rollback(Server, RegionServerServices)}
-   * @return Regions created
-   */
-  @Override
-  /* package */PairOfSameType<Region> createDaughters(final Server server,
-      final RegionServerServices services) throws IOException {
-    LOG.info("Starting split of region " + this.parent);
-    if ((server != null && server.isStopped()) ||
-        (services != null && services.isStopping())) {
-      throw new IOException("Server is stopped or stopping");
-    }
-    assert !this.parent.lock.writeLock().isHeldByCurrentThread():
-      "Unsafe to hold write lock while performing RPCs";
-
-    // Coprocessor callback
-    if (this.parent.getCoprocessorHost() != null) {
-      this.parent.getCoprocessorHost().preSplit();
-    }
-
-    // Coprocessor callback
-    if (this.parent.getCoprocessorHost() != null) {
-      this.parent.getCoprocessorHost().preSplit(this.splitrow);
-    }
-
-    // If true, no cluster to write meta edits to or to update znodes in.
-    boolean testing = server == null? true:
-        server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
-    this.fileSplitTimeout = testing ? this.fileSplitTimeout :
-        server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
-          this.fileSplitTimeout);
-
-    PairOfSameType<Region> daughterRegions = stepsBeforePONR(server, services, testing);
-
-    List<Mutation> metaEntries = new ArrayList<Mutation>();
-    if (this.parent.getCoprocessorHost() != null) {
-      if (this.parent.getCoprocessorHost().
-          preSplitBeforePONR(this.splitrow, metaEntries)) {
-        throw new IOException("Coprocessor bypassing region "
-            + this.parent.getRegionInfo().getRegionNameAsString() + " split.");
-      }
-      try {
-        for (Mutation p : metaEntries) {
-          HRegionInfo.parseRegionName(p.getRow());
-        }
-      } catch (IOException e) {
-        LOG.error("Row key of mutation from coprossor is not parsable as region name."
-            + "Mutations from coprocessor should only for hbase:meta table.");
-        throw e;
-      }
-    }
-
-    // This is the point of no return.  Adding subsequent edits to .META. as we
-    // do below when we do the daughter opens adding each to .META. can fail in
-    // various interesting ways the most interesting of which is a timeout
-    // BUT the edits all go through (See HBASE-3872).  IF we reach the PONR
-    // then subsequent failures need to crash out this regionserver; the
-    // server shutdown processing should be able to fix-up the incomplete split.
-    // The offlined parent will have the daughters as extra columns.  If
-    // we leave the daughter regions in place and do not remove them when we
-    // crash out, then they will have their references to the parent in place
-    // still and the server shutdown fixup of .META. will point to these
-    // regions.
-    // We should add the PONR JournalEntry before offlineParentInMeta, so that
-    // even if offlineParentInMeta times out, the regionserver will exit and the
-    // master's ServerShutdownHandler will fix up the daughters and avoid data
-    // loss. (See HBASE-4562.)
-    this.journal.add(JournalEntry.PONR);
-
-    // Edit parent in meta.  Offlines the parent region and adds splita and
-    // splitb as an atomic update. See HBASE-7721. In case of failure, this
-    // update to META determines whether the region is considered split or not:
-    // if it succeeded, the master rolls forward; if not, the master rolls back
-    // and reassigns the parent region.
-    if (!testing) {
-      if (metaEntries == null || metaEntries.isEmpty()) {
-        MetaTableAccessor.splitRegion(server.getConnection(), parent.getRegionInfo(),
-                daughterRegions.getFirst().getRegionInfo(),
-                daughterRegions.getSecond().getRegionInfo(), server.getServerName(),
-                parent.getTableDesc().getRegionReplication());
-      } else {
-        offlineParentInMetaAndputMetaEntries(server.getConnection(),
-          parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions
-              .getSecond().getRegionInfo(), server.getServerName(), metaEntries,
-              parent.getTableDesc().getRegionReplication());
-      }
-    }
-    return daughterRegions;
-  }
-
-  @Override
-  public PairOfSameType<Region> stepsBeforePONR(final Server server,
-      final RegionServerServices services, boolean testing) throws IOException {
-    // Set ephemeral SPLITTING znode up in zk.  Mocked servers sometimes don't
-    // have zookeeper so don't do zk stuff if server or zookeeper is null
-    if (server != null && server.getZooKeeper() != null) {
-      try {
-        createNodeSplitting(server.getZooKeeper(),
-          parent.getRegionInfo(), server.getServerName(), hri_a, hri_b);
-      } catch (KeeperException e) {
-        throw new IOException("Failed creating PENDING_SPLIT znode on " +
-          this.parent.getRegionInfo().getRegionNameAsString(), e);
-      }
-    }
-    this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK);
-    if (server != null && server.getZooKeeper() != null) {
-      // After creating the split node, wait for the master to transition it
-      // from PENDING_SPLIT to SPLITTING so that we can move on. We want the
-      // master to know about it so it won't transition any region which is splitting.
-      znodeVersion = getZKNode(server, services);
-    }
-
-    this.parent.getRegionFileSystem().createSplitsDir();
-    this.journal.add(JournalEntry.CREATE_SPLIT_DIR);
-
-    Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
-    Exception exceptionToThrow = null;
-    try{
-      hstoreFilesToSplit = this.parent.close(false);
-    } catch (Exception e) {
-      exceptionToThrow = e;
-    }
-    if (exceptionToThrow == null && hstoreFilesToSplit == null) {
-      // The region was closed by a concurrent thread.  We can't continue
-      // with the split, instead we must just abandon the split.  If we
-      // reopen or split this could cause problems because the region has
-      // probably already been moved to a different server, or is in the
-      // process of moving to a different server.
-      exceptionToThrow = closedByOtherException;
-    }
-    if (exceptionToThrow != closedByOtherException) {
-      this.journal.add(JournalEntry.CLOSED_PARENT_REGION);
-    }
-    if (exceptionToThrow != null) {
-      if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
-      throw new IOException(exceptionToThrow);
-    }
-    if (!testing) {
-      services.removeFromOnlineRegions(this.parent, null);
-    }
-    this.journal.add(JournalEntry.OFFLINED_PARENT);
-
-    // TODO: If splitStoreFiles were multithreaded would we complete steps in
-    // less elapsed time?  St.Ack 20100920
-    //
-    // splitStoreFiles creates daughter region dirs under the parent splits dir
-    // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
-    // clean this up.
-    splitStoreFiles(hstoreFilesToSplit);
-
-    // Log to the journal that we are creating region A, the first daughter
-    // region.  We could fail halfway through.  If we do, we could have left
-    // stuff in fs that needs cleanup -- a storefile or two.  That's why we
-    // add the entry to the journal BEFORE rather than AFTER the change.
-    this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
-    Region a = this.parent.createDaughterRegionFromSplits(this.hri_a);
-
-    // Ditto
-    this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
-    Region b = this.parent.createDaughterRegionFromSplits(this.hri_b);
-    return new PairOfSameType<Region>(a, b);
-  }
-
-  /**
-   * Perform time-consuming opening of the daughter regions.
-   * @param server Hosting server instance.  Can be null when testing (won't try
-   * and update in zk if a null server)
-   * @param services Used to online/offline regions.
-   * @param a first daughter region
-   * @param b second daughter region
-   * @throws IOException If thrown, transaction failed.
-   *          Call {@link #rollback(Server, RegionServerServices)}
-   */
-  @Override
-  /* package */void openDaughters(final Server server,
-      final RegionServerServices services, Region a, Region b)
-      throws IOException {
-    boolean stopped = server != null && server.isStopped();
-    boolean stopping = services != null && services.isStopping();
-    // TODO: Is this check needed here?
-    if (stopped || stopping) {
-      LOG.info("Not opening daughters " +
-          b.getRegionInfo().getRegionNameAsString() +
-          " and " +
-          a.getRegionInfo().getRegionNameAsString() +
-          " because stopping=" + stopping + ", stopped=" + stopped);
-    } else {
-      // Open daughters in parallel.
-      DaughterOpener aOpener = new DaughterOpener(server, (HRegion)a);
-      DaughterOpener bOpener = new DaughterOpener(server, (HRegion)b);
-      aOpener.start();
-      bOpener.start();
-      try {
-        aOpener.join();
-        bOpener.join();
-      } catch (InterruptedException e) {
-        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
-      }
-      if (aOpener.getException() != null) {
-        throw new IOException("Failed " +
-          aOpener.getName(), aOpener.getException());
-      }
-      if (bOpener.getException() != null) {
-        throw new IOException("Failed " +
-          bOpener.getName(), bOpener.getException());
-      }
-      if (services != null) {
-        try {
-          // add 2nd daughter first (see HBASE-4335)
-          services.postOpenDeployTasks(b);
-          // Should add it to OnlineRegions
-          services.addToOnlineRegions(b);
-          services.postOpenDeployTasks(a);
-          services.addToOnlineRegions(a);
-        } catch (KeeperException ke) {
-          throw new IOException(ke);
-        }
-      }
-    }
-  }
-
-  /**
-   * Finish off split transaction, transition the zknode
-   * @param server Hosting server instance.  Can be null when testing (won't try
-   * and update in zk if a null server)
-   * @param services Used to online/offline regions.
-   * @param a first daughter region
-   * @param b second daughter region
-   * @throws IOException If thrown, transaction failed.
-   *          Call {@link #rollback(Server, RegionServerServices)}
-   */
-  /* package */void transitionZKNode(final Server server,
-      final RegionServerServices services, Region a, Region b)
-      throws IOException {
-    // Tell master about split by updating zk.  If we fail, abort.
-    if (server != null && server.getZooKeeper() != null) {
-      try {
-        this.znodeVersion = transitionSplittingNode(server.getZooKeeper(),
-          parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
-          server.getServerName(), this.znodeVersion,
-          RS_ZK_REGION_SPLITTING, RS_ZK_REGION_SPLIT);
-
-        int spins = 0;
-        // Now wait for the master to process the split. We know it's done
-        // when the znode is deleted. The reason we keep tickling the znode is
-        // that it's possible for the master to miss an event.
-        do {
-          if (spins % 10 == 0) {
-            LOG.debug("Still waiting on the master to process the split for " +
-                this.parent.getRegionInfo().getEncodedName());
-          }
-          Thread.sleep(100);
-          // When this returns -1 it means the znode doesn't exist
-          this.znodeVersion = transitionSplittingNode(server.getZooKeeper(),
-            parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
-            server.getServerName(), this.znodeVersion,
-            RS_ZK_REGION_SPLIT, RS_ZK_REGION_SPLIT);
-          spins++;
-        } while (this.znodeVersion != -1 && !server.isStopped()
-            && !services.isStopping());
-      } catch (Exception e) {
-        if (e instanceof InterruptedException) {
-          Thread.currentThread().interrupt();
-        }
-        throw new IOException("Failed telling master about split", e);
-      }
-    }
-
-    // Coprocessor callback
-    if (this.parent.getCoprocessorHost() != null) {
-      this.parent.getCoprocessorHost().postSplit(a,b);
-    }
-
-    // Leaving here, the splitdir with its dross will be in place but since the
-    // split was successful, just leave it; it'll be cleaned when parent is
-    // deleted and cleaned up.
-  }
-
-  /**
-   * Wait for the splitting node to be transitioned from pending_split
-   * to splitting by the master. That's how we are sure the master has
-   * processed the event and we are good to move on. If we don't get any
-   * update, we periodically transition the node so that the master gets the callback.
-   * If the node is removed or is not in pending_split state any more,
-   * we abort the split.
-   */
-  private int getZKNode(final Server server,
-      final RegionServerServices services) throws IOException {
-    // Wait for the master to process the pending_split.
-    try {
-      int spins = 0;
-      Stat stat = new Stat();
-      ZooKeeperWatcher zkw = server.getZooKeeper();
-      ServerName expectedServer = server.getServerName();
-      String node = parent.getRegionInfo().getEncodedName();
-      while (!(server.isStopped() || services.isStopping())) {
-        if (spins % 5 == 0) {
-          LOG.debug("Still waiting for master to process "
-            + "the pending_split for " + node);
-          transitionSplittingNode(zkw, parent.getRegionInfo(),
-            hri_a, hri_b, expectedServer, -1, RS_ZK_REQUEST_REGION_SPLIT,
-            RS_ZK_REQUEST_REGION_SPLIT);
-        }
-        Thread.sleep(100);
-        spins++;
-        byte [] data = ZKAssign.getDataNoWatch(zkw, node, stat);
-        if (data == null) {
-          throw new IOException("Data is null, splitting node "
-            + node + " no longer exists");
-        }
-        RegionTransition rt = RegionTransition.parseFrom(data);
-        EventType et = rt.getEventType();
-        if (et == RS_ZK_REGION_SPLITTING) {
-          ServerName serverName = rt.getServerName();
-          if (!serverName.equals(expectedServer)) {
-            throw new IOException("Splitting node " + node + " is for "
-              + serverName + ", not us " + expectedServer);
-          }
-          byte [] payloadOfSplitting = rt.getPayload();
-          List<HRegionInfo> splittingRegions = HRegionInfo.parseDelimitedFrom(
-            payloadOfSplitting, 0, payloadOfSplitting.length);
-          assert splittingRegions.size() == 2;
-          HRegionInfo a = splittingRegions.get(0);
-          HRegionInfo b = splittingRegions.get(1);
-          if (!(hri_a.equals(a) && hri_b.equals(b))) {
-            throw new IOException("Splitting node " + node + " is for " + a + ", "
-              + b + ", not expected daughters: " + hri_a + ", " + hri_b);
-          }
-          // Master has processed it.
-          return stat.getVersion();
-        }
-        if (et != RS_ZK_REQUEST_REGION_SPLIT) {
-          throw new IOException("Splitting node " + node
-            + " moved out of splitting to " + et);
-        }
-      }
-      // Server is stopping/stopped
-      throw new IOException("Server is "
-        + (services.isStopping() ? "stopping" : "stopped"));
-    } catch (Exception e) {
-      if (e instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
-      }
-      throw new IOException("Failed getting SPLITTING znode on "
-        + parent.getRegionInfo().getRegionNameAsString(), e);
-    }
-  }
-
-  /**
-   * Run the transaction.
-   * @param server Hosting server instance.  Can be null when testing (won't try
-   * and update in zk if a null server)
-   * @param services Used to online/offline regions.
-   * @throws IOException If thrown, transaction failed.
-   *          Call {@link #rollback(Server, RegionServerServices)}
-   * @return Regions created
-   * @throws IOException
-   * @see #rollback(Server, RegionServerServices)
-   */
-  @Override
-  public PairOfSameType<Region> execute(final Server server,
-      final RegionServerServices services)
-  throws IOException {
-    PairOfSameType<Region> regions = createDaughters(server, services);
-    if (this.parent.getCoprocessorHost() != null) {
-      this.parent.getCoprocessorHost().preSplitAfterPONR();
-    }
-    return stepsAfterPONR(server, services, regions);
-  }
-
-  @Override
-  public PairOfSameType<Region> stepsAfterPONR(final Server server,
-      final RegionServerServices services, PairOfSameType<Region> regions)
-      throws IOException {
-    openDaughters(server, services, regions.getFirst(), regions.getSecond());
-    transitionZKNode(server, services, regions.getFirst(), regions.getSecond());
-    return regions;
-  }
-
-  private void offlineParentInMetaAndputMetaEntries(Connection conn,
-      HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
-      ServerName serverName, List<Mutation> metaEntries, int regionReplication) throws IOException {
-    List<Mutation> mutations = metaEntries;
-    HRegionInfo copyOfParent = new HRegionInfo(parent);
-    copyOfParent.setOffline(true);
-    copyOfParent.setSplit(true);
-
-    //Put for parent
-    Put putParent = MetaTableAccessor.makePutFromRegionInfo(copyOfParent);
-    MetaTableAccessor.addDaughtersToPut(putParent, splitA, splitB);
-    mutations.add(putParent);
-
-    //Puts for daughters
-    Put putA = MetaTableAccessor.makePutFromRegionInfo(splitA);
-    Put putB = MetaTableAccessor.makePutFromRegionInfo(splitB);
-
-    addLocation(putA, serverName, 1); //these are new regions, openSeqNum = 1 is fine.
-    addLocation(putB, serverName, 1);
-    mutations.add(putA);
-    mutations.add(putB);
-
-    // Add empty locations for region replicas of daughters so that number of replicas can be
-    // cached whenever the primary region is looked up from meta
-    for (int i = 1; i < regionReplication; i++) {
-      addEmptyLocation(putA, i);
-      addEmptyLocation(putB, i);
-    }
-
-    MetaTableAccessor.mutateMetaTable(conn, mutations);
-  }
-
-  @Override
-  public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
-    p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
-      Bytes.toBytes(sn.getHostAndPort()));
-    p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
-      Bytes.toBytes(sn.getStartcode()));
-    p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER,
-        Bytes.toBytes(openSeqNum));
-    return p;
-  }
-
-  private static Put addEmptyLocation(final Put p, int replicaId){
-    p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getServerColumn(replicaId), null);
-    p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getStartCodeColumn(replicaId), null);
-    p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getSeqNumColumn(replicaId), null);
-    return p;
-  }
-
-  /*
-   * Open daughter region in its own thread.
-   * If we fail, abort this hosting server.
-   */
-  class DaughterOpener extends HasThread {
-    private final Server server;
-    private final HRegion r;
-    private Throwable t = null;
-
-    DaughterOpener(final Server s, final HRegion r) {
-      super((s == null? "null-services": s.getServerName()) +
-        "-daughterOpener=" + r.getRegionInfo().getEncodedName());
-      setDaemon(true);
-      this.server = s;
-      this.r = r;
-    }
-
-    /**
-     * @return Null if the open succeeded, else the exception that caused the open to fail.
-     * Call it after this thread exits, else you may get a stale view of the result.
-     */
-    Throwable getException() {
-      return this.t;
-    }
-
-    @Override
-    public void run() {
-      try {
-        openDaughterRegion(this.server, r);
-      } catch (Throwable t) {
-        this.t = t;
-      }
-    }
-  }
-
-  /**
-   * Open daughter regions, add them to online list and update meta.
-   * @param server
-   * @param daughter
-   * @throws IOException
-   * @throws KeeperException
-   */
-  @Override
-  void openDaughterRegion(final Server server, final HRegion daughter)
-  throws IOException, KeeperException {
-    HRegionInfo hri = daughter.getRegionInfo();
-    LoggingProgressable reporter = server == null ? null
-        : new LoggingProgressable(hri, server.getConfiguration().getLong(
-            "hbase.regionserver.split.daughter.open.log.interval", 10000));
-    daughter.openHRegion(reporter);
-  }
-
-  static class LoggingProgressable implements CancelableProgressable {
-    private final HRegionInfo hri;
-    private long lastLog = -1;
-    private final long interval;
-
-    LoggingProgressable(final HRegionInfo hri, final long interval) {
-      this.hri = hri;
-      this.interval = interval;
-    }
-
-    @Override
-    public boolean progress() {
-      long now = System.currentTimeMillis();
-      if (now - lastLog > this.interval) {
-        LOG.info("Opening " + this.hri.getRegionNameAsString());
-        this.lastLog = now;
-      }
-      return true;
-    }
-  }
-
-  private void splitStoreFiles(final Map<byte[], List<StoreFile>> hstoreFilesToSplit)
-      throws IOException {
-    if (hstoreFilesToSplit == null) {
-      // Could be null because close didn't succeed -- for now consider it fatal
-      throw new IOException("Close returned empty list of StoreFiles");
-    }
-    // The following code sets up a thread pool executor with as many slots as
-    // there are files to split. It then fires up everything, waits for
-    // completion and finally checks for any exceptions.
-    int nbFiles = hstoreFilesToSplit.size();
-    if (nbFiles == 0) {
-      // no files need to be split.
-      return;
-    }
-    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
-    builder.setNameFormat("StoreFileSplitter-%1$d");
-    ThreadFactory factory = builder.build();
-    ThreadPoolExecutor threadPool =
-      (ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory);
-    List<Future<Void>> futures = new ArrayList<Future<Void>>(nbFiles);
-
-    // Split each store file.
-    for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
-      for (StoreFile sf: entry.getValue()) {
-        StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
-        futures.add(threadPool.submit(sfs));
-      }
-    }
-    // Shutdown the pool
-    threadPool.shutdown();
-
-    // Wait for all the tasks to finish
-    try {
-      boolean stillRunning = !threadPool.awaitTermination(
-          this.fileSplitTimeout, TimeUnit.MILLISECONDS);
-      if (stillRunning) {
-        threadPool.shutdownNow();
-        // wait for the thread to shutdown completely.
-        while (!threadPool.isTerminated()) {
-          Thread.sleep(50);
-        }
-        throw new IOException("Took too long to split the" +
-            " files and create the references, aborting split");
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
-    }
-
-    // Look for any exception
-    for (Future<Void> future: futures) {
-      try {
-        future.get();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
-      } catch (ExecutionException e) {
-        throw new IOException(e);
-      }
-    }
-  }
-
-  /**
-   * Utility class used to do the file splitting / reference writing
-   * in parallel instead of sequentially.
-   */
-  class StoreFileSplitter implements Callable<Void> {
-    private final byte[] family;
-    private final StoreFile sf;
-
-    /**
-     * Constructor that takes what it needs to split
-     * @param family Family that contains the store file
-     * @param sf which file
-     */
-    public StoreFileSplitter(final byte[] family, final StoreFile sf) {
-      this.sf = sf;
-      this.family = family;
-    }
-
-    @Override
-    public Void call() throws IOException {
-      splitStoreFile(family, sf);
-      return null;
-    }
-  }
-
-    private void splitStoreFile(final byte[] family, final StoreFile sf) throws IOException {
-        HRegionFileSystem fs = this.parent.getRegionFileSystem();
-        String familyName = Bytes.toString(family);
-        splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false, fs);
-        splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true, fs);
-    }
-
-    private Path splitStoreFile(HRegionInfo hri, String familyName, StoreFile f, byte[] splitRow,
-            boolean top, HRegionFileSystem fs) throws IOException {
-        f.closeReader(true);
-        Path splitDir =
-                new Path(fs.getSplitsDir(hri), familyName);
-        // A reference to the bottom half of the hsf store file.
-        Reference r =
-                top ? Reference.createTopReference(splitRow) : Reference
-                        .createBottomReference(splitRow);
-        // Add the referred-to region's name as a dot-separated suffix.
-        // See the REF_NAME_REGEX regex above. The referred-to region's name is
-        // up in the path of the passed in <code>f</code> -- the parent dir is the
-        // family, then the directory above is the region name.
-        String parentRegionName = this.parent.getRegionInfo().getEncodedName();
-        // Write reference with same file id only with the other region name as
-        // suffix and into the new region location (under same family).
-        Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
-        return r.write(fs.getFileSystem(), p);
-    }
-
-  /**
-   * @param server Hosting server instance (May be null when testing).
-   * @param services
-   * @throws IOException If thrown, rollback failed.  Take drastic action.
-   * @return True if we successfully rolled back, false if we got to the point
-   * of no return and so now need to abort the server to minimize damage.
-   */
-  @Override
-  @SuppressWarnings("deprecation")
-  public boolean rollback(final Server server, final RegionServerServices services)
-  throws IOException {
-    // Coprocessor callback
-    if (this.parent.getCoprocessorHost() != null) {
-      this.parent.getCoprocessorHost().preRollBackSplit();
-    }
-
-    boolean result = true;
-    ListIterator<JournalEntry> iterator =
-      this.journal.listIterator(this.journal.size());
-    // Iterate in reverse.
-    while (iterator.hasPrevious()) {
-      JournalEntry je = iterator.previous();
-      switch(je) {
-
-      case SET_SPLITTING_IN_ZK:
-        if (server != null && server.getZooKeeper() != null) {
-          cleanZK(server, this.parent.getRegionInfo());
-        }
-        break;
-
-      case CREATE_SPLIT_DIR:
-        this.parent.writestate.writesEnabled = true;
-        this.parent.getRegionFileSystem().cleanupSplitsDir();
-        break;
-
-      case CLOSED_PARENT_REGION:
-        try {
-          // So, this returns a seqid but if we just closed and then reopened, we
-          // should be ok. On close, we flushed using sequenceid obtained from
-          // hosting regionserver so no need to propagate the sequenceid returned
-          // out of initialize below up into regionserver as we normally do.
-          // TODO: Verify.
-          this.parent.initialize();
-        } catch (IOException e) {
-          LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " +
-            this.parent.getRegionInfo().getRegionNameAsString(), e);
-          throw new RuntimeException(e);
-        }
-        break;
-
-      case STARTED_REGION_A_CREATION:
-        this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_a);
-        break;
-
-      case STARTED_REGION_B_CREATION:
-        this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_b);
-        break;
-
-      case OFFLINED_PARENT:
-        if (services != null) services.addToOnlineRegions(this.parent);
-        break;
-
-      case PONR:
-        // We got to the point-of-no-return so we need to just abort. Return
-        // immediately.  Do not clean up created daughter regions.  They need
-        // to be in place so we don't delete the parent region mistakenly.
-        // See HBASE-3872.
-        return false;
-
-      default:
-        throw new RuntimeException("Unhandled journal entry: " + je);
-      }
-    }
-    // Coprocessor callback
-    if (this.parent.getCoprocessorHost() != null) {
-      this.parent.getCoprocessorHost().postRollBackSplit();
-    }
-    return result;
-  }
-
-  @Override
-  HRegionInfo getFirstDaughter() {
-    return hri_a;
-  }
-
-  @Override
-  HRegionInfo getSecondDaughter() {
-    return hri_b;
-  }
-
-  private static void cleanZK(final Server server, final HRegionInfo hri) {
-    try {
-      // Only delete if its in expected state; could have been hijacked.
-      if (!ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
-          RS_ZK_REQUEST_REGION_SPLIT, server.getServerName())) {
-        ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
-          RS_ZK_REGION_SPLITTING, server.getServerName());
-      }
-    } catch (KeeperException.NoNodeException e) {
-      LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
-    } catch (KeeperException e) {
-      server.abort("Failed cleanup of " + hri.getRegionNameAsString(), e);
-    }
-  }
-
-  /**
-   * Creates a new ephemeral node in the PENDING_SPLIT state for the specified region.
-   * Create it ephemeral in case regionserver dies mid-split.
-   *
-   * <p>Does not transition nodes from other states.  If a node already exists
-   * for this region, a {@link NodeExistsException} will be thrown.
-   *
-   * @param zkw zk reference
-   * @param region region to be created as offline
-   * @param serverName server event originates from
-   * @throws KeeperException
-   * @throws IOException
-   */
-  public static void createNodeSplitting(final ZooKeeperWatcher zkw, final HRegionInfo region,
-      final ServerName serverName, final HRegionInfo a,
-      final HRegionInfo b) throws KeeperException, IOException {
-    LOG.debug(zkw.prefix("Creating ephemeral node for " +
-      region.getEncodedName() + " in PENDING_SPLIT state"));
-    byte [] payload = HRegionInfo.toDelimitedByteArray(a, b);
-    RegionTransition rt = RegionTransition.createRegionTransition(
-      RS_ZK_REQUEST_REGION_SPLIT, region.getRegionName(), serverName, payload);
-    String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
-    if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) {
-      throw new IOException("Failed create of ephemeral " + node);
-    }
-  }
-
-  /**
-   * Transitions an existing ephemeral node for the specified region which is
-   * currently in the begin state to be in the end state. Master cleans up the
-   * final SPLIT znode when it reads it (or if we crash, zk will clean it up).
-   *
-   * <p>Does not transition nodes from other states. If for some reason the
-   * node could not be transitioned, the method returns -1. If the transition
-   * is successful, the version of the node after transition is returned.
-   *
-   * <p>This method can fail and return -1 for three different reasons:
-   * <ul><li>Node for this region does not exist</li>
-   * <li>Node for this region is not in the begin state</li>
-   * <li>After verifying the begin state, update fails because of wrong version
-   * (this should never actually happen since an RS only does this transition
-   * following a transition to the begin state. If two RS are conflicting, one would
-   * fail the original transition to the begin state and not this transition)</li>
-   * </ul>
-   *
-   * <p>Does not set any watches.
-   *
-   * <p>This method should only be used by a RegionServer when splitting a region.
-   *
-   * @param zkw zk reference
-   * @param parent region to be transitioned to opened
-   * @param a Daughter a of split
-   * @param b Daughter b of split
-   * @param serverName server event originates from
-   * @param znodeVersion expected version of data before modification
-   * @param beginState the expected current state the znode should be
-   * @param endState the state to be transition to
-   * @return version of node after transition, -1 if unsuccessful transition
-   * @throws KeeperException if unexpected zookeeper exception
-   * @throws IOException
-   */
-  public static int transitionSplittingNode(ZooKeeperWatcher zkw,
-      HRegionInfo parent, HRegionInfo a, HRegionInfo b, ServerName serverName,
-      final int znodeVersion, final EventType beginState,
-      final EventType endState) throws KeeperException, IOException {
-    byte [] payload = HRegionInfo.toDelimitedByteArray(a, b);
-    return ZKAssign.transitionNode(zkw, parent, serverName,
-      beginState, endState, znodeVersion, payload);
-  }
-
-  public HRegion getParent() {
-    return this.parent;
-  }
-}
\ No newline at end of file
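
For context on the deletion above: IndexSplitTransaction drives a region split as a journaled multi-step transaction. Each completed step is recorded as a JournalEntry before the next begins, so rollback(Server, RegionServerServices) can undo the work in reverse order until it reaches the point of no return (PONR), after which only a regionserver abort is safe. A minimal plain-Java sketch of that pattern, with illustrative names only (this is not the HBase API):

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Sketch of the journal-and-rollback pattern: record each completed step,
    // then undo in reverse order on failure, stopping at the PONR.
    public class JournaledSplit {
        enum Step { SET_SPLITTING_IN_ZK, CREATE_SPLIT_DIR, CLOSED_PARENT_REGION, PONR }

        private final Deque<Step> journal = new ArrayDeque<>();

        void record(Step step) {
            journal.push(step); // newest step ends up on top
        }

        // Returns false if the PONR was crossed; the caller must then abort
        // the server rather than attempt cleanup (compare HBASE-3872).
        boolean rollback() {
            while (!journal.isEmpty()) {
                switch (journal.pop()) {
                    case PONR:
                        return false; // unrecoverable from here
                    case CLOSED_PARENT_REGION:
                        System.out.println("reopening parent region");
                        break;
                    case CREATE_SPLIT_DIR:
                        System.out.println("cleaning up temporary split dir");
                        break;
                    case SET_SPLITTING_IN_ZK:
                        System.out.println("deleting SPLITTING znode");
                        break;
                }
            }
            return true;
        }

        public static void main(String[] args) {
            JournaledSplit tx = new JournaledSplit();
            tx.record(Step.SET_SPLITTING_IN_ZK);
            tx.record(Step.CREATE_SPLIT_DIR);
            System.out.println("rolled back cleanly: " + tx.rollback()); // true
        }
    }

The real class additionally coordinates each phase with the master through ZooKeeper znode transitions (createNodeSplitting, getZKNode, transitionSplittingNode) before crossing the PONR.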

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java
deleted file mode 100644
index e361343..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionServerObserver;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
-import org.apache.phoenix.schema.types.PBoolean;
-import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.MetaDataUtil;
-import org.apache.phoenix.util.SchemaUtil;
-
-public class LocalIndexMerger extends BaseRegionServerObserver {
-
-    private static final Log LOG = LogFactory.getLog(LocalIndexMerger.class);
-
-    private RegionMergeTransactionImpl rmt = null; // FIXME: Use of private type
-    private HRegion mergedRegion = null; // FIXME: Use of private type
-
-    @Override
-    public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
-            Region regionA, Region regionB, List<Mutation> metaEntries) throws IOException {
-        HTableDescriptor tableDesc = regionA.getTableDesc();
-        if (SchemaUtil.isSystemTable(tableDesc.getName())) {
-            return;
-        }
-        RegionServerServices rss = ctx.getEnvironment().getRegionServerServices();
-        HRegionServer rs = (HRegionServer) rss;
-        if (tableDesc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) == null
-                || !Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(tableDesc
-                        .getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) {
-            TableName indexTable =
-                    TableName.valueOf(MetaDataUtil.getLocalIndexPhysicalName(tableDesc.getName()));
-            if (!MetaTableAccessor.tableExists(rs.getConnection(), indexTable)) return;
-            Region indexRegionA = IndexUtil.getIndexRegion(regionA, ctx.getEnvironment());
-            if (indexRegionA == null) {
-                LOG.warn("Index region corresponindg to data region " + regionA
-                        + " not in the same server. So skipping the merge.");
-                ctx.bypass();
-                return;
-            }
-            Region indexRegionB = IndexUtil.getIndexRegion(regionB, ctx.getEnvironment());
-            if (indexRegionB == null) {
-                LOG.warn("Index region corresponindg to region " + regionB
-                        + " not in the same server. So skipping the merge.");
-                ctx.bypass();
-                return;
-            }
-            try {
-                rmt = new RegionMergeTransactionImpl(indexRegionA, indexRegionB, false);
-                if (!rmt.prepare(rss)) {
-                    LOG.error("Prepare for the index regions merge [" + indexRegionA + ","
-                            + indexRegionB + "] failed. So returning null. ");
-                    ctx.bypass();
-                    return;
-                }
-                this.mergedRegion = rmt.stepsBeforePONR(rss, rss, false);
-                rmt.prepareMutationsForMerge(mergedRegion.getRegionInfo(),
-                    indexRegionA.getRegionInfo(), indexRegionB.getRegionInfo(),
-                    rss.getServerName(), metaEntries,
-                    mergedRegion.getTableDesc().getRegionReplication());
-            } catch (Exception e) {
-                ctx.bypass();
-                LOG.warn("index regions merge failed with the exception ", e);
-                if (rmt != null) {
-                    rmt.rollback(rss, rss);
-                    rmt = null;
-                    mergedRegion = null;
-                }
-            }
-        }
-    }
-
-    @Override
-    public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
-            Region regionA, Region regionB, Region mergedRegion) throws IOException {
-        if (rmt != null && this.mergedRegion != null) {
-            RegionServerCoprocessorEnvironment environment = ctx.getEnvironment();
-            HRegionServer rs = (HRegionServer) environment.getRegionServerServices();
-            rmt.stepsAfterPONR(rs, rs, this.mergedRegion);
-        }
-    }
-
-    @Override
-    public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
-            Region regionA, Region regionB) throws IOException {
-        HRegionServer rs = (HRegionServer) ctx.getEnvironment().getRegionServerServices();
-        try {
-            if (rmt != null) {
-                rmt.rollback(rs, rs);
-                rmt = null;
-                mergedRegion = null;
-            }
-        } catch (Exception e) {
-            LOG.error("Error while rolling back the merge failure for index regions", e);
-            rs.abort("Abort; we got an error during rollback of index");
-        }
-    }
-}
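
The deleted LocalIndexMerger holds the in-flight merge in its rmt and mergedRegion fields across coprocessor hooks: preMergeCommit stages the companion index-region merge (bypassing the data-region merge if that fails), postMergeCommit completes it, and preRollBackMerge discards it. A plain-Java sketch of that stateful hook pairing, with hypothetical names rather than the HBase coprocessor API:

    // Whatever the pre-hook stages must be finished or undone by exactly one
    // of the other two hooks; the field carries the state between them.
    public class StagedCompanionWork {
        private StringBuilder staged; // stands in for the in-flight index merge

        public void preCommit(String work) {
            staged = new StringBuilder(work); // stage, but do not finish yet
        }

        public void postCommit() {
            if (staged != null) {
                System.out.println("completing: " + staged);
                staged = null;
            }
        }

        public void preRollback() {
            staged = null; // discard staged work when the host operation fails
        }
    }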

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
index ba158a8..c60058c 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java
@@ -17,145 +17,8 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.PairOfSameType;
-import org.apache.phoenix.hbase.index.util.VersionUtil;
-import org.apache.phoenix.parse.ParseNodeFactory;
-import org.apache.phoenix.schema.types.PBoolean;
-import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.MetaDataUtil;
-import org.apache.phoenix.util.SchemaUtil;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.List;
 
 public class LocalIndexSplitter extends BaseRegionObserver {
-
-    private static final Log LOG = LogFactory.getLog(LocalIndexSplitter.class);
-
-    private SplitTransactionImpl st = null; // FIXME: Uses private type
-    private PairOfSameType<Region> daughterRegions = null;
-    private static final ParseNodeFactory FACTORY = new ParseNodeFactory();
-    private static final int SPLIT_TXN_MINIMUM_SUPPORTED_VERSION = VersionUtil
-            .encodeVersion("0.98.9");
-
-    @Override
-    public void preSplitBeforePONR(ObserverContext<RegionCoprocessorEnvironment> ctx,
-            byte[] splitKey, List<Mutation> metaEntries) throws IOException {
-        RegionCoprocessorEnvironment environment = ctx.getEnvironment();
-        HTableDescriptor tableDesc = ctx.getEnvironment().getRegion().getTableDesc();
-        if (SchemaUtil.isSystemTable(tableDesc.getName())) {
-            return;
-        }
-        final RegionServerServices rss = ctx.getEnvironment().getRegionServerServices();
-        if (tableDesc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) == null
-                || !Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(tableDesc
-                        .getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) {
-            TableName indexTable =
-                    TableName.valueOf(MetaDataUtil.getLocalIndexPhysicalName(tableDesc.getName()));
-            if (!MetaTableAccessor.tableExists(rss.getConnection(), indexTable)) return;
-
-            Region indexRegion = IndexUtil.getIndexRegion(environment);
-            if (indexRegion == null) {
-                LOG.warn("Index region corresponindg to data region " + environment.getRegion()
-                        + " not in the same server. So skipping the split.");
-                ctx.bypass();
-                return;
-            }
-            // FIXME: Uses private type
-            try {
-                int encodedVersion = VersionUtil.encodeVersion(environment.getHBaseVersion());
-                if(encodedVersion >= SPLIT_TXN_MINIMUM_SUPPORTED_VERSION) {
-                    st = new SplitTransactionImpl(indexRegion, splitKey);
-                    st.useZKForAssignment =
-                            environment.getConfiguration().getBoolean("hbase.assignment.usezk",
-                                true);
-                } else {
-                    st = new IndexSplitTransaction(indexRegion, splitKey);
-                }
-
-                if (!st.prepare()) {
-                    LOG.error("Prepare for the table " + indexRegion.getTableDesc().getNameAsString()
-                        + " failed. So returning null. ");
-                    ctx.bypass();
-                    return;
-                }
-                ((HRegion)indexRegion).forceSplit(splitKey);
-                User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
-                  @Override
-                  public Void run() throws Exception {                  
-                    daughterRegions = st.stepsBeforePONR(rss, rss, false);
-                    return null;
-                  }
-                });
-                HRegionInfo copyOfParent = new HRegionInfo(indexRegion.getRegionInfo());
-                copyOfParent.setOffline(true);
-                copyOfParent.setSplit(true);
-                // Put for parent
-                Put putParent = MetaTableAccessor.makePutFromRegionInfo(copyOfParent);
-                MetaTableAccessor.addDaughtersToPut(putParent,
-                        daughterRegions.getFirst().getRegionInfo(),
-                        daughterRegions.getSecond().getRegionInfo());
-                metaEntries.add(putParent);
-                // Puts for daughters
-                Put putA = MetaTableAccessor.makePutFromRegionInfo(
-                        daughterRegions.getFirst().getRegionInfo());
-                Put putB = MetaTableAccessor.makePutFromRegionInfo(
-                        daughterRegions.getSecond().getRegionInfo());
-                st.addLocation(putA, rss.getServerName(), 1);
-                st.addLocation(putB, rss.getServerName(), 1);
-                metaEntries.add(putA);
-                metaEntries.add(putB);
-            } catch (Exception e) {
-                ctx.bypass();
-                LOG.warn("index region splitting failed with the exception ", e);
-                if (st != null){
-                    st.rollback(rss, rss);
-                    st = null;
-                    daughterRegions = null;
-                }
-            }
-        }
-    }
-
-    @Override
-    public void preSplitAfterPONR(ObserverContext<RegionCoprocessorEnvironment> ctx)
-            throws IOException {
-        if (st == null || daughterRegions == null) return;
-        RegionCoprocessorEnvironment environment = ctx.getEnvironment();
-        HRegionServer rs = (HRegionServer) environment.getRegionServerServices();
-        st.stepsAfterPONR(rs, rs, daughterRegions);
-    }
     
-    @Override
-    public void preRollBackSplit(ObserverContext<RegionCoprocessorEnvironment> ctx)
-            throws IOException {
-        RegionCoprocessorEnvironment environment = ctx.getEnvironment();
-        HRegionServer rs = (HRegionServer) environment.getRegionServerServices();
-        try {
-            if (st != null) {
-                st.rollback(rs, rs);
-                st = null;
-                daughterRegions = null;
-            }
-        } catch (Exception e) {
-            if (st != null) {
-                LOG.error("Error while rolling back the split failure for index region", e);
-            }
-            rs.abort("Abort; we got an error during rollback of index");
-        }
-    }
-
 }
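
The removed preSplitBeforePONR chose between SplitTransactionImpl and the now-deleted IndexSplitTransaction by comparing VersionUtil.encodeVersion(environment.getHBaseVersion()) against the encoded minimum "0.98.9". A sketch of that version-gating idiom; this encode is an assumption that mirrors VersionUtil.encodeVersion in spirit, not necessarily bit-for-bit:

    public final class VersionGate {
        // Pack major/minor/patch into one comparable int; 8 bits per
        // component is plenty for HBase-style version strings.
        static int encode(String version) {
            String[] parts = version.split("\\.");
            int major = parts.length > 0 ? Integer.parseInt(parts[0]) : 0;
            int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
            int patch = parts.length > 2 ? Integer.parseInt(parts[2]) : 0;
            return (major << 16) | (minor << 8) | patch;
        }

        public static void main(String[] args) {
            int minSupported = encode("0.98.9");
            System.out.println(encode("0.98.17") >= minSupported); // true
            System.out.println(encode("0.98.6") >= minSupported);  // false
        }
    }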

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
index e032feb..b545156 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
@@ -42,6 +42,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
 import org.apache.phoenix.parse.BindParseNode;
+import org.apache.phoenix.parse.ColumnDef;
 import org.apache.phoenix.parse.ColumnParseNode;
 import org.apache.phoenix.parse.CreateTableStatement;
 import org.apache.phoenix.parse.ParseNode;
@@ -49,6 +50,7 @@ import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.query.DelegateConnectionQueryServices;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PDatum;
@@ -90,6 +92,15 @@ public class CreateTableCompiler {
         String viewStatementToBe = null;
         byte[][] viewColumnConstantsToBe = null;
         BitSet isViewColumnReferencedToBe = null;
+        // Don't allow creating the table if any column family name contains
+        // the local index column family prefix.
+        for(ColumnDef columnDef: create.getColumnDefs()) {
+            if (columnDef.getColumnDefName().getFamilyName() != null && columnDef.getColumnDefName().getFamilyName().contains(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNALLOWED_COLUMN_FAMILY)
+                .build().buildException();
+            }
+        }
+
         if (type == PTableType.VIEW) {
             TableRef tableRef = resolver.getTables().get(0);
             int nColumns = tableRef.getTable().getColumns().size();
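
The new loop in CreateTableCompiler rejects DDL in which any declared column family name contains the reserved local index prefix, since families with that prefix are managed internally as local index storage. A minimal client-side sketch of the effect, assuming the prefix value is "L#" (QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) and a local Phoenix JDBC URL; the table name T1 is illustrative:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;

    // Expected to fail with SQLExceptionCode.UNALLOWED_COLUMN_FAMILY (1090),
    // because the declared family name carries the reserved "L#" prefix.
    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
        conn.createStatement().execute(
                "CREATE TABLE T1 (PK INTEGER PRIMARY KEY, \"L#CF\".V VARCHAR)");
    } catch (SQLException e) {
        assert e.getErrorCode() == 1090;    // see SQLExceptionCode below
    }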

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
index f92738c..079ff5c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
@@ -55,7 +55,7 @@ public class PostLocalIndexDDLCompiler {
         this.tableName = tableName;
     }
 
-	public MutationPlan compile(final PTable index) throws SQLException {
+	public MutationPlan compile(PTable index) throws SQLException {
 		try (final PhoenixStatement statement = new PhoenixStatement(connection)) {
             String query = "SELECT count(*) FROM " + tableName;
             final QueryPlan plan = statement.compileQuery(query);
@@ -64,6 +64,12 @@ public class PostLocalIndexDDLCompiler {
             ImmutableBytesWritable ptr = new ImmutableBytesWritable();
             final PTable dataTable = tableRef.getTable();
             List<PTable> indexes = Lists.newArrayListWithExpectedSize(1);
+            for (PTable indexTable : dataTable.getIndexes()) {
+                if (indexTable.getKey().equals(index.getKey())) {
+                    index = indexTable;
+                    break;
+                }
+            }
             // Only build newly created index.
             indexes.add(index);
             IndexMaintainer.serialize(dataTable, ptr, indexes, plan.getContext().getConnection());
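
The added loop swaps the caller-supplied PTable for the copy registered on the data table, matched by PTableKey, before the IndexMaintainer is serialized. The likely intent (an inference from the surrounding change, not stated in the hunk) is that the argument may be a stale client-side instance, while dataTable.getIndexes() holds the server-resolved metadata, including the local index column family mapping, that the maintainer must encode.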

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
index 8d7d7cf..99a9731 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
@@ -287,14 +287,22 @@ public class ProjectionCompiler {
             String indexColName = IndexUtil.getIndexColumnName(column);
             PColumn indexColumn = null;
             ColumnRef ref = null;
+            String indexColumnFamily = null;
             try {
                 indexColumn = index.getColumn(indexColName);
                 ref = new ColumnRef(tableRef, indexColumn.getPosition());
+                indexColumnFamily = indexColumn.getFamilyName() == null ? null : indexColumn.getFamilyName().getString();
             } catch (ColumnNotFoundException e) {
                 if (index.getIndexType() == IndexType.LOCAL) {
                     try {
                         ref = new LocalIndexDataColumnRef(context, indexColName);
                         indexColumn = ref.getColumn();
+                        indexColumnFamily =
+                                indexColumn.getFamilyName() == null ? null
+                                        : (index.getIndexType() == IndexType.LOCAL ? IndexUtil
+                                                .getLocalIndexColumnFamily(indexColumn
+                                                        .getFamilyName().getString()) : indexColumn
+                                                .getFamilyName().getString());
                     } catch (ColumnFamilyNotFoundException c) {
                         throw e;
                     }
@@ -303,7 +311,7 @@ public class ProjectionCompiler {
                 }
             }
             if (resolveColumn) {
-                ref = context.getResolver().resolveColumn(index.getTableName().getString(), indexColumn.getFamilyName() == null ? null : indexColumn.getFamilyName().getString(), indexColName);
+                ref = context.getResolver().resolveColumn(index.getTableName().getString(), indexColumnFamily, indexColName);
             }
             Expression expression = ref.newColumnExpression();
             projectedExpressions.add(expression);
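
For a local index column resolved through LocalIndexDataColumnRef, the projection must name the shadow family in which the index rows are stored, not the data column family the reference resolves against; IndexUtil.getLocalIndexColumnFamily performs that translation. A minimal sketch of the expected mapping, assuming the "L#" prefix:

    // Data family name -> local index shadow family name (assumed prefix "L#").
    String dataFamily = "CF1";
    String shadowFamily = IndexUtil.getLocalIndexColumnFamily(dataFamily);
    // shadowFamily should be "L#CF1"; resolveColumn() is then called with the
    // shadow family so the expression reads from the index storage.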

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index e2fc2ca..7d60cd5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -32,6 +32,11 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -50,6 +55,7 @@ import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.IndexMetaDataCacheClient;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.iterate.ResultIterator;
@@ -81,11 +87,14 @@ import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnImpl;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.ReadOnlyTableException;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.TypeMismatchException;
 import org.apache.phoenix.schema.tuple.Tuple;
@@ -104,7 +113,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 public class UpsertCompiler {
-    private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixStatement statement, boolean useServerTimestamp) {
+    private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixStatement statement, boolean useServerTimestamp) throws SQLException {
         Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
         byte[][] pkValues = new byte[table.getPKColumns().size()][];
         // If the table uses salting, the first byte is the salting byte, set to an empty array
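
setValues now declares SQLException, and the new imports (RegionLocator, HRegionLocation, IndexMaintainer, IndexType, PTableKey) suggest the compiler computes local index mutations on the client, which requires locating the region that will host each upserted row, since local index row keys are grouped under the hosting region's start key (an inference from this patch's local index layout; the hunk shown only changes the signature). A sketch of the stock HBase lookup involved, with conf (a Configuration) and rowKey assumed in scope:

    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;

    // Find the region hosting rowKey; its start key is the prefix under which
    // the corresponding local index row would be written.
    try (org.apache.hadoop.hbase.client.Connection hconn =
                 ConnectionFactory.createConnection(conf);
         RegionLocator locator = hconn.getRegionLocator(TableName.valueOf("T1"))) {
        HRegionLocation loc = locator.getRegionLocation(rowKey);
        byte[] regionStartKey = loc.getRegionInfo().getStartKey();
    }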

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index fa4343a..39ac6fe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 3b8efc3..2d7c291 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -63,7 +63,6 @@ import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.Closeables;
@@ -129,7 +128,6 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
         byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
         List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
         TupleProjector tupleProjector = null;
-        Region dataRegion = null;
         byte[][] viewConstants = null;
         ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
 
@@ -138,13 +136,12 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
         if (ScanUtil.isLocalIndex(scan) || (j == null && p != null)) {
             if (dataColumns != null) {
                 tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
-                dataRegion = IndexUtil.getDataRegion(c.getEnvironment());
                 viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
             }
             ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
             innerScanner =
                     getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector,
-                            dataRegion, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr);
+                        c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr);
         }
 
         if (j != null) {
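
Because the local index rows now live in shadow column families of the very region being scanned, the separate IndexUtil.getDataRegion(...) lookup is gone and the wrapped scanner simply receives c.getEnvironment().getRegion(). The same simplification appears in ScanRegionObserver and UngroupedAggregateRegionObserver below.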

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 8264101..e77ff8b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -1608,7 +1608,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     }
                     results.add(result);
                 }
-                TableViewFinderResult tableViewFinderResult = new TableViewFinderResult(results);
+                TableViewFinderResult tableViewFinderResult = new TableViewFinderResult(results, table);
                 if (numOfChildViews > 0 && !allViewsInCurrentRegion) {
                     tableViewFinderResult.setAllViewsNotInSingleRegion();
                 }
@@ -3377,13 +3377,21 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
 
         private List<Result> results = Lists.newArrayList();
         private boolean allViewsNotInSingleRegion = false;
+        private PTable table;
 
-        private TableViewFinderResult(List<Result> results) {
+        private TableViewFinderResult(List<Result> results, PTable table) {
             this.results = results;
+            this.table = table;
         }
 
         public boolean hasViews() {
-            return results.size() > 0;
+            int localIndexesCount = 0;
+            for(PTable index : table.getIndexes()) {
+                if(index.getIndexType().equals(IndexType.LOCAL)) {
+                    localIndexesCount++;
+                }
+            }
+            return results.size()-localIndexesCount > 0;
         }
 
         private void setAllViewsNotInSingleRegion() {
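
hasViews() now discounts local indexes when interpreting the child-link scan results, evidently because each local index of the table contributes a row to that scan under the new storage scheme (an inference from the subtraction; the hunk does not say so explicitly). Worked example: a table with two local indexes and no views yields results.size() == 2 and localIndexesCount == 2, so hasViews() returns false and operations that require the absence of views can proceed.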

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 35be54d..48e3704 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -209,7 +209,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
         ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
         if (dataColumns != null) {
             tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
-            dataRegion = IndexUtil.getDataRegion(c.getEnvironment());
+            dataRegion = c.getEnvironment().getRegion();
             byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
             List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
             indexMaintainer = indexMaintainers.get(0);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index f817772..d474665 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -165,6 +165,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             throws IOException {
         s = super.preScannerOpen(e, scan, s);
         if (ScanUtil.isAnalyzeTable(scan)) {
+            if (!ScanUtil.isLocalIndex(scan)) {
+                scan.getFamilyMap().clear();
+            }
             // We are setting the start row and stop row such that it covers the entire region. As part
             // of Phoenix-1263 we are storing the guideposts against the physical table rather than
             // individual tenant specific tables.
@@ -180,6 +183,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         RegionCoprocessorEnvironment env = c.getEnvironment();
         Region region = env.getRegion();
         long ts = scan.getTimeRange().getMax();
+        boolean localIndexScan = ScanUtil.isLocalIndex(scan);
         if (ScanUtil.isAnalyzeTable(scan)) {
             byte[] gp_width_bytes =
                     scan.getAttribute(BaseScannerRegionObserver.GUIDEPOST_WIDTH_BYTES);
@@ -192,7 +196,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             return collectStats(s, statsCollector, region, scan, env.getConfiguration());
         }
         int offsetToBe = 0;
-        if (ScanUtil.isLocalIndex(scan)) {
+        if (localIndexScan) {
             /*
              * For local indexes, we need to set an offset on row key expressions to skip
              * the region start key.
@@ -202,7 +206,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             ScanUtil.setRowKeyOffset(scan, offsetToBe);
         }
         final int offset = offsetToBe;
-
+        
         PTable projectedTable = null;
         PTable writeToTable = null;
         byte[][] values = null;
@@ -238,6 +242,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             projectedTable = deserializeTable(upsertSelectTable);
             selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS));
             values = new byte[projectedTable.getPKColumns().size()][];
+            
         } else {
             byte[] isDeleteAgg = scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG);
             isDelete = isDeleteAgg != null && Bytes.compareTo(PDataType.TRUE_BYTES, isDeleteAgg) == 0;
@@ -248,22 +253,19 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_CF);
         }
         TupleProjector tupleProjector = null;
-        Region dataRegion = null;
         byte[][] viewConstants = null;
         ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
-        boolean localIndexScan = ScanUtil.isLocalIndex(scan);
         final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
         final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
         if ((localIndexScan && !isDelete && !isDescRowKeyOrderUpgrade) || (j == null && p != null)) {
             if (dataColumns != null) {
                 tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
-                dataRegion = IndexUtil.getDataRegion(env);
                 viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
             }
             ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
             theScanner =
                     getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector, 
-                            dataRegion, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr);
+                        region, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr);
         } 
         
         if (j != null)  {
@@ -513,7 +515,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
                             if (!indexMutations.isEmpty() && batchSize > 0 &&
                                     indexMutations.size() % batchSize == 0) {
-                                commitIndexMutations(c, region, indexMutations);
+                                commitBatch(region, indexMutations, null);
+                                indexMutations.clear();
                             }
                         } catch (ConstraintViolationException e) {
                             // Log and ignore in count
@@ -544,7 +547,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         }
 
         if (!indexMutations.isEmpty()) {
-            commitIndexMutations(c, region, indexMutations);
+            commitBatch(region, indexMutations, null);
+            indexMutations.clear();
         }
 
         final boolean hadAny = hasAny;
@@ -579,31 +583,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         return scanner;
     }
 
-    private void commitIndexMutations(final ObserverContext<RegionCoprocessorEnvironment> c,
-            Region region, List<Mutation> indexMutations) throws IOException {
-        // Get indexRegion corresponding to data region
-        Region indexRegion = IndexUtil.getIndexRegion(c.getEnvironment());
-        if (indexRegion != null) {
-            commitBatch(indexRegion, indexMutations, null);
-        } else {
-            TableName indexTable =
-                    TableName.valueOf(MetaDataUtil.getLocalIndexPhysicalName(region.getTableDesc()
-                            .getName()));
-            HTableInterface table = null;
-            try {
-                table = c.getEnvironment().getTable(indexTable);
-                table.batch(indexMutations);
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-                ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionInfo().getRegionNameAsString(),
-                    ie);
-            } finally {
-                if (table != null) table.close();
-             }
-        }
-        indexMutations.clear();
-    }
-
     @Override
     public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
             InternalScanner scanner, final ScanType scanType) throws IOException {
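
The removed commitIndexMutations helper had to locate a separate index region, or fall back to batching through an HTable when the regions were not colocated. With the index rows in the same region, the call sites now invoke commitBatch on the data region directly; note that clearing the accumulated list, previously done at the end of the helper, becomes the caller's job at both call sites. A compact sketch of the resulting flush pattern, with names taken from the patch:

    // Flush accumulated local index mutations into the data region itself;
    // the L# shadow families live in the same region as the data families.
    if (!indexMutations.isEmpty() && batchSize > 0
            && indexMutations.size() % batchSize == 0) {
        commitBatch(region, indexMutations, null);
        indexMutations.clear();    // formerly cleared inside the helper
    }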

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 43965f5..5a8fffa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -23,6 +23,7 @@ import java.util.Map;
 
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.AmbiguousTableException;
@@ -203,6 +204,7 @@ public enum SQLExceptionCode {
     NULLABLE_FIXED_WIDTH_LAST_PK(1023, "42J04", "Cannot add column to table when the last PK column is nullable and fixed width."),
     CANNOT_MODIFY_VIEW_PK(1036, "42J04", "Cannot modify the primary key of a VIEW if last PK column of parent is variable length."),
     BASE_TABLE_COLUMN(1037, "42J04", "Cannot modify columns of base table used by tenant-specific tables."),
+    UNALLOWED_COLUMN_FAMILY(1090, "42J04", "Column family names should not contain the local index column family prefix: "+QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX),
     // Key/value column related errors
     KEY_VALUE_NOT_NULL(1007, "42K01", "A key/value column may not be declared as not null."),
     // View related errors.
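
The new code 1090 shares SQLSTATE 42J04 with the neighboring schema errors and embeds LOCAL_INDEX_COLUMN_FAMILY_PREFIX in its message, so clients see the reserved prefix spelled out. It backs the CreateTableCompiler check added earlier in this patch.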

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionSplitPolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionSplitPolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionSplitPolicy.java
index 8604784..13a3047 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionSplitPolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionSplitPolicy.java
@@ -17,20 +17,58 @@
  */
 package org.apache.phoenix.hbase.index;
 
-import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
+import java.util.List;
+
+import org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.QueryConstants;
 
 /**
- * Split policy for index regions to avoid split from external requests.
+ * Split policy for tables with local indexes that always selects the split key
+ * from the non-local-index column families.
  */
-public class IndexRegionSplitPolicy extends RegionSplitPolicy {
+public class IndexRegionSplitPolicy extends IncreasingToUpperBoundRegionSplitPolicy {
 
     @Override
-    protected boolean shouldSplit() {
+    protected boolean skipStoreFileRangeCheck(String familyName) {
+        if (familyName.startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
+            return true;
+        }
         return false;
     }
 
-    protected boolean skipStoreFileRangeCheck() {
-        return true;
-    }
+    @Override
+    protected byte[] getSplitPoint() {
+        byte[] oldSplitPoint = super.getSplitPoint();
+        if (oldSplitPoint == null) return null;
+        List<Store> stores = region.getStores();
+        byte[] splitPointFromLargestStore = null;
+        long largestStoreSize = 0;
+        boolean isLocalIndexKey = false;
+        for (Store s : stores) {
+            if (s.getFamily().getNameAsString()
+                    .startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
+                byte[] splitPoint = s.getSplitPoint();
+                if (oldSplitPoint != null && splitPoint != null
+                        && Bytes.compareTo(oldSplitPoint, splitPoint) == 0) {
+                    isLocalIndexKey = true;
+                }
+            }
+        }
+        if (!isLocalIndexKey) return oldSplitPoint;
 
+        for (Store s : stores) {
+            if (!s.getFamily().getNameAsString()
+                    .startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
+                byte[] splitPoint = s.getSplitPoint();
+                long storeSize = s.getSize();
+                if (splitPoint != null && largestStoreSize < storeSize) {
+                    splitPointFromLargestStore = splitPoint;
+                    largestStoreSize = storeSize;
+                }
+            }
+        }
+        return splitPointFromLargestStore;
+    }
 }
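
The rewritten policy extends IncreasingToUpperBoundRegionSplitPolicy and refuses a split point that originates from a local index store: local index row keys are grouped under a region-wide prefix, so a midpoint drawn from an L# store would split along index keys rather than data keys (an inference from the local index layout this patch introduces). When the parent policy's choice coincides with an L# store's midpoint, the policy falls back to the midpoint of the largest non-local-index store, or suppresses the split if none is available. The selection order, distilled (a sketch; both helper names are hypothetical):

    // Sketch of the selection order implemented above.
    byte[] candidate = super.getSplitPoint();          // parent policy's pick
    if (candidate == null) return null;                // nothing to split on
    if (matchesLocalIndexStoreMidpoint(candidate)) {   // hypothetical helper
        candidate = midpointOfLargestDataStore();      // hypothetical helper;
    }                                                  // null suppresses split
    return candidate;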

