From: rajeshbabu@apache.org
To: commits@phoenix.apache.org
Date: Thu, 16 Jun 2016 05:27:26 -0000
Subject: [3/5] phoenix git commit: PHOENIX-1734 Local index improvements(Rajeshbabu)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
deleted file mode 100644
index d835ce9..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java
+++ /dev/null
@@ -1,986 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package org.apache.hadoop.hbase.regionserver; - -import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_SPLIT; -import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLIT; -import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_SPLITTING; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.RegionTransition; -import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.executor.EventType; -import org.apache.hadoop.hbase.io.Reference; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.CancelableProgressable; -import org.apache.phoenix.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.HasThread; -import org.apache.hadoop.hbase.util.PairOfSameType; -import org.apache.hadoop.hbase.zookeeper.ZKAssign; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NodeExistsException; -import org.apache.zookeeper.data.Stat; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -@InterfaceAudience.Private -public class IndexSplitTransaction extends SplitTransactionImpl { // FIXME: Extends private type - private static final Log LOG = LogFactory.getLog(IndexSplitTransaction.class); - - /* - * Region to split - */ - private final HRegion parent; - private HRegionInfo hri_a; - private HRegionInfo hri_b; - private long fileSplitTimeout = 30000; - private int znodeVersion = -1; - - /* - * Row to split around - */ - private final byte [] splitrow; - - /** - * Types to add to the transaction journal. - * Each enum is a step in the split transaction. Used to figure how much - * we need to rollback. - */ - enum JournalEntry { - /** - * Set region as in transition, set it into SPLITTING state. - */ - SET_SPLITTING_IN_ZK, - /** - * We created the temporary split data directory. - */ - CREATE_SPLIT_DIR, - /** - * Closed the parent region. - */ - CLOSED_PARENT_REGION, - /** - * The parent has been taken out of the server's online regions list. - */ - OFFLINED_PARENT, - /** - * Started in on creation of the first daughter region. - */ - STARTED_REGION_A_CREATION, - /** - * Started in on the creation of the second daughter region. - */ - STARTED_REGION_B_CREATION, - /** - * Point of no return. - * If we got here, then transaction is not recoverable other than by - * crashing out the regionserver. - */ - PONR - } - - /* - * Journal of how far the split transaction has progressed. 
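- * For illustration, a split that fails after closing the parent but before
- * the point of no return would leave journal = [SET_SPLITTING_IN_ZK,
- * CREATE_SPLIT_DIR, CLOSED_PARENT_REGION]; rollback() then walks that list
- * in reverse and undoes each completed step.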
- */ - private final List journal = new ArrayList(); - - /** - * Constructor - * @param r Region to split - * @param splitrow Row to split around - */ - public IndexSplitTransaction(final Region r, final byte [] splitrow) { - super(r , splitrow); - this.parent = (HRegion)r; - this.splitrow = splitrow; - } - - /** - * Does checks on split inputs. - * @return true if the region is splittable else - * false if it is not (e.g. its already closed, etc.). - */ - @Override - public boolean prepare() { - if (!this.parent.isSplittable()) return false; - // Split key can be null if this region is unsplittable; i.e. has refs. - if (this.splitrow == null) return false; - HRegionInfo hri = this.parent.getRegionInfo(); - parent.prepareToSplit(); - // Check splitrow. - byte [] startKey = hri.getStartKey(); - byte [] endKey = hri.getEndKey(); - if (Bytes.equals(startKey, splitrow) || - !this.parent.getRegionInfo().containsRow(splitrow)) { - LOG.info("Split row is not inside region key range or is equal to " + - "startkey: " + Bytes.toStringBinary(this.splitrow)); - return false; - } - long rid = getDaughterRegionIdTimestamp(hri); - this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid); - this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid); - return true; - } - - /** - * Calculate daughter regionid to use. - * @param hri Parent {@link HRegionInfo} - * @return Daughter region id (timestamp) to use. - */ - private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) { - long rid = EnvironmentEdgeManager.currentTimeMillis(); - // Regionid is timestamp. Can't be less than that of parent else will insert - // at wrong location in hbase:meta (See HBASE-710). - if (rid < hri.getRegionId()) { - LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() + - " but current time here is " + rid); - rid = hri.getRegionId() + 1; - } - return rid; - } - - private static IOException closedByOtherException = new IOException( - "Failed to close region: already closed by another thread"); - - /** - * Prepare the regions and region files. - * @param server Hosting server instance. Can be null when testing (won't try - * and update in zk if a null server) - * @param services Used to online/offline regions. - * @throws IOException If thrown, transaction failed. - * Call {@link #rollback(Server, RegionServerServices)} - * @return Regions created - */ - @Override - /* package */PairOfSameType createDaughters(final Server server, - final RegionServerServices services) throws IOException { - LOG.info("Starting split of region " + this.parent); - if ((server != null && server.isStopped()) || - (services != null && services.isStopping())) { - throw new IOException("Server is stopped or stopping"); - } - assert !this.parent.lock.writeLock().isHeldByCurrentThread(): - "Unsafe to hold write lock while performing RPCs"; - - // Coprocessor callback - if (this.parent.getCoprocessorHost() != null) { - this.parent.getCoprocessorHost().preSplit(); - } - - // Coprocessor callback - if (this.parent.getCoprocessorHost() != null) { - this.parent.getCoprocessorHost().preSplit(this.splitrow); - } - - // If true, no cluster to write meta edits to or to update znodes in. - boolean testing = server == null? true: - server.getConfiguration().getBoolean("hbase.testing.nocluster", false); - this.fileSplitTimeout = testing ? 
this.fileSplitTimeout :
-      server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
-          this.fileSplitTimeout);
-
-    PairOfSameType<Region> daughterRegions = stepsBeforePONR(server, services, testing);
-
-    List<Mutation> metaEntries = new ArrayList<Mutation>();
-    if (this.parent.getCoprocessorHost() != null) {
-      if (this.parent.getCoprocessorHost().
-          preSplitBeforePONR(this.splitrow, metaEntries)) {
-        throw new IOException("Coprocessor bypassing region "
-            + this.parent.getRegionInfo().getRegionNameAsString() + " split.");
-      }
-      try {
-        for (Mutation p : metaEntries) {
-          HRegionInfo.parseRegionName(p.getRow());
-        }
-      } catch (IOException e) {
-        LOG.error("Row key of mutation from coprocessor is not parsable as region name."
-            + " Mutations from coprocessor should only be for the hbase:meta table.");
-        throw e;
-      }
-    }
-
-    // This is the point of no return. Adding subsequent edits to .META. as we
-    // do below when we do the daughter opens adding each to .META. can fail in
-    // various interesting ways the most interesting of which is a timeout
-    // BUT the edits all go through (See HBASE-3872). IF we reach the PONR
-    // then subsequent failures need to crash out this regionserver; the
-    // server shutdown processing should be able to fix-up the incomplete split.
-    // The offlined parent will have the daughters as extra columns. If
-    // we leave the daughter regions in place and do not remove them when we
-    // crash out, then they will have their references to the parent in place
-    // still and the server shutdown fixup of .META. will point to these
-    // regions.
-    // We should add the PONR JournalEntry before offlineParentInMeta, so even if
-    // offlineParentInMeta times out, this will cause the regionserver to exit, and then
-    // the master ServerShutdownHandler will fix the daughters & avoid data loss. (See
-    // HBASE-4562).
-    this.journal.add(JournalEntry.PONR);
-
-    // Edit parent in meta. Offlines parent region and adds splita and splitb
-    // as an atomic update. See HBASE-7721. This update to META will determine
-    // whether the region is split or not in case of failures.
-    // If it is successful, master will roll-forward, if not, master will rollback
-    // and assign the parent region.
-    if (!testing) {
-      if (metaEntries == null || metaEntries.isEmpty()) {
-        MetaTableAccessor.splitRegion(server.getConnection(), parent.getRegionInfo(),
-          daughterRegions.getFirst().getRegionInfo(),
-          daughterRegions.getSecond().getRegionInfo(), server.getServerName(),
-          parent.getTableDesc().getRegionReplication());
-      } else {
-        offlineParentInMetaAndputMetaEntries(server.getConnection(),
-          parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions
-              .getSecond().getRegionInfo(), server.getServerName(), metaEntries,
-          parent.getTableDesc().getRegionReplication());
-      }
-    }
-    return daughterRegions;
-  }
-
-  @Override
-  public PairOfSameType<Region> stepsBeforePONR(final Server server,
-      final RegionServerServices services, boolean testing) throws IOException {
-    // Set ephemeral SPLITTING znode up in zk.
Mocked servers sometimes don't - // have zookeeper so don't do zk stuff if server or zookeeper is null - if (server != null && server.getZooKeeper() != null) { - try { - createNodeSplitting(server.getZooKeeper(), - parent.getRegionInfo(), server.getServerName(), hri_a, hri_b); - } catch (KeeperException e) { - throw new IOException("Failed creating PENDING_SPLIT znode on " + - this.parent.getRegionInfo().getRegionNameAsString(), e); - } - } - this.journal.add(JournalEntry.SET_SPLITTING_IN_ZK); - if (server != null && server.getZooKeeper() != null) { - // After creating the split node, wait for master to transition it - // from PENDING_SPLIT to SPLITTING so that we can move on. We want master - // knows about it and won't transition any region which is splitting. - znodeVersion = getZKNode(server, services); - } - - this.parent.getRegionFileSystem().createSplitsDir(); - this.journal.add(JournalEntry.CREATE_SPLIT_DIR); - - Map> hstoreFilesToSplit = null; - Exception exceptionToThrow = null; - try{ - hstoreFilesToSplit = this.parent.close(false); - } catch (Exception e) { - exceptionToThrow = e; - } - if (exceptionToThrow == null && hstoreFilesToSplit == null) { - // The region was closed by a concurrent thread. We can't continue - // with the split, instead we must just abandon the split. If we - // reopen or split this could cause problems because the region has - // probably already been moved to a different server, or is in the - // process of moving to a different server. - exceptionToThrow = closedByOtherException; - } - if (exceptionToThrow != closedByOtherException) { - this.journal.add(JournalEntry.CLOSED_PARENT_REGION); - } - if (exceptionToThrow != null) { - if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow; - throw new IOException(exceptionToThrow); - } - if (!testing) { - services.removeFromOnlineRegions(this.parent, null); - } - this.journal.add(JournalEntry.OFFLINED_PARENT); - - // TODO: If splitStoreFiles were multithreaded would we complete steps in - // less elapsed time? St.Ack 20100920 - // - // splitStoreFiles creates daughter region dirs under the parent splits dir - // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will - // clean this up. - splitStoreFiles(hstoreFilesToSplit); - - // Log to the journal that we are creating region A, the first daughter - // region. We could fail halfway through. If we do, we could have left - // stuff in fs that needs cleanup -- a storefile or two. Thats why we - // add entry to journal BEFORE rather than AFTER the change. - this.journal.add(JournalEntry.STARTED_REGION_A_CREATION); - Region a = this.parent.createDaughterRegionFromSplits(this.hri_a); - - // Ditto - this.journal.add(JournalEntry.STARTED_REGION_B_CREATION); - Region b = this.parent.createDaughterRegionFromSplits(this.hri_b); - return new PairOfSameType(a, b); - } - - /** - * Perform time consuming opening of the daughter regions. - * @param server Hosting server instance. Can be null when testing (won't try - * and update in zk if a null server) - * @param services Used to online/offline regions. - * @param a first daughter region - * @param a second daughter region - * @throws IOException If thrown, transaction failed. 
- * Call {@link #rollback(Server, RegionServerServices)} - */ - @Override - /* package */void openDaughters(final Server server, - final RegionServerServices services, Region a, Region b) - throws IOException { - boolean stopped = server != null && server.isStopped(); - boolean stopping = services != null && services.isStopping(); - // TODO: Is this check needed here? - if (stopped || stopping) { - LOG.info("Not opening daughters " + - b.getRegionInfo().getRegionNameAsString() + - " and " + - a.getRegionInfo().getRegionNameAsString() + - " because stopping=" + stopping + ", stopped=" + stopped); - } else { - // Open daughters in parallel. - DaughterOpener aOpener = new DaughterOpener(server, (HRegion)a); - DaughterOpener bOpener = new DaughterOpener(server, (HRegion)b); - aOpener.start(); - bOpener.start(); - try { - aOpener.join(); - bOpener.join(); - } catch (InterruptedException e) { - throw (InterruptedIOException)new InterruptedIOException().initCause(e); - } - if (aOpener.getException() != null) { - throw new IOException("Failed " + - aOpener.getName(), aOpener.getException()); - } - if (bOpener.getException() != null) { - throw new IOException("Failed " + - bOpener.getName(), bOpener.getException()); - } - if (services != null) { - try { - // add 2nd daughter first (see HBASE-4335) - services.postOpenDeployTasks(b); - // Should add it to OnlineRegions - services.addToOnlineRegions(b); - services.postOpenDeployTasks(a); - services.addToOnlineRegions(a); - } catch (KeeperException ke) { - throw new IOException(ke); - } - } - } - } - - /** - * Finish off split transaction, transition the zknode - * @param server Hosting server instance. Can be null when testing (won't try - * and update in zk if a null server) - * @param services Used to online/offline regions. - * @param a first daughter region - * @param a second daughter region - * @throws IOException If thrown, transaction failed. - * Call {@link #rollback(Server, RegionServerServices)} - */ - /* package */void transitionZKNode(final Server server, - final RegionServerServices services, Region a, Region b) - throws IOException { - // Tell master about split by updating zk. If we fail, abort. - if (server != null && server.getZooKeeper() != null) { - try { - this.znodeVersion = transitionSplittingNode(server.getZooKeeper(), - parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(), - server.getServerName(), this.znodeVersion, - RS_ZK_REGION_SPLITTING, RS_ZK_REGION_SPLIT); - - int spins = 0; - // Now wait for the master to process the split. We know it's done - // when the znode is deleted. The reason we keep tickling the znode is - // that it's possible for the master to miss an event. 
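For illustration, the do/while loop that follows boils down to this wait-and-tickle pattern (a minimal sketch using the surrounding variables; InterruptedException handling elided):

    // Keep re-asserting SPLIT -> SPLIT so a master that missed the event still
    // sees a znode change; transitionSplittingNode() returns -1 once the
    // master has processed the split and deleted the node.
    int version = this.znodeVersion;
    while (version != -1 && !server.isStopped() && !services.isStopping()) {
        Thread.sleep(100); // brief pause between tickles
        version = transitionSplittingNode(server.getZooKeeper(),
            parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(),
            server.getServerName(), version,
            RS_ZK_REGION_SPLIT, RS_ZK_REGION_SPLIT);
    }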
- do { - if (spins % 10 == 0) { - LOG.debug("Still waiting on the master to process the split for " + - this.parent.getRegionInfo().getEncodedName()); - } - Thread.sleep(100); - // When this returns -1 it means the znode doesn't exist - this.znodeVersion = transitionSplittingNode(server.getZooKeeper(), - parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo(), - server.getServerName(), this.znodeVersion, - RS_ZK_REGION_SPLIT, RS_ZK_REGION_SPLIT); - spins++; - } while (this.znodeVersion != -1 && !server.isStopped() - && !services.isStopping()); - } catch (Exception e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - throw new IOException("Failed telling master about split", e); - } - } - - // Coprocessor callback - if (this.parent.getCoprocessorHost() != null) { - this.parent.getCoprocessorHost().postSplit(a,b); - } - - // Leaving here, the splitdir with its dross will be in place but since the - // split was successful, just leave it; it'll be cleaned when parent is - // deleted and cleaned up. - } - - /** - * Wait for the splitting node to be transitioned from pending_split - * to splitting by master. That's how we are sure master has processed - * the event and is good with us to move on. If we don't get any update, - * we periodically transition the node so that master gets the callback. - * If the node is removed or is not in pending_split state any more, - * we abort the split. - */ - private int getZKNode(final Server server, - final RegionServerServices services) throws IOException { - // Wait for the master to process the pending_split. - try { - int spins = 0; - Stat stat = new Stat(); - ZooKeeperWatcher zkw = server.getZooKeeper(); - ServerName expectedServer = server.getServerName(); - String node = parent.getRegionInfo().getEncodedName(); - while (!(server.isStopped() || services.isStopping())) { - if (spins % 5 == 0) { - LOG.debug("Still waiting for master to process " - + "the pending_split for " + node); - transitionSplittingNode(zkw, parent.getRegionInfo(), - hri_a, hri_b, expectedServer, -1, RS_ZK_REQUEST_REGION_SPLIT, - RS_ZK_REQUEST_REGION_SPLIT); - } - Thread.sleep(100); - spins++; - byte [] data = ZKAssign.getDataNoWatch(zkw, node, stat); - if (data == null) { - throw new IOException("Data is null, splitting node " - + node + " no longer exists"); - } - RegionTransition rt = RegionTransition.parseFrom(data); - EventType et = rt.getEventType(); - if (et == RS_ZK_REGION_SPLITTING) { - ServerName serverName = rt.getServerName(); - if (!serverName.equals(expectedServer)) { - throw new IOException("Splitting node " + node + " is for " - + serverName + ", not us " + expectedServer); - } - byte [] payloadOfSplitting = rt.getPayload(); - List splittingRegions = HRegionInfo.parseDelimitedFrom( - payloadOfSplitting, 0, payloadOfSplitting.length); - assert splittingRegions.size() == 2; - HRegionInfo a = splittingRegions.get(0); - HRegionInfo b = splittingRegions.get(1); - if (!(hri_a.equals(a) && hri_b.equals(b))) { - throw new IOException("Splitting node " + node + " is for " + a + ", " - + b + ", not expected daughters: " + hri_a + ", " + hri_b); - } - // Master has processed it. - return stat.getVersion(); - } - if (et != RS_ZK_REQUEST_REGION_SPLIT) { - throw new IOException("Splitting node " + node - + " moved out of splitting to " + et); - } - } - // Server is stopping/stopped - throw new IOException("Server is " - + (services.isStopping() ? 
"stopping" : "stopped")); - } catch (Exception e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - throw new IOException("Failed getting SPLITTING znode on " - + parent.getRegionInfo().getRegionNameAsString(), e); - } - } - - /** - * Run the transaction. - * @param server Hosting server instance. Can be null when testing (won't try - * and update in zk if a null server) - * @param services Used to online/offline regions. - * @throws IOException If thrown, transaction failed. - * Call {@link #rollback(Server, RegionServerServices)} - * @return Regions created - * @throws IOException - * @see #rollback(Server, RegionServerServices) - */ - @Override - public PairOfSameType execute(final Server server, - final RegionServerServices services) - throws IOException { - PairOfSameType regions = createDaughters(server, services); - if (this.parent.getCoprocessorHost() != null) { - this.parent.getCoprocessorHost().preSplitAfterPONR(); - } - return stepsAfterPONR(server, services, regions); - } - - @Override - public PairOfSameType stepsAfterPONR(final Server server, - final RegionServerServices services, PairOfSameType regions) - throws IOException { - openDaughters(server, services, regions.getFirst(), regions.getSecond()); - transitionZKNode(server, services, regions.getFirst(), regions.getSecond()); - return regions; - } - - private void offlineParentInMetaAndputMetaEntries(Connection conn, - HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB, - ServerName serverName, List metaEntries, int regionReplication) throws IOException { - List mutations = metaEntries; - HRegionInfo copyOfParent = new HRegionInfo(parent); - copyOfParent.setOffline(true); - copyOfParent.setSplit(true); - - //Put for parent - Put putParent = MetaTableAccessor.makePutFromRegionInfo(copyOfParent); - MetaTableAccessor.addDaughtersToPut(putParent, splitA, splitB); - mutations.add(putParent); - - //Puts for daughters - Put putA = MetaTableAccessor.makePutFromRegionInfo(splitA); - Put putB = MetaTableAccessor.makePutFromRegionInfo(splitB); - - addLocation(putA, serverName, 1); //these are new regions, openSeqNum = 1 is fine. - addLocation(putB, serverName, 1); - mutations.add(putA); - mutations.add(putB); - - // Add empty locations for region replicas of daughters so that number of replicas can be - // cached whenever the primary region is looked up from meta - for (int i = 1; i < regionReplication; i++) { - addEmptyLocation(putA, i); - addEmptyLocation(putB, i); - } - - MetaTableAccessor.mutateMetaTable(conn, mutations); - } - - @Override - public Put addLocation(final Put p, final ServerName sn, long openSeqNum) { - p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, - Bytes.toBytes(sn.getHostAndPort())); - p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, - Bytes.toBytes(sn.getStartcode())); - p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER, - Bytes.toBytes(openSeqNum)); - return p; - } - - private static Put addEmptyLocation(final Put p, int replicaId){ - p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getServerColumn(replicaId), null); - p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getStartCodeColumn(replicaId), null); - p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getSeqNumColumn(replicaId), null); - return p; - } - - /* - * Open daughter region in its own thread. - * If we fail, abort this hosting server. 
- */ - class DaughterOpener extends HasThread { - private final Server server; - private final HRegion r; - private Throwable t = null; - - DaughterOpener(final Server s, final HRegion r) { - super((s == null? "null-services": s.getServerName()) + - "-daughterOpener=" + r.getRegionInfo().getEncodedName()); - setDaemon(true); - this.server = s; - this.r = r; - } - - /** - * @return Null if open succeeded else exception that causes us fail open. - * Call it after this thread exits else you may get wrong view on result. - */ - Throwable getException() { - return this.t; - } - - @Override - public void run() { - try { - openDaughterRegion(this.server, r); - } catch (Throwable t) { - this.t = t; - } - } - } - - /** - * Open daughter regions, add them to online list and update meta. - * @param server - * @param daughter - * @throws IOException - * @throws KeeperException - */ - @Override - void openDaughterRegion(final Server server, final HRegion daughter) - throws IOException, KeeperException { - HRegionInfo hri = daughter.getRegionInfo(); - LoggingProgressable reporter = server == null ? null - : new LoggingProgressable(hri, server.getConfiguration().getLong( - "hbase.regionserver.split.daughter.open.log.interval", 10000)); - daughter.openHRegion(reporter); - } - - static class LoggingProgressable implements CancelableProgressable { - private final HRegionInfo hri; - private long lastLog = -1; - private final long interval; - - LoggingProgressable(final HRegionInfo hri, final long interval) { - this.hri = hri; - this.interval = interval; - } - - @Override - public boolean progress() { - long now = System.currentTimeMillis(); - if (now - lastLog > this.interval) { - LOG.info("Opening " + this.hri.getRegionNameAsString()); - this.lastLog = now; - } - return true; - } - } - - private void splitStoreFiles(final Map> hstoreFilesToSplit) - throws IOException { - if (hstoreFilesToSplit == null) { - // Could be null because close didn't succeed -- for now consider it fatal - throw new IOException("Close returned empty list of StoreFiles"); - } - // The following code sets up a thread pool executor with as many slots as - // there's files to split. It then fires up everything, waits for - // completion and finally checks for any exception - int nbFiles = hstoreFilesToSplit.size(); - if (nbFiles == 0) { - // no file needs to be splitted. - return; - } - ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); - builder.setNameFormat("StoreFileSplitter-%1$d"); - ThreadFactory factory = builder.build(); - ThreadPoolExecutor threadPool = - (ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory); - List> futures = new ArrayList>(nbFiles); - - // Split each store file. - for (Map.Entry> entry: hstoreFilesToSplit.entrySet()) { - for (StoreFile sf: entry.getValue()) { - StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf); - futures.add(threadPool.submit(sfs)); - } - } - // Shutdown the pool - threadPool.shutdown(); - - // Wait for all the tasks to finish - try { - boolean stillRunning = !threadPool.awaitTermination( - this.fileSplitTimeout, TimeUnit.MILLISECONDS); - if (stillRunning) { - threadPool.shutdownNow(); - // wait for the thread to shutdown completely. 
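For illustration, the surrounding splitStoreFiles() method fans work out to a fixed-size pool and then drains it (its wait loop continues just below); condensed, the whole flow is the following sketch, where filesByFamily and timeoutMs are stand-in names and checked-exception handling is elided:

    // One StoreFileSplitter task per store file; each task writes the top and
    // bottom Reference files for the two daughter regions.
    ThreadFactory factory =
        new ThreadFactoryBuilder().setNameFormat("StoreFileSplitter-%1$d").build();
    ExecutorService pool = Executors.newFixedThreadPool(nbFiles, factory);
    List<Future<Void>> results = new ArrayList<Future<Void>>(nbFiles);
    for (Map.Entry<byte[], List<StoreFile>> entry : filesByFamily.entrySet()) {
        for (StoreFile sf : entry.getValue()) {
            results.add(pool.submit(new StoreFileSplitter(entry.getKey(), sf)));
        }
    }
    pool.shutdown();                       // accept no new tasks; let submitted ones finish
    if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
        pool.shutdownNow();                // timed out: interrupt stragglers, abort the split
    }
    for (Future<Void> f : results) {
        f.get();                           // rethrows any per-file failure
    }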
- while (!threadPool.isTerminated()) { - Thread.sleep(50); - } - throw new IOException("Took too long to split the" + - " files and create the references, aborting split"); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw (InterruptedIOException)new InterruptedIOException().initCause(e); - } - - // Look for any exception - for (Future future: futures) { - try { - future.get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw (InterruptedIOException)new InterruptedIOException().initCause(e); - } catch (ExecutionException e) { - throw new IOException(e); - } - } - } - - /** - * Utility class used to do the file splitting / reference writing - * in parallel instead of sequentially. - */ - class StoreFileSplitter implements Callable { - private final byte[] family; - private final StoreFile sf; - - /** - * Constructor that takes what it needs to split - * @param family Family that contains the store file - * @param sf which file - */ - public StoreFileSplitter(final byte[] family, final StoreFile sf) { - this.sf = sf; - this.family = family; - } - - @Override - public Void call() throws IOException { - splitStoreFile(family, sf); - return null; - } - } - - private void splitStoreFile(final byte[] family, final StoreFile sf) throws IOException { - HRegionFileSystem fs = this.parent.getRegionFileSystem(); - String familyName = Bytes.toString(family); - splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false, fs); - splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true, fs); - } - - private Path splitStoreFile(HRegionInfo hri, String familyName, StoreFile f, byte[] splitRow, - boolean top, HRegionFileSystem fs) throws IOException { - f.closeReader(true); - Path splitDir = - new Path(fs.getSplitsDir(hri), familyName); - // A reference to the bottom half of the hsf store file. - Reference r = - top ? Reference.createTopReference(splitRow) : Reference - .createBottomReference(splitRow); - // Add the referred-to regions name as a dot separated suffix. - // See REF_NAME_REGEX regex above. The referred-to regions name is - // up in the path of the passed in f -- parentdir is family, - // then the directory above is the region name. - String parentRegionName = this.parent.getRegionInfo().getEncodedName(); - // Write reference with same file id only with the other region name as - // suffix and into the new region location (under same family). - Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName); - return r.write(fs.getFileSystem(), p); - } - - /** - * @param server Hosting server instance (May be null when testing). - * @param services - * @throws IOException If thrown, rollback failed. Take drastic action. - * @return True if we successfully rolled back, false if we got to the point - * of no return and so now need to abort the server to minimize damage. - */ - @Override - @SuppressWarnings("deprecation") - public boolean rollback(final Server server, final RegionServerServices services) - throws IOException { - // Coprocessor callback - if (this.parent.getCoprocessorHost() != null) { - this.parent.getCoprocessorHost().preRollBackSplit(); - } - - boolean result = true; - ListIterator iterator = - this.journal.listIterator(this.journal.size()); - // Iterate in reverse. 
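For illustration, the rollback loop that follows replays the journal newest-to-oldest; schematically, with undo() as a hypothetical stand-in for the switch statement below:

    ListIterator<JournalEntry> it = journal.listIterator(journal.size());
    while (it.hasPrevious()) {
        JournalEntry step = it.previous();
        if (step == JournalEntry.PONR) {
            return false;  // past the PONR, only server abort + master fix-up can repair
        }
        undo(step);        // inverse action for one completed step
    }
    return true;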
- while (iterator.hasPrevious()) { - JournalEntry je = iterator.previous(); - switch(je) { - - case SET_SPLITTING_IN_ZK: - if (server != null && server.getZooKeeper() != null) { - cleanZK(server, this.parent.getRegionInfo()); - } - break; - - case CREATE_SPLIT_DIR: - this.parent.writestate.writesEnabled = true; - this.parent.getRegionFileSystem().cleanupSplitsDir(); - break; - - case CLOSED_PARENT_REGION: - try { - // So, this returns a seqid but if we just closed and then reopened, we - // should be ok. On close, we flushed using sequenceid obtained from - // hosting regionserver so no need to propagate the sequenceid returned - // out of initialize below up into regionserver as we normally do. - // TODO: Verify. - this.parent.initialize(); - } catch (IOException e) { - LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " + - this.parent.getRegionInfo().getRegionNameAsString(), e); - throw new RuntimeException(e); - } - break; - - case STARTED_REGION_A_CREATION: - this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_a); - break; - - case STARTED_REGION_B_CREATION: - this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_b); - break; - - case OFFLINED_PARENT: - if (services != null) services.addToOnlineRegions(this.parent); - break; - - case PONR: - // We got to the point-of-no-return so we need to just abort. Return - // immediately. Do not clean up created daughter regions. They need - // to be in place so we don't delete the parent region mistakenly. - // See HBASE-3872. - return false; - - default: - throw new RuntimeException("Unhandled journal entry: " + je); - } - } - // Coprocessor callback - if (this.parent.getCoprocessorHost() != null) { - this.parent.getCoprocessorHost().postRollBackSplit(); - } - return result; - } - - @Override - HRegionInfo getFirstDaughter() { - return hri_a; - } - - @Override - HRegionInfo getSecondDaughter() { - return hri_b; - } - - private static void cleanZK(final Server server, final HRegionInfo hri) { - try { - // Only delete if its in expected state; could have been hijacked. - if (!ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(), - RS_ZK_REQUEST_REGION_SPLIT, server.getServerName())) { - ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(), - RS_ZK_REGION_SPLITTING, server.getServerName()); - } - } catch (KeeperException.NoNodeException e) { - LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e); - } catch (KeeperException e) { - server.abort("Failed cleanup of " + hri.getRegionNameAsString(), e); - } - } - - /** - * Creates a new ephemeral node in the PENDING_SPLIT state for the specified region. - * Create it ephemeral in case regionserver dies mid-split. - * - *
Does not transition nodes from other states. If a node already exists
- * for this region, a {@link NodeExistsException} will be thrown.
- *
- * @param zkw zk reference
- * @param region region to be created as offline
- * @param serverName server event originates from
- * @throws KeeperException
- * @throws IOException
- */
- public static void createNodeSplitting(final ZooKeeperWatcher zkw, final HRegionInfo region,
-     final ServerName serverName, final HRegionInfo a,
-     final HRegionInfo b) throws KeeperException, IOException {
-   LOG.debug(zkw.prefix("Creating ephemeral node for " +
-     region.getEncodedName() + " in PENDING_SPLIT state"));
-   byte [] payload = HRegionInfo.toDelimitedByteArray(a, b);
-   RegionTransition rt = RegionTransition.createRegionTransition(
-     RS_ZK_REQUEST_REGION_SPLIT, region.getRegionName(), serverName, payload);
-   String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
-   if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) {
-     throw new IOException("Failed create of ephemeral " + node);
-   }
- }
-
- /**
-  * Transitions an existing ephemeral node for the specified region which is
-  * currently in the begin state to be in the end state. Master cleans up the
-  * final SPLIT znode when it reads it (or if we crash, zk will clean it up).
-  *
-  * <p>Does not transition nodes from other states. If for some reason the
-  * node could not be transitioned, the method returns -1. If the transition
-  * is successful, the version of the node after transition is returned.
-  *
-  * <p>This method can fail and return -1 for three different reasons:
-  * <ul>
-  * <li>Node for this region does not exist</li>
-  * <li>Node for this region is not in the begin state</li>
-  * <li>After verifying the begin state, update fails because of wrong version
-  * (this should never actually happen since an RS only does this transition
-  * following a transition to the begin state. If two RS are conflicting, one would
-  * fail the original transition to the begin state and not this transition)</li>
-  * </ul>
-  *
-  * <p>Does not set any watches.
-  *
-  * <p>
This method should only be used by a RegionServer when splitting a region. - * - * @param zkw zk reference - * @param parent region to be transitioned to opened - * @param a Daughter a of split - * @param b Daughter b of split - * @param serverName server event originates from - * @param znodeVersion expected version of data before modification - * @param beginState the expected current state the znode should be - * @param endState the state to be transition to - * @return version of node after transition, -1 if unsuccessful transition - * @throws KeeperException if unexpected zookeeper exception - * @throws IOException - */ - public static int transitionSplittingNode(ZooKeeperWatcher zkw, - HRegionInfo parent, HRegionInfo a, HRegionInfo b, ServerName serverName, - final int znodeVersion, final EventType beginState, - final EventType endState) throws KeeperException, IOException { - byte [] payload = HRegionInfo.toDelimitedByteArray(a, b); - return ZKAssign.transitionNode(zkw, parent, serverName, - beginState, endState, znodeVersion, payload); - } - - public HRegion getParent() { - return this.parent; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java deleted file mode 100644 index e361343..0000000 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionServerObserver;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
-import org.apache.phoenix.schema.types.PBoolean;
-import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.MetaDataUtil;
-import org.apache.phoenix.util.SchemaUtil;
-
-public class LocalIndexMerger extends BaseRegionServerObserver {
-
-    private static final Log LOG = LogFactory.getLog(LocalIndexMerger.class);
-
-    private RegionMergeTransactionImpl rmt = null; // FIXME: Use of private type
-    private HRegion mergedRegion = null; // FIXME: Use of private type
-
-    @Override
-    public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
-            Region regionA, Region regionB, List<Mutation> metaEntries) throws IOException {
-        HTableDescriptor tableDesc = regionA.getTableDesc();
-        if (SchemaUtil.isSystemTable(tableDesc.getName())) {
-            return;
-        }
-        RegionServerServices rss = ctx.getEnvironment().getRegionServerServices();
-        HRegionServer rs = (HRegionServer) rss;
-        if (tableDesc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) == null
-                || !Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(tableDesc
-                        .getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) {
-            TableName indexTable =
-                    TableName.valueOf(MetaDataUtil.getLocalIndexPhysicalName(tableDesc.getName()));
-            if (!MetaTableAccessor.tableExists(rs.getConnection(), indexTable)) return;
-            Region indexRegionA = IndexUtil.getIndexRegion(regionA, ctx.getEnvironment());
-            if (indexRegionA == null) {
-                LOG.warn("Index region corresponding to data region " + regionA
-                        + " not in the same server. So skipping the merge.");
-                ctx.bypass();
-                return;
-            }
-            Region indexRegionB = IndexUtil.getIndexRegion(regionB, ctx.getEnvironment());
-            if (indexRegionB == null) {
-                LOG.warn("Index region corresponding to region " + regionB
-                        + " not in the same server. So skipping the merge.");
-                ctx.bypass();
-                return;
-            }
-            try {
-                rmt = new RegionMergeTransactionImpl(indexRegionA, indexRegionB, false);
-                if (!rmt.prepare(rss)) {
-                    LOG.error("Prepare for the index regions merge [" + indexRegionA + ","
-                            + indexRegionB + "] failed. So returning null. 
"); - ctx.bypass(); - return; - } - this.mergedRegion = rmt.stepsBeforePONR(rss, rss, false); - rmt.prepareMutationsForMerge(mergedRegion.getRegionInfo(), - indexRegionA.getRegionInfo(), indexRegionB.getRegionInfo(), - rss.getServerName(), metaEntries, - mergedRegion.getTableDesc().getRegionReplication()); - } catch (Exception e) { - ctx.bypass(); - LOG.warn("index regions merge failed with the exception ", e); - if (rmt != null) { - rmt.rollback(rss, rss); - rmt = null; - mergedRegion = null; - } - } - } - } - - @Override - public void postMergeCommit(ObserverContext ctx, - Region regionA, Region regionB, Region mergedRegion) throws IOException { - if (rmt != null && this.mergedRegion != null) { - RegionServerCoprocessorEnvironment environment = ctx.getEnvironment(); - HRegionServer rs = (HRegionServer) environment.getRegionServerServices(); - rmt.stepsAfterPONR(rs, rs, this.mergedRegion); - } - } - - @Override - public void preRollBackMerge(ObserverContext ctx, - Region regionA, Region regionB) throws IOException { - HRegionServer rs = (HRegionServer) ctx.getEnvironment().getRegionServerServices(); - try { - if (rmt != null) { - rmt.rollback(rs, rs); - rmt = null; - mergedRegion = null; - } - } catch (Exception e) { - LOG.error("Error while rolling back the merge failure for index regions", e); - rs.abort("Abort; we got an error during rollback of index"); - } - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java index ba158a8..c60058c 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexSplitter.java @@ -17,145 +17,8 @@ */ package org.apache.hadoop.hbase.regionserver; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; -import org.apache.hadoop.hbase.coprocessor.ObserverContext; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.PairOfSameType; -import org.apache.phoenix.hbase.index.util.VersionUtil; -import org.apache.phoenix.parse.ParseNodeFactory; -import org.apache.phoenix.schema.types.PBoolean; -import org.apache.phoenix.util.IndexUtil; -import org.apache.phoenix.util.MetaDataUtil; -import org.apache.phoenix.util.SchemaUtil; - -import java.io.IOException; -import java.security.PrivilegedExceptionAction; -import java.util.List; public class LocalIndexSplitter extends BaseRegionObserver { - - private static final Log LOG = LogFactory.getLog(LocalIndexSplitter.class); - - private SplitTransactionImpl st = null; // FIXME: Uses private type - private PairOfSameType daughterRegions = null; - private static final ParseNodeFactory FACTORY = new ParseNodeFactory(); - private static final int SPLIT_TXN_MINIMUM_SUPPORTED_VERSION = 
VersionUtil - .encodeVersion("0.98.9"); - - @Override - public void preSplitBeforePONR(ObserverContext ctx, - byte[] splitKey, List metaEntries) throws IOException { - RegionCoprocessorEnvironment environment = ctx.getEnvironment(); - HTableDescriptor tableDesc = ctx.getEnvironment().getRegion().getTableDesc(); - if (SchemaUtil.isSystemTable(tableDesc.getName())) { - return; - } - final RegionServerServices rss = ctx.getEnvironment().getRegionServerServices(); - if (tableDesc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) == null - || !Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(tableDesc - .getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) { - TableName indexTable = - TableName.valueOf(MetaDataUtil.getLocalIndexPhysicalName(tableDesc.getName())); - if (!MetaTableAccessor.tableExists(rss.getConnection(), indexTable)) return; - - Region indexRegion = IndexUtil.getIndexRegion(environment); - if (indexRegion == null) { - LOG.warn("Index region corresponindg to data region " + environment.getRegion() - + " not in the same server. So skipping the split."); - ctx.bypass(); - return; - } - // FIXME: Uses private type - try { - int encodedVersion = VersionUtil.encodeVersion(environment.getHBaseVersion()); - if(encodedVersion >= SPLIT_TXN_MINIMUM_SUPPORTED_VERSION) { - st = new SplitTransactionImpl(indexRegion, splitKey); - st.useZKForAssignment = - environment.getConfiguration().getBoolean("hbase.assignment.usezk", - true); - } else { - st = new IndexSplitTransaction(indexRegion, splitKey); - } - - if (!st.prepare()) { - LOG.error("Prepare for the table " + indexRegion.getTableDesc().getNameAsString() - + " failed. So returning null. "); - ctx.bypass(); - return; - } - ((HRegion)indexRegion).forceSplit(splitKey); - User.runAsLoginUser(new PrivilegedExceptionAction() { - @Override - public Void run() throws Exception { - daughterRegions = st.stepsBeforePONR(rss, rss, false); - return null; - } - }); - HRegionInfo copyOfParent = new HRegionInfo(indexRegion.getRegionInfo()); - copyOfParent.setOffline(true); - copyOfParent.setSplit(true); - // Put for parent - Put putParent = MetaTableAccessor.makePutFromRegionInfo(copyOfParent); - MetaTableAccessor.addDaughtersToPut(putParent, - daughterRegions.getFirst().getRegionInfo(), - daughterRegions.getSecond().getRegionInfo()); - metaEntries.add(putParent); - // Puts for daughters - Put putA = MetaTableAccessor.makePutFromRegionInfo( - daughterRegions.getFirst().getRegionInfo()); - Put putB = MetaTableAccessor.makePutFromRegionInfo( - daughterRegions.getSecond().getRegionInfo()); - st.addLocation(putA, rss.getServerName(), 1); - st.addLocation(putB, rss.getServerName(), 1); - metaEntries.add(putA); - metaEntries.add(putB); - } catch (Exception e) { - ctx.bypass(); - LOG.warn("index region splitting failed with the exception ", e); - if (st != null){ - st.rollback(rss, rss); - st = null; - daughterRegions = null; - } - } - } - } - - @Override - public void preSplitAfterPONR(ObserverContext ctx) - throws IOException { - if (st == null || daughterRegions == null) return; - RegionCoprocessorEnvironment environment = ctx.getEnvironment(); - HRegionServer rs = (HRegionServer) environment.getRegionServerServices(); - st.stepsAfterPONR(rs, rs, daughterRegions); - } - @Override - public void preRollBackSplit(ObserverContext ctx) - throws IOException { - RegionCoprocessorEnvironment environment = ctx.getEnvironment(); - HRegionServer rs = (HRegionServer) environment.getRegionServerServices(); - try { - if (st != null) { - st.rollback(rs, rs); - st = 
null;
-                daughterRegions = null;
-            }
-        } catch (Exception e) {
-            if (st != null) {
-                LOG.error("Error while rolling back the split failure for index region", e);
-            }
-            rs.abort("Abort; we got an error during rollback of index");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
index e032feb..b545156 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
@@ -42,6 +42,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
 import org.apache.phoenix.parse.BindParseNode;
+import org.apache.phoenix.parse.ColumnDef;
 import org.apache.phoenix.parse.ColumnParseNode;
 import org.apache.phoenix.parse.CreateTableStatement;
 import org.apache.phoenix.parse.ParseNode;
@@ -49,6 +50,7 @@ import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.query.DelegateConnectionQueryServices;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PDatum;
@@ -90,6 +92,15 @@ public class CreateTableCompiler {
         String viewStatementToBe = null;
         byte[][] viewColumnConstantsToBe = null;
         BitSet isViewColumnReferencedToBe = null;
+        // Disallow creating the table if any column family name contains the
+        // reserved local index column family prefix.
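For illustration, with the guard this hunk adds (its loop continues immediately below), a CREATE TABLE whose column family name collides with the reserved local index namespace fails at compile time. A hypothetical repro, assuming the JDBC url is a placeholder and that the reserved prefix is "L#" per QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX:

    // Hypothetical repro; uses java.sql.Connection, DriverManager, SQLException.
    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
        conn.createStatement().execute(
            "CREATE TABLE T (K VARCHAR PRIMARY KEY, \"L#F1\".V VARCHAR)");
        // not reached: the statement above should throw
    } catch (SQLException e) {
        // expected: SQLExceptionCode.UNALLOWED_COLUMN_FAMILY
    }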
+ for(ColumnDef columnDef: create.getColumnDefs()) { + if(columnDef.getColumnDefName().getFamilyName()!=null && columnDef.getColumnDefName().getFamilyName().contains(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNALLOWED_COLUMN_FAMILY) + .build().buildException(); + } + } + if (type == PTableType.VIEW) { TableRef tableRef = resolver.getTables().get(0); int nColumns = tableRef.getTable().getColumns().size(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java index f92738c..079ff5c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java @@ -55,7 +55,7 @@ public class PostLocalIndexDDLCompiler { this.tableName = tableName; } - public MutationPlan compile(final PTable index) throws SQLException { + public MutationPlan compile(PTable index) throws SQLException { try (final PhoenixStatement statement = new PhoenixStatement(connection)) { String query = "SELECT count(*) FROM " + tableName; final QueryPlan plan = statement.compileQuery(query); @@ -64,6 +64,12 @@ public class PostLocalIndexDDLCompiler { ImmutableBytesWritable ptr = new ImmutableBytesWritable(); final PTable dataTable = tableRef.getTable(); List indexes = Lists.newArrayListWithExpectedSize(1); + for (PTable indexTable : dataTable.getIndexes()) { + if (indexTable.getKey().equals(index.getKey())) { + index = indexTable; + break; + } + } // Only build newly created index. indexes.add(index); IndexMaintainer.serialize(dataTable, ptr, indexes, plan.getContext().getConnection()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java index 8d7d7cf..99a9731 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java @@ -287,14 +287,22 @@ public class ProjectionCompiler { String indexColName = IndexUtil.getIndexColumnName(column); PColumn indexColumn = null; ColumnRef ref = null; + String indexColumnFamily = null; try { indexColumn = index.getColumn(indexColName); ref = new ColumnRef(tableRef, indexColumn.getPosition()); + indexColumnFamily = indexColumn.getFamilyName() == null ? null : indexColumn.getFamilyName().getString(); } catch (ColumnNotFoundException e) { if (index.getIndexType() == IndexType.LOCAL) { try { ref = new LocalIndexDataColumnRef(context, indexColName); indexColumn = ref.getColumn(); + indexColumnFamily = + indexColumn.getFamilyName() == null ? null + : (index.getIndexType() == IndexType.LOCAL ? 
IndexUtil + .getLocalIndexColumnFamily(indexColumn + .getFamilyName().getString()) : indexColumn + .getFamilyName().getString()); } catch (ColumnFamilyNotFoundException c) { throw e; } @@ -303,7 +311,7 @@ public class ProjectionCompiler { } } if (resolveColumn) { - ref = context.getResolver().resolveColumn(index.getTableName().getString(), indexColumn.getFamilyName() == null ? null : indexColumn.getFamilyName().getString(), indexColName); + ref = context.getResolver().resolveColumn(index.getTableName().getString(), indexColumnFamily, indexColName); } Expression expression = ref.newColumnExpression(); projectedExpressions.add(expression); http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index e2fc2ca..7d60cd5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -32,6 +32,11 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.client.HRegionLocator; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; @@ -50,6 +55,7 @@ import org.apache.phoenix.expression.Determinism; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.IndexMetaDataCacheClient; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.iterate.ResultIterator; @@ -81,11 +87,14 @@ import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnImpl; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTable.ViewType; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.ReadOnlyTableException; import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.TypeMismatchException; import org.apache.phoenix.schema.tuple.Tuple; @@ -104,7 +113,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; public class UpsertCompiler { - private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map mutation, PhoenixStatement statement, boolean useServerTimestamp) { + private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map mutation, PhoenixStatement statement, boolean useServerTimestamp) throws SQLException { Map columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length); byte[][] pkValues = new byte[table.getPKColumns().size()][]; // If the table uses salting, the 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index e2fc2ca..7d60cd5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -32,6 +32,11 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -50,6 +55,7 @@ import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.IndexMetaDataCacheClient;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.iterate.ResultIterator;
@@ -81,11 +87,14 @@ import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnImpl;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.ReadOnlyTableException;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.TypeMismatchException;
 import org.apache.phoenix.schema.tuple.Tuple;
@@ -104,7 +113,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 public class UpsertCompiler {
-    private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map mutation, PhoenixStatement statement, boolean useServerTimestamp) {
+    private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map mutation, PhoenixStatement statement, boolean useServerTimestamp) throws SQLException {
         Map columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
         byte[][] pkValues = new byte[table.getPKColumns().size()][];
         // If the table uses salting, the first byte is the salting byte, set to an empty array

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index fa4343a..39ac6fe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 3b8efc3..2d7c291 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -63,7 +63,6 @@ import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.Closeables;
@@ -129,7 +128,6 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
         byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
         List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
         TupleProjector tupleProjector = null;
-        Region dataRegion = null;
         byte[][] viewConstants = null;
         ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
@@ -138,13 +136,12 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
         if (ScanUtil.isLocalIndex(scan) || (j == null && p != null)) {
             if (dataColumns != null) {
                 tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
-                dataRegion = IndexUtil.getDataRegion(c.getEnvironment());
                 viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
             }
             ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
             innerScanner =
                     getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector,
-                        dataRegion, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr);
+                        c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr);
         }
 
         if (j != null) {
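Note: the dataRegion lookup disappears above because local index data is now collocated in
shadow column families of the data region itself, so the observer can simply hand
getWrappedScanner() its own region. The shape of that change, using the HBase 1.x
coprocessor types this patch already imports:

    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.Region;

    final class CollocatedRegionSketch {
        // Before: IndexUtil.getDataRegion(env) located a separate region.
        // After: data and local index share one region, so the observer's
        // own region is the right one to scan.
        static Region regionForLocalIndexScan(RegionCoprocessorEnvironment env) {
            return env.getRegion();
        }
    }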
http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 8264101..e77ff8b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -1608,7 +1608,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             }
             results.add(result);
         }
-        TableViewFinderResult tableViewFinderResult = new TableViewFinderResult(results);
+        TableViewFinderResult tableViewFinderResult = new TableViewFinderResult(results, table);
         if (numOfChildViews > 0 && !allViewsInCurrentRegion) {
             tableViewFinderResult.setAllViewsNotInSingleRegion();
         }
@@ -3377,13 +3377,21 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         private List<Result> results = Lists.newArrayList();
         private boolean allViewsNotInSingleRegion = false;
+        private PTable table;
 
-        private TableViewFinderResult(List<Result> results) {
+        private TableViewFinderResult(List<Result> results, PTable table) {
             this.results = results;
+            this.table = table;
         }
 
         public boolean hasViews() {
-            return results.size() > 0;
+            int localIndexesCount = 0;
+            for(PTable index : table.getIndexes()) {
+                if(index.getIndexType().equals(IndexType.LOCAL)) {
+                    localIndexesCount++;
+                }
+            }
+            return results.size()-localIndexesCount > 0;
         }
 
         private void setAllViewsNotInSingleRegion() {
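Note: hasViews() now discounts the table's LOCAL indexes, which under this design evidently
also surface in the child scan results; only a surplus beyond that count indicates real
child views. The arithmetic restated stand-alone (IndexType below is a stand-in enum, not
the Phoenix one):

    import java.util.List;

    final class ViewCountSketch {
        enum IndexType { LOCAL, GLOBAL }

        // Each LOCAL index is assumed to account for one row in the child
        // scan, so only rows beyond that count mean true child views exist.
        static boolean hasViews(int childScanRows, List<IndexType> indexTypes) {
            int localIndexes = 0;
            for (IndexType t : indexTypes) {
                if (t == IndexType.LOCAL) {
                    localIndexes++;
                }
            }
            return childScanRows - localIndexes > 0;
        }
    }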
http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 35be54d..48e3704 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -209,7 +209,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
             ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
             if (dataColumns != null) {
                 tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
-                dataRegion = IndexUtil.getDataRegion(c.getEnvironment());
+                dataRegion = c.getEnvironment().getRegion();
                 byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
                 List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
                 indexMaintainer = indexMaintainers.get(0);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index f817772..d474665 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -165,6 +165,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             throws IOException {
         s = super.preScannerOpen(e, scan, s);
         if (ScanUtil.isAnalyzeTable(scan)) {
+            if (!ScanUtil.isLocalIndex(scan)) {
+                scan.getFamilyMap().clear();
+            }
             // We are setting the start row and stop row such that it covers the entire region. As part
             // of Phonenix-1263 we are storing the guideposts against the physical table rather than
             // individual tenant specific tables.
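Note: clearing the scan's family map makes an ANALYZE scan visit every store in the region
instead of only the families the client happened to select, while local index scans are left
alone so they stay confined to their shadow families. A minimal restatement against the
HBase 1.x Scan API:

    import org.apache.hadoop.hbase.client.Scan;

    final class AnalyzeScanSketch {
        // An empty family map means "scan all column families", which is
        // what whole-region stats collection needs.
        static void widenForAnalyze(Scan scan, boolean localIndexScan) {
            if (!localIndexScan) {
                scan.getFamilyMap().clear();
            }
        }
    }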
@@ -180,6 +183,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         RegionCoprocessorEnvironment env = c.getEnvironment();
         Region region = env.getRegion();
         long ts = scan.getTimeRange().getMax();
+        boolean localIndexScan = ScanUtil.isLocalIndex(scan);
         if (ScanUtil.isAnalyzeTable(scan)) {
             byte[] gp_width_bytes = scan.getAttribute(BaseScannerRegionObserver.GUIDEPOST_WIDTH_BYTES);
@@ -192,7 +196,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             return collectStats(s, statsCollector, region, scan, env.getConfiguration());
         }
         int offsetToBe = 0;
-        if (ScanUtil.isLocalIndex(scan)) {
+        if (localIndexScan) {
             /*
              * For local indexes, we need to set an offset on row key expressions to skip
              * the region start key.
@@ -202,7 +206,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             ScanUtil.setRowKeyOffset(scan, offsetToBe);
         }
         final int offset = offsetToBe;
-        
+
         PTable projectedTable = null;
         PTable writeToTable = null;
         byte[][] values = null;
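Note: the offset matters because a local index row key is prefixed with its region's start
key, and row key expressions must skip that prefix to see the logical key. A bare-bones
sketch of the arithmetic; it mirrors the intent of the offsetToBe computation and
ScanUtil.setRowKeyOffset, not the exact implementation:

    import org.apache.hadoop.hbase.util.Bytes;

    final class RowKeyOffsetSketch {
        // First regions have an empty start key, so the prefix slot is sized
        // from the end key instead; other regions use their start key length.
        static int offsetFor(byte[] regionStartKey, byte[] regionEndKey) {
            return regionStartKey.length == 0 ? regionEndKey.length : regionStartKey.length;
        }

        // Strip the region-start-key prefix to recover the logical row key.
        static byte[] logicalRowKey(byte[] storedRowKey, int offset) {
            return Bytes.copy(storedRowKey, offset, storedRowKey.length - offset);
        }
    }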
@@ -238,6 +242,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             projectedTable = deserializeTable(upsertSelectTable);
             selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS));
             values = new byte[projectedTable.getPKColumns().size()][];
+
         } else {
             byte[] isDeleteAgg = scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG);
             isDelete = isDeleteAgg != null && Bytes.compareTo(PDataType.TRUE_BYTES, isDeleteAgg) == 0;
@@ -248,22 +253,19 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_CF);
         }
         TupleProjector tupleProjector = null;
-        Region dataRegion = null;
         byte[][] viewConstants = null;
         ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
-        boolean localIndexScan = ScanUtil.isLocalIndex(scan);
         final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
         final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
         if ((localIndexScan && !isDelete && !isDescRowKeyOrderUpgrade) || (j == null && p != null)) {
             if (dataColumns != null) {
                 tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
-                dataRegion = IndexUtil.getDataRegion(env);
                 viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
             }
             ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
             theScanner =
                 getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector,
-                    dataRegion, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr);
+                    region, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr);
         }
 
         if (j != null) {
@@ -513,7 +515,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     // Commit in batches based on UPSERT_BATCH_SIZE_ATTRIB in config
                                     if (!indexMutations.isEmpty() && batchSize > 0 &&
                                             indexMutations.size() % batchSize == 0) {
-                                        commitIndexMutations(c, region, indexMutations);
+                                        commitBatch(region, indexMutations, null);
+                                        indexMutations.clear();
                                     }
                                 } catch (ConstraintViolationException e) {
                                     // Log and ignore in count
@@ -544,7 +547,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             }
 
             if (!indexMutations.isEmpty()) {
-                commitIndexMutations(c, region, indexMutations);
+                commitBatch(region,indexMutations, null);
+                indexMutations.clear();
             }
 
             final boolean hadAny = hasAny;
@@ -579,31 +583,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         return scanner;
     }
 
-    private void commitIndexMutations(final ObserverContext<RegionCoprocessorEnvironment> c,
-            Region region, List<Mutation> indexMutations) throws IOException {
-        // Get indexRegion corresponding to data region
-        Region indexRegion = IndexUtil.getIndexRegion(c.getEnvironment());
-        if (indexRegion != null) {
-            commitBatch(indexRegion, indexMutations, null);
-        } else {
-            TableName indexTable =
-                    TableName.valueOf(MetaDataUtil.getLocalIndexPhysicalName(region.getTableDesc()
-                            .getName()));
-            HTableInterface table = null;
-            try {
-                table = c.getEnvironment().getTable(indexTable);
-                table.batch(indexMutations);
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-                ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionInfo().getRegionNameAsString(),
-                    ie);
-            } finally {
-                if (table != null) table.close();
-            }
-        }
-        indexMutations.clear();
-    }
-
     @Override
     public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
             InternalScanner scanner, final ScanType scanType) throws IOException {
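Note: commitIndexMutations() could be deleted because there is no longer a separate index
region or physical index table to route writes to; local index mutations go straight to the
hosting region through the existing commitBatch() helper, in batches. A generic restatement
of that buffer-and-flush pattern (Committer stands in for the observer's helper, not a new
API):

    import java.util.ArrayList;
    import java.util.List;

    final class BatchCommitSketch {
        interface Committer<M> { void commitBatch(List<M> batch); }

        // Flush whenever the buffer reaches a multiple of batchSize, then
        // clear it, as the two call sites above now do inline.
        static <M> void addAndMaybeFlush(List<M> buffer, M mutation, int batchSize,
                Committer<M> committer) {
            buffer.add(mutation);
            if (batchSize > 0 && buffer.size() % batchSize == 0) {
                committer.commitBatch(new ArrayList<M>(buffer));
                buffer.clear();
            }
        }
    }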
http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 43965f5..5a8fffa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -23,6 +23,7 @@ import java.util.Map;
 
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.AmbiguousTableException;
@@ -203,6 +204,7 @@ public enum SQLExceptionCode {
     NULLABLE_FIXED_WIDTH_LAST_PK(1023, "42J04", "Cannot add column to table when the last PK column is nullable and fixed width."),
     CANNOT_MODIFY_VIEW_PK(1036, "42J04", "Cannot modify the primary key of a VIEW if last PK column of parent is variable length."),
     BASE_TABLE_COLUMN(1037, "42J04", "Cannot modify columns of base table used by tenant-specific tables."),
+    UNALLOWED_COLUMN_FAMILY(1090, "42J04", "Column family names should not contain local index column prefix: "+QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX),
 
     // Key/value column related errors
     KEY_VALUE_NOT_NULL(1007, "42K01", "A key/value column may not be declared as not null."),
 
     // View related errors.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/01de21dc/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionSplitPolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionSplitPolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionSplitPolicy.java
index 8604784..13a3047 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionSplitPolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionSplitPolicy.java
@@ -17,20 +17,58 @@
  */
 package org.apache.phoenix.hbase.index;
 
-import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
+import java.util.List;
+
+import org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.QueryConstants;
 
 /**
- * Split policy for index regions to avoid split from external requests.
+ * Split policy for local indexed tables to select split key from non local index column families
+ * always.
  */
-public class IndexRegionSplitPolicy extends RegionSplitPolicy {
+public class IndexRegionSplitPolicy extends IncreasingToUpperBoundRegionSplitPolicy {
 
     @Override
-    protected boolean shouldSplit() {
+    protected boolean skipStoreFileRangeCheck(String familyName) {
+        if (familyName.startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
+            return true;
+        }
         return false;
     }
 
-    protected boolean skipStoreFileRangeCheck() {
-        return true;
-    }
+    @Override
+    protected byte[] getSplitPoint() {
+        byte[] oldSplitPoint = super.getSplitPoint();
+        if (oldSplitPoint == null) return null;
+        List<Store> stores = region.getStores();
+        byte[] splitPointFromLargestStore = null;
+        long largestStoreSize = 0;
+        boolean isLocalIndexKey = false;
+        for (Store s : stores) {
+            if (s.getFamily().getNameAsString()
+                    .startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
+                byte[] splitPoint = s.getSplitPoint();
+                if (oldSplitPoint != null && splitPoint != null
+                        && Bytes.compareTo(oldSplitPoint, splitPoint) == 0) {
+                    isLocalIndexKey = true;
+                }
+            }
+        }
+        if (!isLocalIndexKey) return oldSplitPoint;
+        for (Store s : stores) {
+            if (!s.getFamily().getNameAsString()
+                    .startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
+                byte[] splitPoint = s.getSplitPoint();
+                long storeSize = s.getSize();
+                if (splitPoint != null && largestStoreSize < storeSize) {
+                    splitPointFromLargestStore = splitPoint;
+                    largestStoreSize = storeSize;
+                }
+            }
+        }
+        return splitPointFromLargestStore;
+    }
 }
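Note: the rewritten policy keeps splits anchored on data row keys. If the split point
inherited from IncreasingToUpperBoundRegionSplitPolicy would come from a local index store
(whose keys are index-ordered, not data-ordered), it re-picks from the largest data-family
store. A simplified, self-contained version of that selection, with plain maps standing in
for HBase Store objects and "L#" assumed as the reserved prefix:

    import java.util.Arrays;
    import java.util.Map;

    final class SplitPointSketch {
        private static final String LOCAL_INDEX_CF_PREFIX = "L#"; // assumed prefix

        static byte[] choose(byte[] defaultPoint,
                             Map<String, byte[]> splitPointByFamily,
                             Map<String, Long> sizeByFamily) {
            if (defaultPoint == null) return null;
            // Did the default split point come from a local index store?
            boolean fromLocalIndex = false;
            for (Map.Entry<String, byte[]> e : splitPointByFamily.entrySet()) {
                if (e.getKey().startsWith(LOCAL_INDEX_CF_PREFIX)
                        && Arrays.equals(defaultPoint, e.getValue())) {
                    fromLocalIndex = true;
                }
            }
            if (!fromLocalIndex) return defaultPoint;
            // Otherwise take the split point of the largest data-family store.
            byte[] best = null;
            long bestSize = 0;
            for (Map.Entry<String, byte[]> e : splitPointByFamily.entrySet()) {
                if (e.getKey().startsWith(LOCAL_INDEX_CF_PREFIX) || e.getValue() == null) continue;
                Long size = sizeByFamily.get(e.getKey());
                if (size != null && size > bestSize) {
                    best = e.getValue();
                    bestSize = size;
                }
            }
            return best;
        }
    }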