From: stack@apache.org To: commits@hbase.apache.org Date: Fri, 19 Dec 2014 00:06:19 -0000 Subject: [1/3] hbase git commit: HBASE-10201 Port 'Make flush decisions per column family' to trunk Repository: hbase Updated Branches: refs/heads/branch-1 9895604e9 -> 5d34d2d02 http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 43072ce..e40b48d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER; + import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; @@ -31,10 +33,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -50,7 +54,6 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -65,15 +68,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; -import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER; -import org.apache.hadoop.hbase.wal.DefaultWALProvider; -import
org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.wal.WALPrettyPrinter; -import org.apache.hadoop.hbase.wal.WALProvider.Writer; -import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.DrainBarrier; @@ -81,6 +76,13 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALPrettyPrinter; +import org.apache.hadoop.hbase.wal.WALProvider.Writer; +import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.util.StringUtils; import org.htrace.NullScope; @@ -89,6 +91,7 @@ import org.htrace.Trace; import org.htrace.TraceScope; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.ExceptionHandler; @@ -334,33 +337,35 @@ public class FSHLog implements WAL { // sequence id numbers are by region and unrelated to the ring buffer sequence number accounting // done above in failedSequence, highest sequence, etc. /** - * This lock ties all operations on oldestFlushingRegionSequenceIds and - * oldestFlushedRegionSequenceIds Maps with the exception of append's putIfAbsent call into - * oldestUnflushedSeqNums. We use these Maps to find out the low bound regions sequence id, or - * to find regions with old sequence ids to force flush; we are interested in old stuff not the - * new additions (TODO: IS THIS SAFE? CHECK!). + * This lock ties all operations on lowestFlushingStoreSequenceIds and + * oldestUnflushedStoreSequenceIds Maps with the exception of append's putIfAbsent call into + * oldestUnflushedStoreSequenceIds. We use these Maps to find out the low bound regions + * sequence id, or to find regions with old sequence ids to force flush; we are interested in + * old stuff not the new additions (TODO: IS THIS SAFE? CHECK!). */ private final Object regionSequenceIdLock = new Object(); /** - * Map of encoded region names to their OLDEST -- i.e. their first, the longest-lived -- - * sequence id in memstore. Note that this sequence id is the region sequence id. This is not - * related to the id we use above for {@link #highestSyncedSequence} and - * {@link #highestUnsyncedSequence} which is the sequence from the disruptor ring buffer. + * Map of encoded region names and family names to their OLDEST -- i.e. their first, + * the longest-lived -- sequence id in memstore. Note that this sequence id is the region + * sequence id. This is not related to the id we use above for {@link #highestSyncedSequence} + * and {@link #highestUnsyncedSequence} which is the sequence from the disruptor + * ring buffer. 
*/ - private final ConcurrentSkipListMap<byte[], Long> oldestUnflushedRegionSequenceIds = - new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR); + private final ConcurrentMap<byte[], ConcurrentMap<byte[], Long>> oldestUnflushedStoreSequenceIds + = new ConcurrentSkipListMap<byte[], ConcurrentMap<byte[], Long>>( + Bytes.BYTES_COMPARATOR); /** - * Map of encoded region names to their lowest or OLDEST sequence/edit id in memstore currently - * being flushed out to hfiles. Entries are moved here from - * {@link #oldestUnflushedRegionSequenceIds} while the lock {@link #regionSequenceIdLock} is held + * Map of encoded region names and family names to their lowest or OLDEST sequence/edit id in + * memstore currently being flushed out to hfiles. Entries are moved here from + * {@link #oldestUnflushedStoreSequenceIds} while the lock {@link #regionSequenceIdLock} is held * (so movement between the Maps is atomic). This is not related to the id we use above for * {@link #highestSyncedSequence} and {@link #highestUnsyncedSequence} which is the sequence from * the disruptor ring buffer, an internal detail. */ - private final Map<byte[], Long> lowestFlushingRegionSequenceIds = - new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); + private final Map<byte[], Map<byte[], Long>> lowestFlushingStoreSequenceIds = + new TreeMap<byte[], Map<byte[], Long>>(Bytes.BYTES_COMPARATOR); /** * Map of region encoded names to the latest region sequence id. Updated on each append of @@ -735,6 +740,28 @@ public class FSHLog implements WAL { return DefaultWALProvider.createWriter(conf, fs, path, false); } + private long getLowestSeqId(Map<?, Long> seqIdMap) { + long result = HConstants.NO_SEQNUM; + for (Long seqNum: seqIdMap.values()) { + if (result == HConstants.NO_SEQNUM || seqNum.longValue() < result) { + result = seqNum.longValue(); + } + } + return result; + } + + private <T extends Map<byte[], Long>> Map<byte[], Long> copyMapWithLowestSeqId( + Map<byte[], T> mapToCopy) { + Map<byte[], Long> copied = Maps.newHashMap(); + for (Map.Entry<byte[], T> entry: mapToCopy.entrySet()) { + long lowestSeqId = getLowestSeqId(entry.getValue()); + if (lowestSeqId != HConstants.NO_SEQNUM) { + copied.put(entry.getKey(), lowestSeqId); + } + } + return copied; + } + /** * Archive old logs that could be archived: a log is eligible for archiving if all its WALEdits * have been flushed to hfiles. @@ -747,22 +774,23 @@ * @throws IOException */ private void cleanOldLogs() throws IOException { - Map<byte[], Long> oldestFlushingSeqNumsLocal = null; - Map<byte[], Long> oldestUnflushedSeqNumsLocal = null; + Map<byte[], Long> lowestFlushingRegionSequenceIdsLocal = null; + Map<byte[], Long> oldestUnflushedRegionSequenceIdsLocal = null; List<Path> logsToArchive = new ArrayList<Path>(); // make a local copy so as to avoid locking when we iterate over these maps. synchronized (regionSequenceIdLock) { - oldestFlushingSeqNumsLocal = new HashMap<byte[], Long>(this.lowestFlushingRegionSequenceIds); - oldestUnflushedSeqNumsLocal = - new HashMap<byte[], Long>(this.oldestUnflushedRegionSequenceIds); + lowestFlushingRegionSequenceIdsLocal = + copyMapWithLowestSeqId(this.lowestFlushingStoreSequenceIds); + oldestUnflushedRegionSequenceIdsLocal = + copyMapWithLowestSeqId(this.oldestUnflushedStoreSequenceIds); } for (Map.Entry<Path, Map<byte[], Long>> e : byWalRegionSequenceIds.entrySet()) { // iterate over the log file. Path log = e.getKey(); Map<byte[], Long> sequenceNums = e.getValue(); // iterate over the map for this log file, and tell whether it should be archived or not.
- if (areAllRegionsFlushed(sequenceNums, oldestFlushingSeqNumsLocal, - oldestUnflushedSeqNumsLocal)) { + if (areAllRegionsFlushed(sequenceNums, lowestFlushingRegionSequenceIdsLocal, + oldestUnflushedRegionSequenceIdsLocal)) { logsToArchive.add(log); LOG.debug("WAL file ready for archiving " + log); } @@ -816,10 +844,11 @@ public class FSHLog implements WAL { List<byte[]> regionsToFlush = null; // Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock. synchronized (regionSequenceIdLock) { - for (Map.Entry<byte[], Long> e : regionsSequenceNums.entrySet()) { - Long unFlushedVal = this.oldestUnflushedRegionSequenceIds.get(e.getKey()); - if (unFlushedVal != null && unFlushedVal <= e.getValue()) { - if (regionsToFlush == null) regionsToFlush = new ArrayList<byte[]>(); + for (Map.Entry<byte[], Long> e: regionsSequenceNums.entrySet()) { + long unFlushedVal = getEarliestMemstoreSeqNum(e.getKey()); + if (unFlushedVal != HConstants.NO_SEQNUM && unFlushedVal <= e.getValue()) { + if (regionsToFlush == null) + regionsToFlush = new ArrayList<byte[]>(); regionsToFlush.add(e.getKey()); } } @@ -1585,36 +1614,53 @@ public class FSHLog implements WAL { // +1 for current use log return getNumRolledLogFiles() + 1; } - + // public only until class moves to o.a.h.h.wal /** @return the size of log files in use */ public long getLogFileSize() { return this.totalLogSize.get(); } - + @Override - public boolean startCacheFlush(final byte[] encodedRegionName) { - Long oldRegionSeqNum = null; + public boolean startCacheFlush(final byte[] encodedRegionName, + Set<byte[]> flushedFamilyNames) { + Map<byte[], Long> oldStoreSeqNum = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); if (!closeBarrier.beginOp()) { LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) + " - because the server is closing."); return false; } synchronized (regionSequenceIdLock) { - oldRegionSeqNum = this.oldestUnflushedRegionSequenceIds.remove(encodedRegionName); - if (oldRegionSeqNum != null) { - Long oldValue = - this.lowestFlushingRegionSequenceIds.put(encodedRegionName, oldRegionSeqNum); - assert oldValue == - null : "Flushing map not cleaned up for " + Bytes.toString(encodedRegionName); + ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion = + oldestUnflushedStoreSequenceIds.get(encodedRegionName); + if (oldestUnflushedStoreSequenceIdsOfRegion != null) { + for (byte[] familyName: flushedFamilyNames) { + Long seqId = oldestUnflushedStoreSequenceIdsOfRegion.remove(familyName); + if (seqId != null) { + oldStoreSeqNum.put(familyName, seqId); + } + } + if (!oldStoreSeqNum.isEmpty()) { + Map<byte[], Long> oldValue = this.lowestFlushingStoreSequenceIds.put( + encodedRegionName, oldStoreSeqNum); + assert oldValue == null: "Flushing map not cleaned up for " + + Bytes.toString(encodedRegionName); + } + if (oldestUnflushedStoreSequenceIdsOfRegion.isEmpty()) { + // Remove it, otherwise it will stay in oldestUnflushedStoreSequenceIds forever + // even if the region has already moved to another server. + // Do not worry about data races; we hold the write lock of the region when calling + // startCacheFlush, so no one can add values to the map we removed. + oldestUnflushedStoreSequenceIds.remove(encodedRegionName); + } } } - if (oldRegionSeqNum == null) { - // TODO: if we have no oldRegionSeqNum, and WAL is not disabled, presumably either - // the region is already flushing (which would make this call invalid), or there - // were no appends after last flush, so why are we starting flush? Maybe we should - // assert not null, and switch to "long" everywhere.
Less rigorous, but safer, - alternative is telling the caller to stop. For now preserve old logic. + if (oldStoreSeqNum.isEmpty()) { + // TODO: if we have no oldStoreSeqNum, and WAL is not disabled, presumably either + // the region is already flushing (which would make this call invalid), or there + // were no appends after last flush, so why are we starting flush? Maybe we should + // assert not empty. Less rigorous, but safer, alternative is telling the caller to stop. + // For now preserve old logic. LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: [" + Bytes.toString(encodedRegionName) + "]"); } @@ -1624,30 +1670,59 @@ public class FSHLog implements WAL { @Override public void completeCacheFlush(final byte [] encodedRegionName) { synchronized (regionSequenceIdLock) { - this.lowestFlushingRegionSequenceIds.remove(encodedRegionName); + this.lowestFlushingStoreSequenceIds.remove(encodedRegionName); } closeBarrier.endOp(); } + private ConcurrentMap<byte[], Long> getOrCreateOldestUnflushedStoreSequenceIdsOfRegion( + byte[] encodedRegionName) { + ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion = + oldestUnflushedStoreSequenceIds.get(encodedRegionName); + if (oldestUnflushedStoreSequenceIdsOfRegion != null) { + return oldestUnflushedStoreSequenceIdsOfRegion; + } + oldestUnflushedStoreSequenceIdsOfRegion = + new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR); + ConcurrentMap<byte[], Long> alreadyPut = + oldestUnflushedStoreSequenceIds.put(encodedRegionName, + oldestUnflushedStoreSequenceIdsOfRegion); + return alreadyPut == null ? oldestUnflushedStoreSequenceIdsOfRegion : alreadyPut; + } + @Override public void abortCacheFlush(byte[] encodedRegionName) { - Long currentSeqNum = null, seqNumBeforeFlushStarts = null; + Map<byte[], Long> storeSeqNumsBeforeFlushStarts; + Map<byte[], Long> currentStoreSeqNums = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); synchronized (regionSequenceIdLock) { - seqNumBeforeFlushStarts = this.lowestFlushingRegionSequenceIds.remove(encodedRegionName); - if (seqNumBeforeFlushStarts != null) { - currentSeqNum = - this.oldestUnflushedRegionSequenceIds.put(encodedRegionName, seqNumBeforeFlushStarts); + storeSeqNumsBeforeFlushStarts = this.lowestFlushingStoreSequenceIds.remove( + encodedRegionName); + if (storeSeqNumsBeforeFlushStarts != null) { + ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion = + getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName); + for (Map.Entry<byte[], Long> familyNameAndSeqId: storeSeqNumsBeforeFlushStarts + .entrySet()) { + currentStoreSeqNums.put(familyNameAndSeqId.getKey(), + oldestUnflushedStoreSequenceIdsOfRegion.put(familyNameAndSeqId.getKey(), + familyNameAndSeqId.getValue())); + } } } closeBarrier.endOp(); - if ((currentSeqNum != null) - && (currentSeqNum.longValue() <= seqNumBeforeFlushStarts.longValue())) { - String errorStr = "Region " + Bytes.toString(encodedRegionName) + - "acquired edits out of order current memstore seq=" + currentSeqNum - + ", previous oldest unflushed id=" + seqNumBeforeFlushStarts; - LOG.error(errorStr); - assert false : errorStr; - Runtime.getRuntime().halt(1); + if (storeSeqNumsBeforeFlushStarts != null) { + for (Map.Entry<byte[], Long> familyNameAndSeqId : storeSeqNumsBeforeFlushStarts.entrySet()) { + Long currentSeqNum = currentStoreSeqNums.get(familyNameAndSeqId.getKey()); + if (currentSeqNum != null + && currentSeqNum.longValue() <= familyNameAndSeqId.getValue().longValue()) { + String errorStr = + "Region " + Bytes.toString(encodedRegionName) + " family " + + Bytes.toString(familyNameAndSeqId.getKey()) + + " acquired edits out of order
current memstore seq=" + currentSeqNum + + ", previous oldest unflushed id=" + familyNameAndSeqId.getValue(); + LOG.error(errorStr); + Runtime.getRuntime().halt(1); + } + } } } @@ -1678,8 +1753,23 @@ public class FSHLog implements WAL { @Override public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) { - Long result = oldestUnflushedRegionSequenceIds.get(encodedRegionName); - return result == null ? HConstants.NO_SEQNUM : result.longValue(); + ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion = + this.oldestUnflushedStoreSequenceIds.get(encodedRegionName); + return oldestUnflushedStoreSequenceIdsOfRegion != null ? + getLowestSeqId(oldestUnflushedStoreSequenceIdsOfRegion) : HConstants.NO_SEQNUM; + } + + @Override + public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, + byte[] familyName) { + ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion = + this.oldestUnflushedStoreSequenceIds.get(encodedRegionName); + if (oldestUnflushedStoreSequenceIdsOfRegion != null) { + Long result = oldestUnflushedStoreSequenceIdsOfRegion.get(familyName); + return result != null ? result.longValue() : HConstants.NO_SEQNUM; + } else { + return HConstants.NO_SEQNUM; + } } /** @@ -1915,6 +2005,15 @@ } } + private void updateOldestUnflushedSequenceIds(byte[] encodedRegionName, + Set<byte[]> familyNameSet, Long lRegionSequenceId) { + ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion = + getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName); + for (byte[] familyName : familyNameSet) { + oldestUnflushedStoreSequenceIdsOfRegion.putIfAbsent(familyName, lRegionSequenceId); + } + } + /** * Append to the WAL. Does all CP and WAL listener calls. * @param entry */ @@ -1962,9 +2061,10 @@ Long lRegionSequenceId = Long.valueOf(regionSequenceId); highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId); if (entry.isInMemstore()) { - oldestUnflushedRegionSequenceIds.putIfAbsent(encodedRegionName, lRegionSequenceId); + updateOldestUnflushedSequenceIds(encodedRegionName, + entry.getFamilyNames(), lRegionSequenceId); } - + coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit()); // Update metrics.
postAppend(entry, EnvironmentEdgeManager.currentTime() - start); http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index d9942b3..147a13d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -19,13 +19,21 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; -import org.apache.hadoop.hbase.classification.InterfaceAudience; + import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CollectionUtils; + +import com.google.common.collect.Sets; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALKey; @@ -96,7 +104,7 @@ class FSWALEntry extends Entry { */ long stampRegionSequenceId() throws IOException { long regionSequenceId = this.regionSequenceIdReference.incrementAndGet(); - if (!this.getEdit().isReplay() && memstoreCells != null && !memstoreCells.isEmpty()) { + if (!this.getEdit().isReplay() && !CollectionUtils.isEmpty(memstoreCells)) { for (Cell cell : this.memstoreCells) { CellUtil.setSequenceId(cell, regionSequenceId); } @@ -105,4 +113,21 @@ key.setLogSeqNum(regionSequenceId); return regionSequenceId; } + + /** + * @return the family names which are affected by this edit.
+ */ + Set<byte[]> getFamilyNames() { + ArrayList<Cell> cells = this.getEdit().getCells(); + if (CollectionUtils.isEmpty(cells)) { + return Collections.emptySet(); + } + Set<byte[]> familySet = Sets.newTreeSet(Bytes.BYTES_COMPARATOR); + for (Cell cell : cells) { + if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { + familySet.add(CellUtil.cloneFamily(cell)); + } + } + return familySet; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 70bfff1..1fbe151 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.wal; import java.io.IOException; import java.util.List; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -183,7 +184,7 @@ } @Override - public boolean startCacheFlush(final byte[] encodedRegionName) { + public boolean startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) { return !(closed.get()); } @@ -206,6 +207,11 @@ } @Override + public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) { + return HConstants.NO_SEQNUM; + } + + @Override public String toString() { return "WAL disabled."; } http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index 23f8c9f..5a2b08d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.wal; import java.io.Closeable; import java.io.IOException; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -152,7 +153,7 @@ * @return true if the flush can proceed, false in case wal is closing (usually, when server is * closing) and flush couldn't be started. */ - boolean startCacheFlush(final byte[] encodedRegionName); + boolean startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames); /** * Complete the cache flush. @@ -182,6 +183,14 @@ long getEarliestMemstoreSeqNum(byte[] encodedRegionName); /** + * Gets the earliest sequence number in the memstore for this particular region and store. + * @param encodedRegionName The region to get the number for. + * @param familyName The family to get the number for. + * @return The number if present, HConstants.NO_SEQNUM if absent. + */ + long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName); + + /** * Human readable identifying information about the state of this WAL. * Implementors are encouraged to include information appropriate for debugging.
* Consumers are advised not to rely on the details of the returned String; it does http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java index 48feb03..dd37e24 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java @@ -266,7 +266,7 @@ public class TestIOFencing { compactingRegion = (CompactionBlockerRegion)testRegions.get(0); LOG.info("Blocking compactions"); compactingRegion.stopCompactions(); - long lastFlushTime = compactingRegion.getLastFlushTime(); + long lastFlushTime = compactingRegion.getEarliestFlushTimeForAllStores(); // Load some rows TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT); @@ -282,7 +282,7 @@ public class TestIOFencing { // Wait till flush has happened, otherwise there won't be multiple store files long startWaitTime = System.currentTimeMillis(); - while (compactingRegion.getLastFlushTime() <= lastFlushTime || + while (compactingRegion.getEarliestFlushTimeForAllStores() <= lastFlushTime || compactingRegion.countStoreFiles() <= 1) { LOG.info("Waiting for the region to flush " + compactingRegion.getRegionNameAsString()); Thread.sleep(1000); http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java index 00bf09b..edb4926 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java @@ -33,8 +33,8 @@ public class TestFlushRegionEntry { @Test public void test() { - FlushRegionEntry entry = new FlushRegionEntry(Mockito.mock(HRegion.class)); - FlushRegionEntry other = new FlushRegionEntry(Mockito.mock(HRegion.class)); + FlushRegionEntry entry = new FlushRegionEntry(Mockito.mock(HRegion.class), true); + FlushRegionEntry other = new FlushRegionEntry(Mockito.mock(HRegion.class), true); assertEquals(entry.hashCode(), other.hashCode()); assertEquals(entry, other); http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java index 2d63775..e69e735 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java @@ -112,11 +112,11 @@ public class TestHeapMemoryManager { long oldBlockCacheSize = blockCache.maxSize; heapMemoryManager.start(); memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK; - memStoreFlusher.requestFlush(null); - memStoreFlusher.requestFlush(null); - 
memStoreFlusher.requestFlush(null); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK; - memStoreFlusher.requestFlush(null); + memStoreFlusher.requestFlush(null, false); Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldMemstoreHeapSize, memStoreFlusher.memstoreSize); @@ -126,8 +126,8 @@ public class TestHeapMemoryManager { oldBlockCacheSize = blockCache.maxSize; // Do some more flushes before the next run of HeapMemoryTuner memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK; - memStoreFlusher.requestFlush(null); - memStoreFlusher.requestFlush(null); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); Thread.sleep(1500); assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldMemstoreHeapSize, memStoreFlusher.memstoreSize); @@ -407,12 +407,12 @@ public class TestHeapMemoryManager { } @Override - public void requestFlush(HRegion region) { + public void requestFlush(HRegion region, boolean forceFlushAllStores) { this.listener.flushRequested(flushType, region); } @Override - public void requestDelayedFlush(HRegion region, long delay) { + public void requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores) { } http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java new file mode 100644 index 0000000..43a9575 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java @@ -0,0 +1,644 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; +import org.apache.hadoop.hbase.regionserver.DefaultMemStore; +import org.apache.hadoop.hbase.regionserver.FlushAllStoresPolicy; +import org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy; +import org.apache.hadoop.hbase.regionserver.FlushPolicy; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.wal.FSHLog; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.hash.Hashing; + +/** + * This test verifies the correctness of the Per Column Family flushing strategy + */ +@Category(MediumTests.class) +public class TestPerColumnFamilyFlush { + private static final Log LOG = LogFactory.getLog(TestPerColumnFamilyFlush.class); + + HRegion region = null; + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion"); + + public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1"); + + public static final byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), + Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") }; + + public static final byte[] FAMILY1 = families[0]; + + public static final byte[] FAMILY2 = families[1]; + + public static final byte[] FAMILY3 = families[2]; + + private void initHRegion(String callingMethod, Configuration conf) throws IOException { + HTableDescriptor htd = new HTableDescriptor(TABLENAME); + for (byte[] family : families) { + htd.addFamily(new HColumnDescriptor(family)); + } + HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false); + Path path = new Path(DIR, callingMethod); + region = HRegion.createHRegion(info, path, conf, htd); + } + + // A helper function to create puts. 
+ private Put createPut(int familyNum, int putNum) { + byte[] qf = Bytes.toBytes("q" + familyNum); + byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); + byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); + Put p = new Put(row); + p.add(families[familyNum - 1], qf, val); + return p; + } + + // A helper function to create gets. + private Get createGet(int familyNum, int putNum) { + byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); + return new Get(row); + } + + // A helper function to verify edits. + void verifyEdit(int familyNum, int putNum, HTable table) throws IOException { + Result r = table.get(createGet(familyNum, putNum)); + byte[] family = families[familyNum - 1]; + byte[] qf = Bytes.toBytes("q" + familyNum); + byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); + assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family)); + assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), + r.getFamilyMap(family).get(qf)); + assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum), + Arrays.equals(r.getFamilyMap(family).get(qf), val)); + } + + @Test + public void testSelectiveFlushWhenEnabled() throws IOException { + // Set up the configuration + Configuration conf = HBaseConfiguration.create(); + conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024); + conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, + FlushLargeStoresPolicy.class.getName()); + conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, + 100 * 1024); + // Initialize the HRegion + initHRegion("testSelectiveFlushWhenEnabled", conf); + // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 + for (int i = 1; i <= 1200; i++) { + region.put(createPut(1, i)); + + if (i <= 100) { + region.put(createPut(2, i)); + if (i <= 50) { + region.put(createPut(3, i)); + } + } + } + + long totalMemstoreSize = region.getMemstoreSize().get(); + + // Find the smallest LSNs for edits wrt each CF. + long smallestSeqCF1 = region.getOldestSeqIdOfStore(FAMILY1); + long smallestSeqCF2 = region.getOldestSeqIdOfStore(FAMILY2); + long smallestSeqCF3 = region.getOldestSeqIdOfStore(FAMILY3); + + // Find the sizes of the memstores of each CF. + long cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); + long cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); + long cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); + + // Get the overall smallest LSN in the region's memstores. + long smallestSeqInRegionCurrentMemstore = + region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + + // The overall smallest LSN in the region's memstores should be the same as + // the LSN of the smallest edit in CF1 + assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore); + + // Some other sanity checks. + assertTrue(smallestSeqCF1 < smallestSeqCF2); + assertTrue(smallestSeqCF2 < smallestSeqCF3); + assertTrue(cf1MemstoreSize > 0); + assertTrue(cf2MemstoreSize > 0); + assertTrue(cf3MemstoreSize > 0); + + // The total memstore size should be the same as the sum of the sizes of + // memstores of CF1, CF2 and CF3. + assertEquals(totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize + + cf2MemstoreSize + cf3MemstoreSize); + + // Flush! + region.flushcache(false); + + // Will use these to check if anything changed.
+ long oldCF2MemstoreSize = cf2MemstoreSize; + long oldCF3MemstoreSize = cf3MemstoreSize; + + // Recalculate everything + cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); + cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); + cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); + totalMemstoreSize = region.getMemstoreSize().get(); + smallestSeqInRegionCurrentMemstore = + region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + + // We should have cleared out only CF1, since we chose the flush thresholds + // and number of puts accordingly. + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize); + // Nothing should have happened to CF2, ... + assertEquals(cf2MemstoreSize, oldCF2MemstoreSize); + // ... or CF3 + assertEquals(cf3MemstoreSize, oldCF3MemstoreSize); + // Now the smallest LSN in the region should be the same as the smallest + // LSN in the memstore of CF2. + assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF2); + // Of course, this should hold too. + assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize + + cf3MemstoreSize); + + // Now add more puts (mostly for CF2), so that we only flush CF2 this time. + for (int i = 1200; i < 2400; i++) { + region.put(createPut(2, i)); + + // Add only 100 puts for CF3 + if (i - 1200 < 100) { + region.put(createPut(3, i)); + } + } + + // How much does the CF3 memstore occupy? Will be used later. + oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); + + // Flush again + region.flushcache(false); + + // Recalculate everything + cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); + cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); + cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); + totalMemstoreSize = region.getMemstoreSize().get(); + smallestSeqInRegionCurrentMemstore = + region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + + // CF1 and CF2, both should be absent. + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize); + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize); + // CF3 shouldn't have been touched. + assertEquals(cf3MemstoreSize, oldCF3MemstoreSize); + assertEquals(totalMemstoreSize + DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSize); + assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF3); + + // What happens when we hit the memstore limit, but we are not able to find + // any Column Family above the threshold? + // In that case, we should flush all the CFs. + + // Clearing the existing memstores. + region.flushcache(true); + + // The memstore limit is 200*1024 and the column family flush threshold is + // around 50*1024. We try to just hit the memstore limit with each CF's + // memstore being below the CF flush threshold. + for (int i = 1; i <= 300; i++) { + region.put(createPut(1, i)); + region.put(createPut(2, i)); + region.put(createPut(3, i)); + region.put(createPut(4, i)); + region.put(createPut(5, i)); + } + + region.flushcache(false); + // Since we won't find any CF above the threshold, and hence no specific + // store to flush, we should flush all the memstores. 
+ assertEquals(0, region.getMemstoreSize().get()); + } + + @Test + public void testSelectiveFlushWhenNotEnabled() throws IOException { + // Set up the configuration + Configuration conf = HBaseConfiguration.create(); + conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024); + conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName()); + + // Initialize the HRegion + initHRegion("testSelectiveFlushWhenNotEnabled", conf); + // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 + for (int i = 1; i <= 1200; i++) { + region.put(createPut(1, i)); + if (i <= 100) { + region.put(createPut(2, i)); + if (i <= 50) { + region.put(createPut(3, i)); + } + } + } + + long totalMemstoreSize = region.getMemstoreSize().get(); + + // Find the sizes of the memstores of each CF. + long cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); + long cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); + long cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); + + // Some other sanity checks. + assertTrue(cf1MemstoreSize > 0); + assertTrue(cf2MemstoreSize > 0); + assertTrue(cf3MemstoreSize > 0); + + // The total memstore size should be the same as the sum of the sizes of + // memstores of CF1, CF2 and CF3. + assertEquals(totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize + + cf2MemstoreSize + cf3MemstoreSize); + + // Flush! + region.flushcache(false); + + cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); + cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); + cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); + totalMemstoreSize = region.getMemstoreSize().get(); + long smallestSeqInRegionCurrentMemstore = + region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + + // Everything should have been cleared + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize); + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize); + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSize); + assertEquals(0, totalMemstoreSize); + assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore); + } + + // Find the (first) region which has the specified name.
+ private static Pair<HRegion, HRegionServer> getRegionWithName(TableName tableName) { + MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); + List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads(); + for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) { + HRegionServer hrs = rsts.get(i).getRegionServer(); + for (HRegion region : hrs.getOnlineRegions(tableName)) { + return Pair.newPair(region, hrs); + } + } + return null; + } + + @Test + public void testLogReplay() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 20000); + // Carefully chosen limits so that the memstore just flushes when we're done + conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, + FlushLargeStoresPolicy.class.getName()); + conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 10000); + final int numRegionServers = 4; + TEST_UTIL.startMiniCluster(numRegionServers); + TEST_UTIL.getHBaseAdmin().createNamespace( + NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); + HTable table = TEST_UTIL.createTable(TABLENAME, families); + HTableDescriptor htd = table.getTableDescriptor(); + + for (byte[] family : families) { + if (!htd.hasFamily(family)) { + htd.addFamily(new HColumnDescriptor(family)); + } + } + + // Add 80 edits for CF1, 10 for CF2, 10 for CF3. + // These will all be interleaved in the log. + for (int i = 1; i <= 80; i++) { + table.put(createPut(1, i)); + if (i <= 10) { + table.put(createPut(2, i)); + table.put(createPut(3, i)); + } + } + table.flushCommits(); + Thread.sleep(1000); + + Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(TABLENAME); + HRegion desiredRegion = desiredRegionAndServer.getFirst(); + assertTrue("Could not find a region which hosts the new region.", desiredRegion != null); + + // Flush the region selectively. + desiredRegion.flushcache(false); + + long totalMemstoreSize; + long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize; + totalMemstoreSize = desiredRegion.getMemstoreSize().get(); + + // Find the sizes of the memstores of each CF. + cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize(); + cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize(); + cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize(); + + // CF1 Should have been flushed + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize); + // CF2 and CF3 shouldn't have been flushed. + assertTrue(cf2MemstoreSize > 0); + assertTrue(cf3MemstoreSize > 0); + assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize + + cf3MemstoreSize); + + // Wait for the RS report to go across to the master, so that the master + // is aware of which sequence ids have been flushed, before we kill the RS. + // If in production, the RS dies before the report goes across, we will + // safely replay all the edits. + Thread.sleep(2000); + + // Abort the region server where we have the region hosted. + HRegionServer rs = desiredRegionAndServer.getSecond(); + rs.abort("testing"); + + // The aborted region server's regions will be eventually assigned to some + // other region server, and the get RPC call (inside verifyEdit()) will + // retry for some time till the regions come back up. + + // Verify that all the edits are safe. + for (int i = 1; i <= 80; i++) { + verifyEdit(1, i, table); + if (i <= 10) { + verifyEdit(2, i, table); + verifyEdit(3, i, table); + } + } + + TEST_UTIL.shutdownMiniCluster(); + } + + // Test Log Replay with Distributed Replay on.
+ // In distributed log replay, the log splitters ask the master for the + // last flushed sequence id for a region. This test would ensure that we + // are doing the book-keeping correctly. + @Test + public void testLogReplayWithDistributedReplay() throws Exception { + TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); + testLogReplay(); + } + + /** + * When a log roll is about to happen, we do a flush of the regions that will be affected by the + * log roll. These flushes cannot be selective flushes, otherwise we cannot roll the logs. This + * test ensures that we do a full flush in that scenario. + * @throws IOException + */ + @Test + public void testFlushingWhenLogRolling() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300000); + conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, + FlushLargeStoresPolicy.class.getName()); + conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 100000); + + // Also, let us try real hard to get a log roll to happen. + // Keeping the log roll period to 2s. + conf.setLong("hbase.regionserver.logroll.period", 2000); + // Keep the block size small so that we fill up the log files very fast. + conf.setLong("hbase.regionserver.hlog.blocksize", 6144); + int maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32); + + final int numRegionServers = 4; + TEST_UTIL.startMiniCluster(numRegionServers); + TEST_UTIL.getHBaseAdmin().createNamespace( + NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); + HTable table = TEST_UTIL.createTable(TABLENAME, families); + HTableDescriptor htd = table.getTableDescriptor(); + + for (byte[] family : families) { + if (!htd.hasFamily(family)) { + htd.addFamily(new HColumnDescriptor(family)); + } + } + + HRegion desiredRegion = getRegionWithName(TABLENAME).getFirst(); + assertTrue("Could not find a region which hosts the new region.", desiredRegion != null); + + // Add some edits. Most will be for CF1, some for CF2 and CF3. + for (int i = 1; i <= 10000; i++) { + table.put(createPut(1, i)); + if (i <= 200) { + table.put(createPut(2, i)); + table.put(createPut(3, i)); + } + table.flushCommits(); + // Keep adding until we exceed the number of log files, so that we are + // able to trigger the cleaning of old log files. + int currentNumLogFiles = ((FSHLog) (desiredRegion.getWAL())).getNumLogFiles(); + if (currentNumLogFiles > maxLogs) { + LOG.info("The number of log files is now: " + currentNumLogFiles + + ". Expect a log roll and memstore flush."); + break; + } + } + table.close(); + // Wait for some time till the flush caused by log rolling happens. + Thread.sleep(4000); + + // We have artificially created the conditions for a log roll. When a + // log roll happens, we should flush all the column families. Testing that + // case here. + + // Individual families should have been flushed. + assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY1).getMemStoreSize()); + assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY2).getMemStoreSize()); + assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY3).getMemStoreSize()); + + // And of course, the total memstore should also be clean.
+ assertEquals(0, desiredRegion.getMemstoreSize().get()); + + TEST_UTIL.shutdownMiniCluster(); + } + + private void doPut(HTableInterface table) throws IOException { + // cf1 100B per row, cf2 200B per row and cf3 400B per row + byte[] qf = Bytes.toBytes("qf"); + Random rand = new Random(); + byte[] value1 = new byte[100]; + byte[] value2 = new byte[200]; + byte[] value3 = new byte[400]; + for (int i = 0; i < 10000; i++) { + Put put = new Put(Bytes.toBytes("row-" + i)); + rand.setSeed(i); + rand.nextBytes(value1); + rand.nextBytes(value2); + rand.nextBytes(value3); + put.add(FAMILY1, qf, value1); + put.add(FAMILY2, qf, value2); + put.add(FAMILY3, qf, value3); + table.put(put); + } + } + + // Under the same write load, small stores should have fewer store files when + // per-column-family flush is enabled. + @Test + public void testCompareStoreFileCount() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024); + conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName()); + conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, + 400 * 1024); + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); + conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, + ConstantSizeRegionSplitPolicy.class.getName()); + + HTableDescriptor htd = new HTableDescriptor(TABLENAME); + htd.setCompactionEnabled(false); + htd.addFamily(new HColumnDescriptor(FAMILY1)); + htd.addFamily(new HColumnDescriptor(FAMILY2)); + htd.addFamily(new HColumnDescriptor(FAMILY3)); + + LOG.info("==============Test with selective flush disabled==============="); + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.getHBaseAdmin().createNamespace( + NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); + TEST_UTIL.getHBaseAdmin().createTable(htd); + getRegionWithName(TABLENAME).getFirst(); + HConnection conn = HConnectionManager.createConnection(conf); + HTableInterface table = conn.getTable(TABLENAME); + doPut(table); + table.close(); + conn.close(); + + HRegion region = getRegionWithName(TABLENAME).getFirst(); + int cf1StoreFileCount = region.getStore(FAMILY1).getStorefilesCount(); + int cf2StoreFileCount = region.getStore(FAMILY2).getStorefilesCount(); + int cf3StoreFileCount = region.getStore(FAMILY3).getStorefilesCount(); + TEST_UTIL.shutdownMiniCluster(); + + LOG.info("==============Test with selective flush enabled==============="); + conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, + FlushLargeStoresPolicy.class.getName()); + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.getHBaseAdmin().createNamespace( + NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); + TEST_UTIL.getHBaseAdmin().createTable(htd); + conn = HConnectionManager.createConnection(conf); + table = conn.getTable(TABLENAME); + doPut(table); + table.close(); + conn.close(); + + region = getRegionWithName(TABLENAME).getFirst(); + int cf1StoreFileCount1 = region.getStore(FAMILY1).getStorefilesCount(); + int cf2StoreFileCount1 = region.getStore(FAMILY2).getStorefilesCount(); + int cf3StoreFileCount1 = region.getStore(FAMILY3).getStorefilesCount(); + TEST_UTIL.shutdownMiniCluster(); + + LOG.info("disable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount + + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount + ", " + + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount); + LOG.info("enable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount1 + + ", " +
Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount1 + ", " + + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount1); + // small CFs will have fewer store files. + assertTrue(cf1StoreFileCount1 < cf1StoreFileCount); + assertTrue(cf2StoreFileCount1 < cf2StoreFileCount); + } + + public static void main(String[] args) throws Exception { + int numRegions = Integer.parseInt(args[0]); + long numRows = Long.parseLong(args[1]); + + HTableDescriptor htd = new HTableDescriptor(TABLENAME); + htd.setMaxFileSize(10L * 1024 * 1024 * 1024); + htd.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName()); + htd.addFamily(new HColumnDescriptor(FAMILY1)); + htd.addFamily(new HColumnDescriptor(FAMILY2)); + htd.addFamily(new HColumnDescriptor(FAMILY3)); + + Configuration conf = HBaseConfiguration.create(); + HConnection conn = HConnectionManager.createConnection(conf); + HBaseAdmin admin = new HBaseAdmin(conn); + if (admin.tableExists(TABLENAME)) { + admin.disableTable(TABLENAME); + admin.deleteTable(TABLENAME); + } + if (numRegions >= 3) { + byte[] startKey = new byte[16]; + byte[] endKey = new byte[16]; + Arrays.fill(endKey, (byte) 0xFF); + admin.createTable(htd, startKey, endKey, numRegions); + } else { + admin.createTable(htd); + } + admin.close(); + + HTableInterface table = conn.getTable(TABLENAME); + byte[] qf = Bytes.toBytes("qf"); + Random rand = new Random(); + byte[] value1 = new byte[16]; + byte[] value2 = new byte[256]; + byte[] value3 = new byte[4096]; + for (long i = 0; i < numRows; i++) { + Put put = new Put(Hashing.md5().hashLong(i).asBytes()); + rand.setSeed(i); + rand.nextBytes(value1); + rand.nextBytes(value2); + rand.nextBytes(value3); + put.add(FAMILY1, qf, value1); + put.add(FAMILY2, qf, value2); + put.add(FAMILY3, qf, value3); + table.put(put); + if (i % 10000 == 0) { + LOG.info(i + " rows put"); + } + } + table.close(); + conn.close(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java index 669060c..8c152fd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java @@ -30,6 +30,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; @@ -42,8 +43,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.Coprocessor; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -152,18 +153,15 @@ } } - protected void addEdits(WAL log, HRegionInfo hri, TableName tableName, - int times, AtomicLong sequenceId) throws IOException { - HTableDescriptor htd = new HTableDescriptor(); - htd.addFamily(new HColumnDescriptor("row")); - - final byte [] row = Bytes.toBytes("row"); + protected void addEdits(WAL
log, HRegionInfo hri, HTableDescriptor htd, int times, + AtomicLong sequenceId) throws IOException { + final byte[] row = Bytes.toBytes("row"); for (int i = 0; i < times; i++) { long timestamp = System.currentTimeMillis(); WALEdit cols = new WALEdit(); cols.add(new KeyValue(row, row, row, timestamp, row)); - log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, timestamp), cols, - sequenceId, true, null); + log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp), + cols, sequenceId, true, null); } log.sync(); } @@ -173,8 +171,8 @@ public class TestFSHLog { * @param wal * @param regionEncodedName */ - protected void flushRegion(WAL wal, byte[] regionEncodedName) { - wal.startCacheFlush(regionEncodedName); + protected void flushRegion(WAL wal, byte[] regionEncodedName, Set flushedFamilyNames) { + wal.startCacheFlush(regionEncodedName, flushedFamilyNames); wal.completeCacheFlush(regionEncodedName); } @@ -248,10 +246,14 @@ public class TestFSHLog { conf1.setInt("hbase.regionserver.maxlogs", 1); FSHLog wal = new FSHLog(fs, FSUtils.getRootDir(conf1), dir.toString(), HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null); - TableName t1 = TableName.valueOf("t1"); - TableName t2 = TableName.valueOf("t2"); - HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); - HRegionInfo hri2 = new HRegionInfo(t2, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + HTableDescriptor t1 = + new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row")); + HTableDescriptor t2 = + new HTableDescriptor(TableName.valueOf("t2")).addFamily(new HColumnDescriptor("row")); + HRegionInfo hri1 = + new HRegionInfo(t1.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + HRegionInfo hri2 = + new HRegionInfo(t2.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); // variables to mock region sequenceIds final AtomicLong sequenceId1 = new AtomicLong(1); final AtomicLong sequenceId2 = new AtomicLong(1); @@ -278,12 +280,12 @@ public class TestFSHLog { assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]); // flush region 1, and roll the wal file. Only last wal which has entries for region1 should // remain. - flushRegion(wal, hri1.getEncodedNameAsBytes()); + flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys()); wal.rollWriter(); // only one wal should remain now (that is for the second region). assertEquals(1, wal.getNumRolledLogFiles()); // flush the second region - flushRegion(wal, hri2.getEncodedNameAsBytes()); + flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys()); wal.rollWriter(true); // no wal should remain now. assertEquals(0, wal.getNumRolledLogFiles()); @@ -300,14 +302,14 @@ public class TestFSHLog { regionsToFlush = wal.findRegionsToForceFlush(); assertEquals(2, regionsToFlush.length); // flush both regions - flushRegion(wal, hri1.getEncodedNameAsBytes()); - flushRegion(wal, hri2.getEncodedNameAsBytes()); + flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys()); + flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys()); wal.rollWriter(true); assertEquals(0, wal.getNumRolledLogFiles()); // Add an edit to region1, and roll the wal. addEdits(wal, hri1, t1, 2, sequenceId1); // tests partial flush: roll on a partial flush, and ensure that wal is not archived. 
- wal.startCacheFlush(hri1.getEncodedNameAsBytes()); + wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys()); wal.rollWriter(); wal.completeCacheFlush(hri1.getEncodedNameAsBytes()); assertEquals(1, wal.getNumRolledLogFiles()); http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 3f551e4..b6e7c02 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -27,7 +27,10 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -786,13 +789,15 @@ public class TestWALReplay { // Add 1k to each family. final int countPerFamily = 1000; + Set familyNames = new HashSet(); for (HColumnDescriptor hcd: htd.getFamilies()) { addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal, htd, sequenceId); + familyNames.add(hcd.getName()); } // Add a cache flush, shouldn't have any effect - wal.startCacheFlush(regionName); + wal.startCacheFlush(regionName, familyNames); wal.completeCacheFlush(regionName); // Add an edit to another family, should be skipped. 
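The hunk above shows the new WAL flush contract in miniature: a cache flush now
declares up front which families it covers, so the WAL can keep accounting for
the entries of families that are left unflushed. A minimal sketch of that
handshake, not part of this patch (the helper name is illustrative, and it
assumes the same imports the surrounding test uses):

  // Sketch only: flush every family of a region, bracketed by the same
  // startCacheFlush/completeCacheFlush calls seen in the hunk above.
  static void flushAllFamilies(WAL wal, byte[] encodedRegionName, HTableDescriptor htd) {
    Set<byte[]> familyNames = new HashSet<byte[]>();
    for (HColumnDescriptor hcd : htd.getFamilies()) {
      familyNames.add(hcd.getName()); // declare each family as flushing
    }
    wal.startCacheFlush(encodedRegionName, familyNames);
    // ... the region would write its memstore snapshots to store files here ...
    wal.completeCacheFlush(encodedRegionName); // older WALs become archivable
  }

A partial flush simply passes a subset of the family names, which is what lets
the WAL hold on to the log entries of the families that were not flushed.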
@@ -832,11 +837,11 @@ public class TestWALReplay {
     final HRegion region =
         new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
       @Override
-      protected FlushResult internalFlushcache(
-          final WAL wal, final long myseqid, MonitoredTask status)
+      protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
+          Collection<Store> storesToFlush, MonitoredTask status)
       throws IOException {
         LOG.info("InternalFlushCache Invoked");
-        FlushResult fs = super.internalFlushcache(wal, myseqid,
+        FlushResult fs = super.internalFlushcache(wal, myseqid, storesToFlush,
           Mockito.mock(MonitoredTask.class));
         flushcount.incrementAndGet();
         return fs;
@@ -958,16 +963,16 @@ public class TestWALReplay {
     private HRegion r;
 
     @Override
-    public void requestFlush(HRegion region) {
+    public void requestFlush(HRegion region, boolean forceFlushAllStores) {
       try {
-        r.flushcache();
+        r.flushcache(forceFlushAllStores);
       } catch (IOException e) {
        throw new RuntimeException("Exception flushing", e);
       }
     }
 
     @Override
-    public void requestDelayedFlush(HRegion region, long when) {
+    public void requestDelayedFlush(HRegion region, long when, boolean forceFlushAllStores) {
       // TODO Auto-generated method stub
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
index 28cd849..2e9d312 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
@@ -146,18 +146,15 @@ public class TestDefaultWALProvider {
   }
 
-  protected void addEdits(WAL log, HRegionInfo hri, TableName tableName,
+  protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd,
                           int times, AtomicLong sequenceId) throws IOException {
-    HTableDescriptor htd = new HTableDescriptor();
-    htd.addFamily(new HColumnDescriptor("row"));
-
-    final byte [] row = Bytes.toBytes("row");
+    final byte[] row = Bytes.toBytes("row");
     for (int i = 0; i < times; i++) {
       long timestamp = System.currentTimeMillis();
       WALEdit cols = new WALEdit();
       cols.add(new KeyValue(row, row, row, timestamp, row));
-      log.append(htd, hri, getWalKey(hri.getEncodedNameAsBytes(), tableName, timestamp), cols,
-        sequenceId, true, null);
+      log.append(htd, hri, getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp),
+        cols, sequenceId, true, null);
     }
     log.sync();
   }
@@ -174,8 +171,8 @@ public class TestDefaultWALProvider {
    * @param wal
    * @param regionEncodedName
    */
-  protected void flushRegion(WAL wal, byte[] regionEncodedName) {
-    wal.startCacheFlush(regionEncodedName);
+  protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
+    wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
     wal.completeCacheFlush(regionEncodedName);
   }
 
@@ -184,45 +181,47 @@ public class TestDefaultWALProvider {
   @Test
   public void testLogCleaning() throws Exception {
     LOG.info("testLogCleaning");
-    final TableName tableName =
-        TableName.valueOf("testLogCleaning");
-    final TableName tableName2 =
-        TableName.valueOf("testLogCleaning2");
+    final HTableDescriptor htd =
+        new HTableDescriptor(TableName.valueOf("testLogCleaning")).addFamily(new HColumnDescriptor(
+            "row"));
+    final HTableDescriptor htd2 =
+        new HTableDescriptor(TableName.valueOf("testLogCleaning2"))
+            .addFamily(new HColumnDescriptor("row"));
 
     final Configuration localConf = new Configuration(conf);
     localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName());
     final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName());
     final AtomicLong sequenceId = new AtomicLong(1);
     try {
-      HRegionInfo hri = new HRegionInfo(tableName,
+      HRegionInfo hri = new HRegionInfo(htd.getTableName(),
           HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
-      HRegionInfo hri2 = new HRegionInfo(tableName2,
+      HRegionInfo hri2 = new HRegionInfo(htd2.getTableName(),
           HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
       // we want to mix edits from regions, so pick our own identifier.
       final WAL log = wals.getWAL(UNSPECIFIED_REGION);
 
       // Add a single edit and make sure that rolling won't remove the file
       // Before HBASE-3198 it used to delete it
-      addEdits(log, hri, tableName, 1, sequenceId);
+      addEdits(log, hri, htd, 1, sequenceId);
       log.rollWriter();
       assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(log));
 
       // See if there's anything wrong with more than 1 edit
-      addEdits(log, hri, tableName, 2, sequenceId);
+      addEdits(log, hri, htd, 2, sequenceId);
       log.rollWriter();
       assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(log));
 
       // Now mix edits from 2 regions, still no flushing
-      addEdits(log, hri, tableName, 1, sequenceId);
-      addEdits(log, hri2, tableName2, 1, sequenceId);
-      addEdits(log, hri, tableName, 1, sequenceId);
-      addEdits(log, hri2, tableName2, 1, sequenceId);
+      addEdits(log, hri, htd, 1, sequenceId);
+      addEdits(log, hri2, htd2, 1, sequenceId);
+      addEdits(log, hri, htd, 1, sequenceId);
+      addEdits(log, hri2, htd2, 1, sequenceId);
       log.rollWriter();
       assertEquals(3, DefaultWALProvider.getNumRolledLogFiles(log));
 
       // Flush the first region, we expect to see the first two files getting
       // archived. We need to append something or writer won't be rolled.
-      addEdits(log, hri2, tableName2, 1, sequenceId);
-      log.startCacheFlush(hri.getEncodedNameAsBytes());
+      addEdits(log, hri2, htd2, 1, sequenceId);
+      log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
       log.completeCacheFlush(hri.getEncodedNameAsBytes());
       log.rollWriter();
       assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(log));
@@ -230,8 +229,8 @@ public class TestDefaultWALProvider {
       // Flush the second region, which removes all the remaining output files
       // since the oldest was completely flushed and the two others only contain
       // flush information
-      addEdits(log, hri2, tableName2, 1, sequenceId);
-      log.startCacheFlush(hri2.getEncodedNameAsBytes());
+      addEdits(log, hri2, htd2, 1, sequenceId);
+      log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getFamiliesKeys());
       log.completeCacheFlush(hri2.getEncodedNameAsBytes());
       log.rollWriter();
       assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(log));
@@ -254,21 +253,25 @@ public class TestDefaultWALProvider {
    *
    * @throws IOException
    */
-  @Test 
+  @Test
   public void testWALArchiving() throws IOException {
     LOG.debug("testWALArchiving");
-    TableName table1 = TableName.valueOf("t1");
-    TableName table2 = TableName.valueOf("t2");
+    HTableDescriptor table1 =
+        new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
+    HTableDescriptor table2 =
+        new HTableDescriptor(TableName.valueOf("t2")).addFamily(new HColumnDescriptor("row"));
     final Configuration localConf = new Configuration(conf);
     localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName());
     final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName());
     try {
       final WAL wal = wals.getWAL(UNSPECIFIED_REGION);
       assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
-      HRegionInfo hri1 = new HRegionInfo(table1, HConstants.EMPTY_START_ROW,
-          HConstants.EMPTY_END_ROW);
-      HRegionInfo hri2 = new HRegionInfo(table2, HConstants.EMPTY_START_ROW,
-          HConstants.EMPTY_END_ROW);
+      HRegionInfo hri1 =
+          new HRegionInfo(table1.getTableName(), HConstants.EMPTY_START_ROW,
+              HConstants.EMPTY_END_ROW);
+      HRegionInfo hri2 =
+          new HRegionInfo(table2.getTableName(), HConstants.EMPTY_START_ROW,
+              HConstants.EMPTY_END_ROW);
       // ensure that we don't split the regions.
       hri1.setSplit(false);
       hri2.setSplit(false);
@@ -287,7 +290,7 @@ public class TestDefaultWALProvider {
       assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
       // add a waledit to table1, and flush the region.
       addEdits(wal, hri1, table1, 3, sequenceId1);
-      flushRegion(wal, hri1.getEncodedNameAsBytes());
+      flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getFamiliesKeys());
       // roll log; all old logs should be archived.
       wal.rollWriter();
       assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
@@ -301,7 +304,7 @@ public class TestDefaultWALProvider {
       assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
       // add edits for table2, and flush hri1.
       addEdits(wal, hri2, table2, 2, sequenceId2);
-      flushRegion(wal, hri1.getEncodedNameAsBytes());
+      flushRegion(wal, hri1.getEncodedNameAsBytes(), table2.getFamiliesKeys());
       // the log : region-sequenceId map is
       // log1: region2 (unflushed)
       // log2: region1 (flushed)
@@ -311,7 +314,7 @@ public class TestDefaultWALProvider {
       assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
       // flush region2, and all logs should be archived.
       addEdits(wal, hri2, table2, 2, sequenceId2);
-      flushRegion(wal, hri2.getEncodedNameAsBytes());
+      flushRegion(wal, hri2.getEncodedNameAsBytes(), table2.getFamiliesKeys());
       wal.rollWriter();
       assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
     } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index b163bd5..1530b6b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -479,8 +479,9 @@ public class TestWALFactory {
   @Test
   public void testEditAdd() throws IOException {
     final int COL_COUNT = 10;
-    final TableName tableName =
-        TableName.valueOf("tablename");
+    final HTableDescriptor htd =
+        new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor(
+            "column"));
     final byte [] row = Bytes.toBytes("row");
     WAL.Reader reader = null;
     try {
@@ -495,16 +496,15 @@ public class TestWALFactory {
             Bytes.toBytes(Integer.toString(i)),
           timestamp, new byte[] { (byte)(i + '0') }));
       }
-      HRegionInfo info = new HRegionInfo(tableName,
+      HRegionInfo info = new HRegionInfo(htd.getTableName(),
         row,Bytes.toBytes(Bytes.toString(row) + "1"), false);
-      HTableDescriptor htd = new HTableDescriptor();
-      htd.addFamily(new HColumnDescriptor("column"));
       final WAL log = wals.getWAL(info.getEncodedNameAsBytes());
-      final long txid = log.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
-          System.currentTimeMillis()), cols, sequenceId, true, null);
+      final long txid = log.append(htd, info,
+        new WALKey(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis()),
+        cols, sequenceId, true, null);
       log.sync(txid);
-      log.startCacheFlush(info.getEncodedNameAsBytes());
+      log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getFamiliesKeys());
       log.completeCacheFlush(info.getEncodedNameAsBytes());
       log.shutdown();
       Path filename = DefaultWALProvider.getCurrentFileName(log);
@@ -518,7 +518,7 @@ public class TestWALFactory {
         WALKey key = entry.getKey();
         WALEdit val = entry.getEdit();
         assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
-        assertTrue(tableName.equals(key.getTablename()));
+        assertTrue(htd.getTableName().equals(key.getTablename()));
         Cell cell = val.getCells().get(0);
         assertTrue(Bytes.equals(row, cell.getRow()));
         assertEquals((byte)(i + '0'), cell.getValue()[0]);
@@ -537,8 +537,9 @@ public class TestWALFactory {
   @Test
   public void testAppend() throws IOException {
     final int COL_COUNT = 10;
-    final TableName tableName =
-        TableName.valueOf("tablename");
+    final HTableDescriptor htd =
+        new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor(
+            "column"));
     final byte [] row = Bytes.toBytes("row");
     WAL.Reader reader = null;
     final AtomicLong sequenceId = new AtomicLong(1);
@@ -552,15 +553,14 @@ public class TestWALFactory {
           Bytes.toBytes(Integer.toString(i)),
           timestamp, new byte[] { (byte)(i + '0') }));
       }
-      HRegionInfo hri = new HRegionInfo(tableName,
+      HRegionInfo hri = new HRegionInfo(htd.getTableName(),
           HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
-      HTableDescriptor htd = new HTableDescriptor();
-      htd.addFamily(new HColumnDescriptor("column"));
       final WAL log = wals.getWAL(hri.getEncodedNameAsBytes());
-      final long txid = log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
-          System.currentTimeMillis()), cols, sequenceId, true, null);
+      final long txid = log.append(htd, hri,
+        new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis()),
+        cols, sequenceId, true, null);
       log.sync(txid);
-      log.startCacheFlush(hri.getEncodedNameAsBytes());
+      log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
       log.completeCacheFlush(hri.getEncodedNameAsBytes());
       log.shutdown();
       Path filename = DefaultWALProvider.getCurrentFileName(log);
@@ -572,7 +572,7 @@ public class TestWALFactory {
       for (Cell val : entry.getEdit().getCells()) {
         assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(),
           entry.getKey().getEncodedRegionName()));
-        assertTrue(tableName.equals(entry.getKey().getTablename()));
+        assertTrue(htd.getTableName().equals(entry.getKey().getTablename()));
         assertTrue(Bytes.equals(row, val.getRow()));
         assertEquals((byte)(idx + '0'), val.getValue()[0]);
         System.out.println(entry.getKey() + " " + val);
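For reference, the selective-flush behaviour these tests exercise hangs off two
plain Configuration keys. A minimal sketch of opting in, reusing only the keys
already set by testCompareStoreFileCount above (the 400 KB threshold mirrors
that test; the surrounding setup is illustrative, not prescribed by the patch):

  // Sketch only: select the per-column-family flush policy, plus the memstore
  // size below which a store is not picked for flushing on its own.
  Configuration conf = HBaseConfiguration.create();
  conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
      FlushLargeStoresPolicy.class.getName());
  conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
      400 * 1024); // value taken from testCompareStoreFileCount

Selecting FlushAllStoresPolicy instead keeps the old flush-everything behaviour,
which is the baseline the first half of testCompareStoreFileCount measures.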