Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id ADD5217A84 for ; Tue, 31 Mar 2015 01:40:23 +0000 (UTC) Received: (qmail 20815 invoked by uid 500); 31 Mar 2015 01:40:21 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 20712 invoked by uid 500); 31 Mar 2015 01:40:21 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 19901 invoked by uid 99); 31 Mar 2015 01:40:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 31 Mar 2015 01:40:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 876FEE2F0A; Tue, 31 Mar 2015 01:40:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: apurtell@apache.org To: commits@hbase.apache.org Date: Tue, 31 Mar 2015 01:40:34 -0000 Message-Id: <5998237a5efd41489b0940d28f75413d@git.apache.org> In-Reply-To: <8aef9f7247ab4f26bb76663eae8e2f38@git.apache.org> References: <8aef9f7247ab4f26bb76663eae8e2f38@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [15/16] hbase git commit: HBASE-12972 Region, a supportable public/evolving subset of HRegion http://git-wip-us.apache.org/repos/asf/hbase/blob/af171593/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 1943c3a..c31a39b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -145,6 +145,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState; import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; @@ -192,44 +193,8 @@ import com.google.protobuf.RpcController; import com.google.protobuf.Service; import com.google.protobuf.TextFormat; -/** - * HRegion stores data for a certain region of a table. It stores all columns - * for each row. A given table consists of one or more HRegions. - * - *

<p>We maintain multiple HStores for a single HRegion.
- *
- * <p>An Store is a set of rows with some column data; together,
- * they make up all the data for the rows.
- *
- * <p>Each HRegion has a 'startKey' and 'endKey'.
- * <p>The first is inclusive, the second is exclusive (except for
- * the final region) The endKey of region 0 is the same as
- * startKey for region 1 (if it exists). The startKey for the
- * first region is null. The endKey for the final region is null.
- *
- * <p>Locking at the HRegion level serves only one purpose: preventing the
- * region from being closed (and consequently split) while other operations
- * are ongoing. Each row level operation obtains both a row lock and a region
- * read lock for the duration of the operation. While a scanner is being
- * constructed, getScanner holds a read lock. If the scanner is successfully
- * constructed, it holds a read lock until it is closed. A close takes out a
- * write lock and consequently will block for ongoing operations and will block
- * new operations from starting while the close is in progress.
- *
- * <p>An HRegion is defined by its table and its key extent.
- *
- * <p>It consists of at least one Store. The number of Stores should be
- * configurable, so that data which is accessed together is stored in the same
- * Store. Right now, we approximate that by building a single Store for
- * each column family. (This config info will be communicated via the
- * tabledesc.)
- *
- * <p>
The HTableDescriptor contains metainfo about the HRegion's table. - * regionName is a unique identifier for this HRegion. (startKey, endKey] - * defines the keyspace for this HRegion. - */ @InterfaceAudience.Private -public class HRegion implements HeapSize, PropagatingConfigurationObserver { // , Writable{ +public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region { public static final Log LOG = LogFactory.getLog(HRegion.class); public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY = @@ -281,17 +246,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // protected volatile long lastReplayedOpenRegionSeqId = -1L; protected volatile long lastReplayedCompactionSeqId = -1L; - /** - * Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for - * startRegionOperation to possibly invoke different checks before any region operations. Not all - * operations have to be defined here. It's only needed when a special check is need in - * startRegionOperation - */ - public enum Operation { - ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE, - REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT - } - ////////////////////////////////////////////////////////////////////////////// // Members ////////////////////////////////////////////////////////////////////////////// @@ -327,19 +281,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // Number of requests blocked by memstore size. private final Counter blockedRequestsCount = new Counter(); - /** - * @return the number of blocked requests count. - */ - public long getBlockedRequestsCount() { - return this.blockedRequestsCount.get(); - } - // Compaction counters final AtomicLong compactionsFinished = new AtomicLong(0L); final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L); final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L); - private final WAL wal; private final HRegionFileSystem fs; protected final Configuration conf; @@ -428,6 +374,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } return minimumReadPoint; } + /* * Data structure of write state flags used coordinating flushes, * compactions and closes. @@ -479,17 +426,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * be specified if the flush was successful, and the failure message should only be specified * if it didn't flush. */ - public static class FlushResult { - enum Result { - FLUSHED_NO_COMPACTION_NEEDED, - FLUSHED_COMPACTION_NEEDED, - // Special case where a flush didn't run because there's nothing in the memstores. Used when - // bulk loading to know when we can still load even if a flush didn't happen. - CANNOT_FLUSH_MEMSTORE_EMPTY, - CANNOT_FLUSH - // Be careful adding more to this enum, look at the below methods to make sure - } - + public static class FlushResultImpl implements FlushResult { final Result result; final String failureReason; final long flushSequenceId; @@ -502,7 +439,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * @param flushSequenceId Generated sequence id that comes right after the edits in the * memstores. 
*/ - FlushResult(Result result, long flushSequenceId) { + FlushResultImpl(Result result, long flushSequenceId) { this(result, flushSequenceId, null, false); assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result .FLUSHED_COMPACTION_NEEDED; @@ -513,7 +450,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * @param result Expecting CANNOT_FLUSH_MEMSTORE_EMPTY or CANNOT_FLUSH. * @param failureReason Reason why we couldn't flush. */ - FlushResult(Result result, String failureReason, boolean wroteFlushMarker) { + FlushResultImpl(Result result, String failureReason, boolean wroteFlushMarker) { this(result, -1, failureReason, wroteFlushMarker); assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH; } @@ -524,7 +461,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * @param flushSequenceId Generated sequence id if the memstores were flushed else -1. * @param failureReason Reason why we couldn't flush, or null. */ - FlushResult(Result result, long flushSequenceId, String failureReason, + FlushResultImpl(Result result, long flushSequenceId, String failureReason, boolean wroteFlushMarker) { this.result = result; this.flushSequenceId = flushSequenceId; @@ -557,6 +494,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // .append("failureReason:").append(failureReason).append(",") .append("flush seq id").append(flushSequenceId).toString(); } + + @Override + public Result getResult() { + return result; + } } /** A result object from prepare flush cache stage */ @@ -597,6 +539,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // this.flushedSeqId = flushedSeqId; this.totalFlushableSize = totalFlushableSize; } + + public FlushResult getResult() { + return this.result; + } } final WriteState writestate = new WriteState(); @@ -760,7 +706,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this); this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper); - Map recoveringRegions = rsServices.getRecoveringRegions(); + Map recoveringRegions = rsServices.getRecoveringRegions(); String encodedName = getRegionInfo().getEncodedName(); if (recoveringRegions != null && recoveringRegions.containsKey(encodedName)) { this.isRecovering = true; @@ -832,8 +778,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // nextSeqid will be -1 if the initialization fails. // At least it will be 0 otherwise. 
if (nextSeqId == -1) { - status - .abort("Exception during region " + this.getRegionNameAsString() + " initialization."); + status.abort("Exception during region " + getRegionInfo().getRegionNameAsString() + + " initialization."); } } } @@ -946,7 +892,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) { Future future = completionService.take(); HStore store = future.get(); - this.stores.put(store.getColumnFamilyName().getBytes(), store); + this.stores.put(store.getFamily().getName(), store); long storeMaxSequenceId = store.getMaxSequenceId(); maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), @@ -998,15 +944,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } private void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException { - Map> storeFiles - = new TreeMap>(Bytes.BYTES_COMPARATOR); - for (Map.Entry entry : getStores().entrySet()) { - Store store = entry.getValue(); + Map> storeFiles = new TreeMap>(Bytes.BYTES_COMPARATOR); + for (Store store: getStores()) { ArrayList storeFileNames = new ArrayList(); for (StoreFile storeFile: store.getStorefiles()) { storeFileNames.add(storeFile.getPath()); } - storeFiles.put(entry.getKey(), storeFileNames); + storeFiles.put(store.getFamily().getName(), storeFileNames); } RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor( @@ -1017,15 +961,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } private void writeRegionCloseMarker(WAL wal) throws IOException { - Map> storeFiles - = new TreeMap>(Bytes.BYTES_COMPARATOR); - for (Map.Entry entry : getStores().entrySet()) { - Store store = entry.getValue(); + Map> storeFiles = new TreeMap>(Bytes.BYTES_COMPARATOR); + for (Store store: getStores()) { ArrayList storeFileNames = new ArrayList(); for (StoreFile storeFile: store.getStorefiles()) { storeFileNames.add(storeFile.getPath()); } - storeFiles.put(entry.getKey(), storeFileNames); + storeFiles.put(store.getFamily().getName(), storeFileNames); } RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor( @@ -1053,11 +995,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return false; } - /** - * This function will return the HDFS blocks distribution based on the data - * captured when HFile is created - * @return The HDFS blocks distribution for the region. 
- */ + @Override public HDFSBlocksDistribution getHDFSBlocksDistribution() { HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); @@ -1114,10 +1052,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return hdfsBlocksDistribution; } - public AtomicLong getMemstoreSize() { - return memstoreSize; - } - /** * Increase the size of mem store in this region and the size of global mem * store @@ -1130,7 +1064,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return this.memstoreSize.addAndGet(memStoreSize); } - /** @return a HRegionInfo object for this region */ + @Override public HRegionInfo getRegionInfo() { return this.fs.getRegionInfo(); } @@ -1143,32 +1077,76 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return this.rsServices; } - /** @return readRequestsCount for this region */ - long getReadRequestsCount() { - return this.readRequestsCount.get(); + @Override + public long getReadRequestsCount() { + return readRequestsCount.get(); + } + + @Override + public void updateReadRequestsCount(long i) { + readRequestsCount.add(i); + } + + @Override + public long getWriteRequestsCount() { + return writeRequestsCount.get(); + } + + @Override + public void updateWriteRequestsCount(long i) { + writeRequestsCount.add(i); + } + + @Override + public long getMemstoreSize() { + return memstoreSize.get(); + } + + @Override + public long getNumMutationsWithoutWAL() { + return numMutationsWithoutWAL.get(); + } + + @Override + public long getDataInMemoryWithoutWAL() { + return dataInMemoryWithoutWAL.get(); + } + + @Override + public long getBlockedRequestsCount() { + return blockedRequestsCount.get(); + } + + @Override + public long getCheckAndMutateChecksPassed() { + return checkAndMutateChecksPassed.get(); } - /** @return writeRequestsCount for this region */ - long getWriteRequestsCount() { - return this.writeRequestsCount.get(); + @Override + public long getCheckAndMutateChecksFailed() { + return checkAndMutateChecksFailed.get(); } + @Override public MetricsRegion getMetrics() { return metricsRegion; } - /** @return true if region is closed */ + @Override public boolean isClosed() { return this.closed.get(); } - /** - * @return True if closing process has started. 
- */ + @Override public boolean isClosing() { return this.closing.get(); } + @Override + public boolean isReadOnly() { + return this.writestate.isReadOnly(); + } + /** * Reset recovering state of current region */ @@ -1221,14 +1199,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } } - /** - * @return True if current region is in recovering - */ + @Override public boolean isRecovering() { return this.isRecovering; } - /** @return true if region is available (not closed and not closing) */ + @Override public boolean isAvailable() { return !isClosed() && !isClosing(); } @@ -1243,12 +1219,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // */ public boolean isMergeable() { if (!isAvailable()) { - LOG.debug("Region " + this.getRegionNameAsString() + LOG.debug("Region " + getRegionInfo().getRegionNameAsString() + " is not mergeable because it is closing or closed"); return false; } if (hasReferences()) { - LOG.debug("Region " + this.getRegionNameAsString() + LOG.debug("Region " + getRegionInfo().getRegionNameAsString() + " is not mergeable because it has references"); return false; } @@ -1266,9 +1242,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return mvcc; } - /* - * Returns readpoint considering given IsolationLevel - */ + @Override + public long getMaxFlushedSeqId() { + return maxFlushedSeqId; + } + + @Override public long getReadpoint(IsolationLevel isolationLevel) { if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) { // This scan can read even uncommitted transactions @@ -1277,6 +1256,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return mvcc.memstoreReadPoint(); } + @Override public boolean isLoadingCfsOnDemandDefault() { return this.isLoadingCfsOnDemandDefault; } @@ -1375,7 +1355,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // the close flag? if (!abort && worthPreFlushing() && canFlush) { status.setStatus("Pre-flushing region before close"); - LOG.info("Running close preflush of " + this.getRegionNameAsString()); + LOG.info("Running close preflush of " + getRegionInfo().getRegionNameAsString()); try { internalFlushcache(status); } catch (IOException ioe) { @@ -1398,7 +1378,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // Don't flush the cache if we are aborting if (!abort && canFlush) { int flushCount = 0; - while (this.getMemstoreSize().get() > 0) { + while (this.memstoreSize.get() > 0) { try { if (flushCount++ > 0) { int actualFlushes = flushCount - 1; @@ -1406,7 +1386,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // If we tried 5 times and are unable to clear memory, abort // so we do not lose data throw new DroppedSnapshotException("Failed clearing memory after " + - actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName())); + actualFlushes + " attempts on region: " + + Bytes.toStringBinary(getRegionInfo().getRegionName())); } LOG.info("Running extra flush, " + actualFlushes + " (carrying snapshot?) " + this); @@ -1428,7 +1409,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // if (!stores.isEmpty()) { // initialize the thread pool for closing stores in parallel. 
ThreadPoolExecutor storeCloserThreadPool = - getStoreOpenAndCloseThreadPool("StoreCloserThread-" + this.getRegionNameAsString()); + getStoreOpenAndCloseThreadPool("StoreCloserThread-" + + getRegionInfo().getRegionNameAsString()); CompletionService>> completionService = new ExecutorCompletionService>>(storeCloserThreadPool); @@ -1493,11 +1475,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } } - /** - * Wait for all current flushes and compactions of the region to complete. - *

- * Exposed for TESTING. - */ + @Override public void waitForFlushesAndCompactions() { synchronized (writestate) { if (this.writestate.readOnly) { @@ -1570,32 +1548,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // HRegion accessors ////////////////////////////////////////////////////////////////////////////// - /** @return start key for region */ - public byte [] getStartKey() { - return this.getRegionInfo().getStartKey(); - } - - /** @return end key for region */ - public byte [] getEndKey() { - return this.getRegionInfo().getEndKey(); - } - - /** @return region id */ - public long getRegionId() { - return this.getRegionInfo().getRegionId(); - } - - /** @return region name */ - public byte [] getRegionName() { - return this.getRegionInfo().getRegionName(); - } - - /** @return region name as string for logging */ - public String getRegionNameAsString() { - return this.getRegionInfo().getRegionNameAsString(); - } - - /** @return HTableDescriptor for this region */ + @Override public HTableDescriptor getTableDesc() { return this.htableDescriptor; } @@ -1626,25 +1579,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return this.fs; } - /** - * @return Returns the earliest time a store in the region was flushed. All - * other stores in the region would have been flushed either at, or - * after this time. - */ - @VisibleForTesting + @Override public long getEarliestFlushTimeForAllStores() { return lastStoreFlushTimeMap.isEmpty() ? Long.MAX_VALUE : Collections.min(lastStoreFlushTimeMap .values()); } - /** - * This can be used to determine the last time all files of this region were major compacted. - * @param majorCompactioOnly Only consider HFile that are the result of major compaction - * @return the timestamp of the oldest HFile for all stores of this region - */ + @Override public long getOldestHfileTs(boolean majorCompactioOnly) throws IOException { long result = Long.MAX_VALUE; - for (Store store : getStores().values()) { + for (Store store : getStores()) { for (StoreFile file : store.getStorefiles()) { HFile.Reader reader = file.getReader().getHFileReader(); if (majorCompactioOnly) { @@ -1709,25 +1653,31 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // protected void doRegionCompactionPrep() throws IOException { } - void triggerMajorCompaction() { - for (Store h : stores.values()) { - h.triggerMajorCompaction(); + @Override + public void triggerMajorCompaction() throws IOException { + for (Store s : getStores()) { + s.triggerMajorCompaction(); } } - /** - * This is a helper function that compact all the stores synchronously - * It is used by utilities and testing - * - * @param majorCompaction True to force a major compaction regardless of thresholds - * @throws IOException e - */ - public void compactStores(final boolean majorCompaction) - throws IOException { + @Override + public void compact(final boolean majorCompaction) throws IOException { if (majorCompaction) { - this.triggerMajorCompaction(); + triggerMajorCompaction(); + } + for (Store s : getStores()) { + CompactionContext compaction = s.requestCompaction(); + if (compaction != null) { + CompactionThroughputController controller = null; + if (rsServices != null) { + controller = CompactionThroughputControllerFactory.create(rsServices, conf); + } + if (controller == null) { + controller = NoLimitCompactionThroughputController.INSTANCE; + } + compact(compaction, s, controller); + } } - compactStores(); } /** @@ -1737,7 
+1687,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * @throws IOException e */ public void compactStores() throws IOException { - for (Store s : getStores().values()) { + for (Store s : getStores()) { CompactionContext compaction = s.requestCompaction(); if (compaction != null) { compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE); @@ -1854,43 +1804,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } } - /** - * Flush all stores. - *

- * See {@link #flushcache(boolean)}.
- *
- * @return whether the flush is success and whether the region needs compacting
- * @throws IOException
- */
- public FlushResult flushcache() throws IOException {
-   return flushcache(true, false);
- }
-
- /**
- * Flush the cache.
- *
- * When this method is called the cache will be flushed unless:
- * <ol>
- *   <li>the cache is empty</li>
- *   <li>the region is closed.</li>
- *   <li>a flush is already in progress</li>
- *   <li>writes are disabled</li>
- * </ol>
- *
- * <p>
This method may block for some time, so it should not be called from a - * time-sensitive thread. - * @param forceFlushAllStores whether we want to flush all stores - * @return whether the flush is success and whether the region needs compacting - * - * @throws IOException general io exceptions - * @throws DroppedSnapshotException Thrown when replay of wal is required - * because a Snapshot was not properly persisted. - */ - public FlushResult flushcache(boolean forceFlushAllStores) throws IOException { - return flushcache(forceFlushAllStores, false); + @Override + public FlushResult flush(boolean force) throws IOException { + return flushcache(force, false); } - /** * Flush the cache. * @@ -1918,7 +1836,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // if (this.closing.get()) { String msg = "Skipping flush on " + this + " because closing"; LOG.debug(msg); - return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg, false); + return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false); } MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this); status.setStatus("Acquiring readlock on region"); @@ -1929,7 +1847,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // String msg = "Skipping flush on " + this + " because closed"; LOG.debug(msg); status.abort(msg); - return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg, false); + return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false); } if (coprocessorHost != null) { status.setStatus("Running coprocessor pre-flush hooks"); @@ -1954,7 +1872,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // + (writestate.flushing ? "already flushing" : "writes not enabled"); status.abort(msg); - return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg, false); + return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false); } } @@ -2043,7 +1961,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } //since we didn't flush in the recent past, flush now if certain conditions //are met. Return true on first such memstore hit. 
- for (Store s : this.getStores().values()) { + for (Store s : getStores()) { if (s.timeOfOldestEdit() < now - modifiedFlushCheckInterval) { // we have an old enough edit in the memstore, flush return true; @@ -2140,7 +2058,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // if (wal != null) { w = mvcc.beginMemstoreInsert(); long flushOpSeqId = getNextSequenceId(wal); - FlushResult flushResult = new FlushResult( + FlushResult flushResult = new FlushResultImpl( FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushOpSeqId, "Nothing to flush", writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker)); w.setWriteNumber(flushOpSeqId); @@ -2149,8 +2067,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return new PrepareFlushResult(flushResult, myseqid); } else { return new PrepareFlushResult( - new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush", - false), + new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, + "Nothing to flush", false), myseqid); } } @@ -2218,7 +2136,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing."; status.setStatus(msg); return new PrepareFlushResult( - new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg, false), + new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false), myseqid); } flushOpSeqId = getNextSequenceId(wal); @@ -2405,7 +2323,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); } DroppedSnapshotException dse = new DroppedSnapshotException("region: " + - Bytes.toStringBinary(getRegionName())); + Bytes.toStringBinary(getRegionInfo().getRegionName())); dse.initCause(t); status.abort("Flush failed: " + StringUtils.stringifyException(t)); throw dse; @@ -2445,8 +2363,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // LOG.info(msg); status.setStatus(msg); - return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED : - FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushOpSeqId); + return new FlushResultImpl(compactionRequested ? + FlushResult.Result.FLUSHED_COMPACTION_NEEDED : + FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, + flushOpSeqId); } /** @@ -2463,32 +2383,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // ////////////////////////////////////////////////////////////////////////////// // get() methods for client use. ////////////////////////////////////////////////////////////////////////////// - /** - * Return all the data for the row that matches row exactly, - * or the one that immediately preceeds it, at or immediately before - * ts. - * - * @param row row key - * @return map of values - * @throws IOException - */ - Result getClosestRowBefore(final byte [] row) - throws IOException{ - return getClosestRowBefore(row, HConstants.CATALOG_FAMILY); - } - /** - * Return all the data for the row that matches row exactly, - * or the one that immediately precedes it, at or immediately before - * ts. 
- * - * @param row row key - * @param family column family to find on - * @return map of values - * @throws IOException read exceptions - */ - public Result getClosestRowBefore(final byte [] row, final byte [] family) - throws IOException { + @Override + public Result getClosestRowBefore(final byte [] row, final byte [] family) throws IOException { if (coprocessorHost != null) { Result result = new Result(); if (coprocessorHost.preGetClosestRowBefore(row, family, result)) { @@ -2519,37 +2416,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } } - /** - * Return an iterator that scans over the HRegion, returning the indicated - * columns and rows specified by the {@link Scan}. - *

- * This Iterator must be closed by the caller. - * - * @param scan configured {@link Scan} - * @return RegionScanner - * @throws IOException read exceptions - */ + @Override public RegionScanner getScanner(Scan scan) throws IOException { return getScanner(scan, null); } - void prepareScanner(Scan scan) { - if(!scan.hasFamilies()) { - // Adding all families to scanner - for(byte[] family: this.htableDescriptor.getFamiliesKeys()){ - scan.addFamily(family); - } - } - } - protected RegionScanner getScanner(Scan scan, List additionalScanners) throws IOException { startRegionOperation(Operation.SCAN); try { // Verify families are all valid - prepareScanner(scan); - if(scan.hasFamilies()) { - for(byte [] family : scan.getFamilyMap().keySet()) { + if (!scan.hasFamilies()) { + // Adding all families to scanner + for (byte[] family: this.htableDescriptor.getFamiliesKeys()) { + scan.addFamily(family); + } + } else { + for (byte [] family : scan.getFamilyMap().keySet()) { checkFamily(family); } } @@ -2570,10 +2453,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return new RegionScannerImpl(scan, additionalScanners, this); } - /* - * @param delete The passed delete is modified by this method. WARNING! - */ - void prepareDelete(Delete delete) throws IOException { + @Override + public void prepareDelete(Delete delete) throws IOException { // Check to see if this is a deleteRow insert if(delete.getFamilyCellMap().isEmpty()){ for(byte [] family : this.htableDescriptor.getFamiliesKeys()){ @@ -2590,15 +2471,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } } - ////////////////////////////////////////////////////////////////////////////// - // set() methods for client use. - ////////////////////////////////////////////////////////////////////////////// - /** - * @param delete delete object - * @throws IOException read exceptions - */ - public void delete(Delete delete) - throws IOException { + @Override + public void delete(Delete delete) throws IOException { checkReadOnly(); checkResources(); startRegionOperation(Operation.DELETE); @@ -2615,6 +2489,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * Row needed by below method. */ private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly"); + /** * This is used only by unit tests. Not required to be a public API. * @param familyMap map of family to edits for the given family. @@ -2628,15 +2503,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // doBatchMutate(delete); } - /** - * Setup correct timestamps in the KVs in Delete object. - * Caller should have the row and region locks. - * @param mutation - * @param familyMap - * @param byteNow - * @throws IOException - */ - void prepareDeleteTimestamps(Mutation mutation, Map> familyMap, + @Override + public void prepareDeleteTimestamps(Mutation mutation, Map> familyMap, byte[] byteNow) throws IOException { for (Map.Entry> e : familyMap.entrySet()) { @@ -2696,11 +2564,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // CellUtil.setTimestamp(cell, getCell.getTimestamp()); } - /** - * @throws IOException - */ - public void put(Put put) - throws IOException { + @Override + public void put(Put put) throws IOException { checkReadOnly(); // Do a rough check that we have resources to accept a write. 
The check is @@ -2827,16 +2692,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } } - /** - * Perform a batch of mutations. - * It supports only Put and Delete mutations and will ignore other types passed. - * @param mutations the list of mutations - * @return an array of OperationStatus which internally contains the - * OperationStatusCode and the exceptionMessage if any. - * @throws IOException - */ - public OperationStatus[] batchMutate( - Mutation[] mutations, long nonceGroup, long nonce) throws IOException { + @Override + public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce) + throws IOException { // As it stands, this is used for 3 things // * batchMutate with single mutation - put/delete, separate or from checkAndMutate. // * coprocessor calls (see ex. BulkDeleteEndpoint). @@ -2848,14 +2706,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE); } - /** - * Replay a batch of mutations. - * @param mutations mutations to replay. - * @param replaySeqId SeqId for current mutations - * @return an array of OperationStatus which internally contains the - * OperationStatusCode and the exceptionMessage if any. - * @throws IOException - */ + @Override public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId) throws IOException { if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo()) @@ -3359,11 +3210,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // //the getting of the lock happens before, so that you would just pass it into //the methods. So in the case of checkAndMutate you could just do lockRow, //get, put, unlockRow or something - /** - * - * @throws IOException - * @return true if the new put was executed, false otherwise - */ + + @Override public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOp compareOp, ByteArrayComparable comparator, Mutation w, boolean writeToWAL) @@ -3464,15 +3312,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // //the getting of the lock happens before, so that you would just pass it into //the methods. So in the case of checkAndMutate you could just do lockRow, //get, put, unlockRow or something - /** - * - * @throws IOException - * @return true if the new put was executed, false otherwise - */ + + @Override public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier, CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm, - boolean writeToWAL) - throws IOException{ + boolean writeToWAL) throws IOException { checkReadOnly(); //TODO, add check for value length or maybe even better move this to the //client if this becomes a global setting @@ -3543,10 +3387,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // closeRegionOperation(); } } + private void doBatchMutate(Mutation mutation) throws IOException { // Currently this is only called for puts and deletes, so no nonces. 
- OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation }, - HConstants.NO_NONCE, HConstants.NO_NONCE); + OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation }); if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) { throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg()); } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) { @@ -3577,12 +3421,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // manifest.addRegion(this); } - /** - * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP} with the - * provided current timestamp. - * @throws IOException - */ - void updateCellTimestamps(final Iterable> cellItr, final byte[] now) + @Override + public void updateCellTimestamps(final Iterable> cellItr, final byte[] now) throws IOException { for (List cells: cellItr) { if (cells == null) continue; @@ -3674,7 +3514,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * @throws IOException Throws exception if region is in read-only mode. */ protected void checkReadOnly() throws IOException { - if (this.writestate.isReadOnly()) { + if (isReadOnly()) { throw new IOException("region is read only"); } } @@ -3768,12 +3608,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // LOG.debug("rollbackMemstore rolled back " + kvsRolledback); } - /** - * Check the collection of families for validity. - * @throws NoSuchColumnFamilyException if a family does not exist. - */ - void checkFamilies(Collection families) - throws NoSuchColumnFamilyException { + @Override + public void checkFamilies(Collection families) throws NoSuchColumnFamilyException { for (byte[] family : families) { checkFamily(family); } @@ -3803,8 +3639,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } } - void checkTimestamps(final Map> familyMap, - long now) throws FailedSanityCheckException { + @Override + public void checkTimestamps(final Map> familyMap, long now) + throws FailedSanityCheckException { if (timestampSlop == HConstants.LATEST_TIMESTAMP) { return; } @@ -3970,7 +3807,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // The edits size added into rsAccounting during this replaying will not // be required any more. So just clear it. if (this.rsAccounting != null) { - this.rsAccounting.clearRegionReplayEditsSize(this.getRegionName()); + this.rsAccounting.clearRegionReplayEditsSize(getRegionInfo().getRegionName()); } if (seqid > minSeqIdForTheRegion) { // Then we added some edits to memory. Flush and cleanup split edit files. @@ -4341,7 +4178,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } else { // special case empty memstore. 
We will still save the flush result in this case, since // our memstore ie empty, but the primary is still flushing - if (prepareResult.result.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) { + if (prepareResult.getResult().getResult() == + FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) { this.writestate.flushing = true; this.prepareFlushResult = prepareResult; if (LOG.isDebugEnabled()) { @@ -4853,15 +4691,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } } - /** - * Checks the underlying store files, and opens the files that have not - * been opened, and removes the store file readers for store files no longer - * available. Mainly used by secondary region replicas to keep up to date with - * the primary region files or open new flushed files and drop their memstore snapshots in case - * of memory pressure. - * @throws IOException - */ - boolean refreshStoreFiles() throws IOException { + @Override + public boolean refreshStoreFiles() throws IOException { if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) { return false; // if primary nothing to do } @@ -4878,7 +4709,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // startRegionOperation(); // obtain region close lock try { synchronized (writestate) { - for (Store store : getStores().values()) { + for (Store store : getStores()) { // TODO: some stores might see new data from flush, while others do not which // MIGHT break atomic edits across column families. long maxSeqIdBefore = store.getMaxSequenceId(); @@ -4921,7 +4752,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // advance the mvcc read point so that the new flushed files are visible. // there may be some in-flight transactions, but they won't be made visible since they are // either greater than flush seq number or they were already picked up via flush. - for (Store s : getStores().values()) { + for (Store s : getStores()) { getMVCC().advanceMemstoreReadPointIfNeeded(s.getMaxMemstoreTS()); } @@ -4984,7 +4815,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // protected boolean restoreEdit(final Store s, final Cell cell) { long kvSize = s.add(cell).getFirst(); if (this.rsAccounting != null) { - rsAccounting.addAndGetRegionReplayEditsSize(this.getRegionName(), kvSize); + rsAccounting.addAndGetRegionReplayEditsSize(getRegionInfo().getRegionName(), kvSize); } return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize)); } @@ -5008,13 +4839,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return new HStore(this, family, this.conf); } - /** - * Return HStore instance. - * Use with caution. Exposed for use of fixup utilities. - * @param column Name of column family hosted by this region. - * @return Store that goes with the family on passed column. - * TODO: Make this lookup faster. - */ + @Override public Store getStore(final byte[] column) { return this.stores.get(column); } @@ -5035,17 +4860,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return null; } - public Map getStores() { - return this.stores; + @Override + public List getStores() { + List list = new ArrayList(stores.size()); + list.addAll(stores.values()); + return list; } - /** - * Return list of storeFiles for the set of CFs. 
- * Uses closeLock to prevent the race condition where a region closes - * in between the for loop - closing the stores one by one, some stores - * will return 0 files. - * @return List of storeFiles. - */ + @Override public List getStoreFileList(final byte [][] columns) throws IllegalArgumentException { List storeFileNames = new ArrayList(); @@ -5075,21 +4897,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // if (!rowIsInRange(getRegionInfo(), row)) { throw new WrongRegionException("Requested row out of range for " + op + " on HRegion " + this + ", startKey='" + - Bytes.toStringBinary(getStartKey()) + "', getEndKey()='" + - Bytes.toStringBinary(getEndKey()) + "', row='" + + Bytes.toStringBinary(getRegionInfo().getStartKey()) + "', getEndKey()='" + + Bytes.toStringBinary(getRegionInfo().getEndKey()) + "', row='" + Bytes.toStringBinary(row) + "'"); } } - /** - * Tries to acquire a lock on the given row. - * @param waitForLock if true, will block until the lock is available. - * Otherwise, just tries to obtain the lock and returns - * false if unavailable. - * @return the row lock if acquired, - * null if waitForLock was false and the lock was not acquired - * @throws IOException if waitForLock was true and the lock could not be acquired after waiting - */ + @Override public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException { startRegionOperation(); try { @@ -5161,9 +4975,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return getRowLock(row, true); } - /** - * If the given list of row locks is not null, releases all locks. - */ + @Override public void releaseRowLocks(List rowLocks) { if (rowLocks != null) { for (RowLock rowLock : rowLocks) { @@ -5179,8 +4991,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * * @param familyPaths List of Pair */ - private static boolean hasMultipleColumnFamilies( - List> familyPaths) { + private static boolean hasMultipleColumnFamilies(Collection> familyPaths) { boolean multipleFamilies = false; byte[] family = null; for (Pair pair : familyPaths) { @@ -5195,36 +5006,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return multipleFamilies; } - /** - * Bulk load a/many HFiles into this region - * - * @param familyPaths A list which maps column families to the location of the HFile to load - * into that column family region. - * @param assignSeqId Force a flush, get it's sequenceId to preserve the guarantee that all the - * edits lower than the highest sequential ID from all the HFiles are flushed - * on disk. - * @return true if successful, false if failed recoverably - * @throws IOException if failed unrecoverably. - */ - public boolean bulkLoadHFiles(List> familyPaths, - boolean assignSeqId) throws IOException { - return bulkLoadHFiles(familyPaths, assignSeqId, null); - } - - /** - * Attempts to atomically load a group of hfiles. This is critical for loading - * rows with multiple column families atomically. - * - * @param familyPaths List of Pair - * @param bulkLoadListener Internal hooks enabling massaging/preparation of a - * file about to be bulk loaded - * @param assignSeqId Force a flush, get it's sequenceId to preserve the guarantee that - * all the edits lower than the highest sequential ID from all the - * HFiles are flushed on disk. - * @return true if successful, false if failed recoverably - * @throws IOException if failed unrecoverably. 
- */ - public boolean bulkLoadHFiles(List> familyPaths, boolean assignSeqId, + @Override + public boolean bulkLoadHFiles(Collection> familyPaths, boolean assignSeqId, BulkLoadListener bulkLoadListener) throws IOException { long seqId = -1; Map> storeFiles = new TreeMap>(Bytes.BYTES_COMPARATOR); @@ -5287,14 +5070,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is // a sequence id that we can be sure is beyond the last hfile written). if (assignSeqId) { - FlushResult fs = this.flushcache(); + FlushResult fs = flushcache(true, false); if (fs.isFlushSucceeded()) { - seqId = fs.flushSequenceId; - } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) { - seqId = fs.flushSequenceId; + seqId = ((FlushResultImpl)fs).flushSequenceId; + } else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) { + seqId = ((FlushResultImpl)fs).flushSequenceId; } else { - throw new IOException("Could not bulk load with an assigned sequential ID because the " + - "flush didn't run. Reason for not flushing: " + fs.failureReason); + throw new IOException("Could not bulk load with an assigned sequential ID because the "+ + "flush didn't run. Reason for not flushing: " + ((FlushResultImpl)fs).failureReason); } } @@ -5363,18 +5146,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // @Override public boolean equals(Object o) { - return o instanceof HRegion && Bytes.equals(this.getRegionName(), - ((HRegion) o).getRegionName()); + return o instanceof HRegion && Bytes.equals(getRegionInfo().getRegionName(), + ((HRegion) o).getRegionInfo().getRegionName()); } @Override public int hashCode() { - return Bytes.hashCode(this.getRegionName()); + return Bytes.hashCode(getRegionInfo().getRegionName()); } @Override public String toString() { - return this.getRegionNameAsString(); + return getRegionInfo().getRegionNameAsString(); } /** @@ -5704,7 +5487,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // long afterTime = rpcCall.disconnectSince(); if (afterTime >= 0) { throw new CallerDisconnectedException( - "Aborting on region " + getRegionNameAsString() + ", call " + + "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " + this + " after " + afterTime + " ms, since " + "caller disconnected"); } @@ -6314,6 +6097,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return r.openHRegion(reporter); } + public static Region openHRegion(final Region other, final CancelableProgressable reporter) + throws IOException { + return openHRegion((HRegion)other, reporter); + } + /** * Open HRegion. * Calls initialize and sets sequenceId. @@ -6439,7 +6227,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // public static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException { meta.checkResources(); // The row key is the region name - byte[] row = r.getRegionName(); + byte[] row = r.getRegionInfo().getRegionName(); final long now = EnvironmentEdgeManager.currentTime(); final List cells = new ArrayList(2); cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY, @@ -6506,18 +6294,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // Make sure that srcA comes first; important for key-ordering during // write of the merged file. 
- if (srcA.getStartKey() == null) { - if (srcB.getStartKey() == null) { + if (srcA.getRegionInfo().getStartKey() == null) { + if (srcB.getRegionInfo().getStartKey() == null) { throw new IOException("Cannot merge two regions with null start key"); } // A's start key is null but B's isn't. Assume A comes before B - } else if ((srcB.getStartKey() == null) || - (Bytes.compareTo(srcA.getStartKey(), srcB.getStartKey()) > 0)) { + } else if ((srcB.getRegionInfo().getStartKey() == null) || + (Bytes.compareTo(srcA.getRegionInfo().getStartKey(), + srcB.getRegionInfo().getStartKey()) > 0)) { a = srcB; b = srcA; } - if (!(Bytes.compareTo(a.getEndKey(), b.getStartKey()) == 0)) { + if (!(Bytes.compareTo(a.getRegionInfo().getEndKey(), + b.getRegionInfo().getStartKey()) == 0)) { throw new IOException("Cannot merge non-adjacent regions"); } return merge(a, b); @@ -6538,16 +6328,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // FileSystem fs = a.getRegionFileSystem().getFileSystem(); // Make sure each region's cache is empty - a.flushcache(); - b.flushcache(); + a.flush(true); + b.flush(true); // Compact each region so we only have one store file per family - a.compactStores(true); + a.compact(true); if (LOG.isDebugEnabled()) { LOG.debug("Files for region: " + a); a.getRegionFileSystem().logFileSystemState(LOG); } - b.compactStores(true); + b.compact(true); if (LOG.isDebugEnabled()) { LOG.debug("Files for region: " + b); b.getRegionFileSystem().logFileSystemState(LOG); @@ -6572,7 +6362,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // throw new IOException("Failed merging region " + a + " and " + b + ", and successfully rolled back"); } - dstRegion.compactStores(true); + dstRegion.compact(true); if (LOG.isDebugEnabled()) { LOG.debug("Files for new region"); @@ -6593,14 +6383,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return dstRegion; } - // - // HBASE-880 - // - /** - * @param get get object - * @return result - * @throws IOException read exceptions - */ + @Override public Result get(final Get get) throws IOException { checkRow(get.getRow(), "Get"); // Verify families are all valid @@ -6618,13 +6401,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale); } - /* - * Do a get based on the get parameter. - * @param withCoprocessor invoke coprocessor or not. We don't want to - * always invoke cp for this private method. - */ - public List get(Get get, boolean withCoprocessor) - throws IOException { + @Override + public List get(Get get, boolean withCoprocessor) throws IOException { List results = new ArrayList(); @@ -6709,27 +6487,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return stats.build(); } - /** - * Performs atomic multiple reads and writes on a given row. - * - * @param processor The object defines the reads and writes to a row. 
- * @param nonceGroup Optional nonce group of the operation (client Id) - * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence") - */ + @Override + public void processRowsWithLocks(RowProcessor processor) throws IOException { + processRowsWithLocks(processor, rowProcessorTimeout, HConstants.NO_NONCE, + HConstants.NO_NONCE); + } + + @Override public void processRowsWithLocks(RowProcessor processor, long nonceGroup, long nonce) throws IOException { processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce); } - /** - * Performs atomic multiple reads and writes on a given row. - * - * @param processor The object defines the reads and writes to a row. - * @param timeout The timeout of the processor.process() execution - * Use a negative number to switch off the time bound - * @param nonceGroup Optional nonce group of the operation (client Id) - * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence") - */ + @Override public void processRowsWithLocks(RowProcessor processor, long timeout, long nonceGroup, long nonce) throws IOException { @@ -6942,14 +6712,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // TODO: There's a lot of boiler plate code identical to increment. // We should refactor append and increment as local get-mutate-put // transactions, so all stores only go through one code path for puts. - /** - * Perform one or more append operations on a row. - * - * @return new keyvalues after increment - * @throws IOException - */ - public Result append(Append append, long nonceGroup, long nonce) - throws IOException { + + @Override + public Result append(Append append, long nonceGroup, long nonce) throws IOException { byte[] row = append.getRow(); checkRow(row, "append"); boolean flush = false; @@ -7208,11 +6973,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // TODO: There's a lot of boiler plate code identical to append. // We should refactor append and increment as local get-mutate-put // transactions, so all stores only go through one code path for puts. - /** - * Perform one or more increment operations on a row. - * @return new keyvalues after increment - * @throws IOException - */ + + @Override public Result increment(Increment increment, long nonceGroup, long nonce) throws IOException { byte [] row = increment.getRow(); @@ -7510,22 +7272,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // System.exit(1); } - /** - * Registers a new protocol buffer {@link Service} subclass as a coprocessor endpoint to - * be available for handling - * {@link HRegion#execService(com.google.protobuf.RpcController, - * org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall)}} calls. - * - *

- * Only a single instance may be registered per region for a given {@link Service} subclass (the
- * instances are keyed on {@link com.google.protobuf.Descriptors.ServiceDescriptor#getFullName()}.
- * After the first registration, subsequent calls with the same service name will fail with
- * a return value of {@code false}.
- *

- * @param instance the {@code Service} subclass instance to expose as a coprocessor endpoint - * @return {@code true} if the registration was successful, {@code false} - * otherwise - */ + @Override public boolean registerService(Service instance) { /* * No stacking of instances is allowed for a single service name @@ -7540,26 +7287,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance); if (LOG.isDebugEnabled()) { - LOG.debug("Registered coprocessor service: region="+ - Bytes.toStringBinary(getRegionName())+" service="+serviceDesc.getFullName()); + LOG.debug("Registered coprocessor service: region=" + + Bytes.toStringBinary(getRegionInfo().getRegionName()) + + " service=" + serviceDesc.getFullName()); } return true; } - /** - * Executes a single protocol buffer coprocessor endpoint {@link Service} method using - * the registered protocol handlers. {@link Service} implementations must be registered via the - * {@link HRegion#registerService(com.google.protobuf.Service)} - * method before they are available. - * - * @param controller an {@code RpcController} implementation to pass to the invoked service - * @param call a {@code CoprocessorServiceCall} instance identifying the service, method, - * and parameters for the method invocation - * @return a protocol buffer {@code Message} instance containing the method's result - * @throws IOException if no registered service handler is found or an error - * occurs during the invocation - * @see org.apache.hadoop.hbase.regionserver.HRegion#registerService(com.google.protobuf.Service) - */ + @Override public Message execService(RpcController controller, CoprocessorServiceCall call) throws IOException { String serviceName = call.getServiceName(); @@ -7567,7 +7302,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // if (!coprocessorServiceHandlers.containsKey(serviceName)) { throw new UnknownProtocolException(null, "No registered coprocessor service found for name "+serviceName+ - " in region "+Bytes.toStringBinary(getRegionName())); + " in region "+Bytes.toStringBinary(getRegionInfo().getRegionName())); } Service service = coprocessorServiceHandlers.get(serviceName); @@ -7576,7 +7311,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // if (methodDesc == null) { throw new UnknownProtocolException(service.getClass(), "Unknown method "+methodName+" called on service "+serviceName+ - " in region "+Bytes.toStringBinary(getRegionName())); + " in region "+Bytes.toStringBinary(getRegionInfo().getRegionName())); } Message request = service.getRequestPrototype(methodDesc).newBuilderForType() @@ -7627,7 +7362,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // try { region.initialize(null); if (majorCompact) { - region.compactStores(true); + region.compact(true); } else { // Default behavior Scan scan = new Scan(); @@ -7740,22 +7475,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // this.coprocessorHost = coprocessorHost; } - /** - * This method needs to be called before any public call that reads or - * modifies data. It has to be called just before a try. - * #closeRegionOperation needs to be called in the try's finally block - * Acquires a read lock and checks if the region is closing or closed. 
- * @throws IOException
- */
+ @Override
 public void startRegionOperation() throws IOException {
 startRegionOperation(Operation.ANY);
 }
- /**
- * @param op The operation is about to be taken on the region
- * @throws IOException
- */
- protected void startRegionOperation(Operation op) throws IOException {
+ @Override
+ public void startRegionOperation(Operation op) throws IOException {
 switch (op) {
 case GET: // read operations
 case SCAN:
@@ -7771,7 +7497,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 // when a region is in recovering state, no read, split or merge is allowed
 if (isRecovering() && (this.disallowWritesInRecovering ||
 (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
- throw new RegionInRecoveryException(this.getRegionNameAsString() +
+ throw new RegionInRecoveryException(getRegionInfo().getRegionNameAsString() +
 " is recovering; cannot take reads");
 }
 break;
@@ -7785,12 +7511,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 return;
 }
 if (this.closing.get()) {
- throw new NotServingRegionException(getRegionNameAsString() + " is closing");
+ throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing");
 }
 lock(lock.readLock());
 if (this.closed.get()) {
 lock.readLock().unlock();
- throw new NotServingRegionException(getRegionNameAsString() + " is closed");
+ throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");
 }
 try {
 if (coprocessorHost != null) {
@@ -7802,11 +7528,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 }
 }
- /**
- * Closes the lock. This needs to be called in the finally block corresponding
- * to the try block of #startRegionOperation
- * @throws IOException
- */
+ @Override
 public void closeRegionOperation() throws IOException {
 closeRegionOperation(Operation.ANY);
 }
@@ -7835,14 +7557,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 private void startBulkRegionOperation(boolean writeLockNeeded)
 throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
 if (this.closing.get()) {
- throw new NotServingRegionException(getRegionNameAsString() + " is closing");
+ throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing");
 }
 if (writeLockNeeded) lock(lock.writeLock());
 else lock(lock.readLock());
 if (this.closed.get()) {
 if (writeLockNeeded) lock.writeLock().unlock();
 else lock.readLock().unlock();
- throw new NotServingRegionException(getRegionNameAsString() + " is closed");
+ throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");
 }
 }
@@ -8015,30 +7737,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 }
 }
- /**
- * Gets the latest sequence number that was read from storage when this region was opened.
- */
+ @Override
 public long getOpenSeqNum() {
 return this.openSeqNum;
 }
- /**
- * Gets max sequence ids of stores that was read from storage when this region was opened. WAL
- * Edits with smaller or equal sequence number will be skipped from replay.
- */
- public Map<byte[], Long> getMaxStoreSeqIdForLogReplay() {
+ @Override
+ public Map<byte[], Long> getMaxStoreSeqId() {
 return this.maxSeqIdInStores;
 }
- @VisibleForTesting
+ @Override
 public long getOldestSeqIdOfStore(byte[] familyName) {
 return wal.getEarliestMemstoreSeqNum(getRegionInfo()
 .getEncodedNameAsBytes(), familyName);
 }
- /**
- * @return if a given region is in compaction now.
- */
+ @Override
 public CompactionState getCompactionState() {
 boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
 return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
@@ -8077,39 +7792,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 this.sequenceId.set(value);
 }
- /**
- * Listener class to enable callers of
- * bulkLoadHFile() to perform any necessary
- * pre/post processing of a given bulkload call
- */
- public interface BulkLoadListener {
-
- /**
- * Called before an HFile is actually loaded
- * @param family family being loaded to
- * @param srcPath path of HFile
- * @return final path to be used for actual loading
- * @throws IOException
- */
- String prepareBulkLoad(byte[] family, String srcPath) throws IOException;
-
- /**
- * Called after a successful HFile load
- * @param family family being loaded to
- * @param srcPath path of HFile
- * @throws IOException
- */
- void doneBulkLoad(byte[] family, String srcPath) throws IOException;
-
- /**
- * Called after a failed HFile load
- * @param family family being loaded to
- * @param srcPath path of HFile
- * @throws IOException
- */
- void failedBulkLoad(byte[] family, String srcPath) throws IOException;
- }
-
 @VisibleForTesting class RowLockContext {
 private final HashedBytes row;
 private final CountDownLatch latch = new CountDownLatch(1);
@@ -8127,7 +7809,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 RowLock newLock() {
 lockCount++;
- return new RowLock(this);
+ RowLockImpl rl = new RowLockImpl();
+ rl.setContext(this);
+ return rl;
 }
 void releaseLock() {
@@ -8148,29 +7832,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 }
 }
- /**
- * Row lock held by a given thread.
- * One thread may acquire multiple locks on the same row simultaneously.
- * The locks must be released by calling release() from the same thread.
- */
- public static class RowLock {
- @VisibleForTesting final RowLockContext context;
+ public static class RowLockImpl implements RowLock {
+ private RowLockContext context;
 private boolean released = false;
- @VisibleForTesting RowLock(RowLockContext context) {
+ @VisibleForTesting
+ public RowLockContext getContext() {
+ return context;
+ }
+
+ @VisibleForTesting
+ public void setContext(RowLockContext context) {
 this.context = context;
 }
- /**
- * Release the given lock. If there are no remaining locks held by the current thread
- * then unlock the row and allow other threads to acquire the lock.
- * @throws IllegalArgumentException if called by a different thread than the lock owning thread
- */
+ @Override
 public void release() {
 if (!released) {
 context.releaseLock();
- released = true;
 }
+ released = true;
 }
 }
@@ -8195,16 +7876,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 }
 /**
- * Explicitly sync wal
- * @throws IOException
- */
- public void syncWal() throws IOException {
- if(this.wal != null) {
- this.wal.sync();
- }
- }
-
- /**
 * {@inheritDoc}
 */
 @Override
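The calling contract spelled out in the removed startRegionOperation javadoc (acquire the operation lock just before a try block, call closeRegionOperation in that try's finally block) carries over unchanged to the new Region interface. A minimal caller-side sketch, not part of this patch, assuming a Region handle obtained from a coprocessor environment and the Region.Operation enum and startRegionOperation/closeRegionOperation/get methods exposed by this change:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.regionserver.Region;

public final class RegionOperationSketch {

  // Illustrative only: read one row while holding the region operation read lock.
  // startRegionOperation() is invoked just before the try, and
  // closeRegionOperation() in the finally block, matching the removed javadoc.
  static Result guardedGet(Region region, byte[] row) throws IOException {
    region.startRegionOperation(Region.Operation.GET);
    try {
      return region.get(new Get(row));
    } finally {
      region.closeRegionOperation();
    }
  }
}

The same acquire/try/finally shape applies to row locks: per the javadoc removed above, RowLock.release() must be called from the thread that acquired the lock.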