hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeffr...@apache.org
Subject hbase git commit: HBASE-12600: Remove REPLAY tag dependency in Distributed Replay Mode
Date Mon, 01 Dec 2014 19:36:54 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 86dc3cab9 -> a184620fb


HBASE-12600: Remove REPLAY tag dependency in Distributed Replay Mode


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a184620f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a184620f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a184620f

Branch: refs/heads/branch-1
Commit: a184620fb384b43d48266ae6faa2fe2554945446
Parents: 86dc3ca
Author: Jeffrey Zhong <jeffreyz@apache.org>
Authored: Mon Dec 1 10:15:50 2014 -0800
Committer: Jeffrey Zhong <jeffreyz@apache.org>
Committed: Mon Dec 1 11:30:13 2014 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/CellComparator.java | 29 ---------------
 .../java/org/apache/hadoop/hbase/KeyValue.java  |  9 ++---
 .../java/org/apache/hadoop/hbase/TagType.java   |  2 +-
 .../ZKSplitLogManagerCoordination.java          | 10 +++---
 .../hadoop/hbase/regionserver/HRegion.java      | 12 +++++--
 .../hbase/regionserver/RSRpcServices.java       |  4 +--
 .../compactions/DefaultCompactor.java           |  3 +-
 .../compactions/StripeCompactor.java            |  4 +--
 .../hbase/regionserver/wal/HLogSplitter.java    | 37 ++------------------
 .../master/TestDistributedLogSplitting.java     |  4 +--
 10 files changed, 27 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a184620f/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
index 8093217..3ad717b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
@@ -68,19 +68,6 @@ public class CellComparator implements Comparator<Cell>, Serializable {
 
     if (!ignoreSequenceid) {
       // Negate following comparisons so later edits show up first
-
-      // compare log replay tag value if there is any
-      // when either keyvalue tagged with log replay sequence number, we need to compare them:
-      // 1) when both keyvalues have the tag, then use the tag values for comparison
-      // 2) when one has and the other doesn't have, the one without the log
-      // replay tag wins because
-      // it means the edit isn't from recovery but new one coming from clients during recovery
-      // 3) when both doesn't have, then skip to the next mvcc comparison
-      long leftChangeSeqNum = getReplaySeqNum(a);
-      long RightChangeSeqNum = getReplaySeqNum(b);
-      if (leftChangeSeqNum != Long.MAX_VALUE || RightChangeSeqNum != Long.MAX_VALUE) {
-        return Longs.compare(RightChangeSeqNum, leftChangeSeqNum);
-      }
       // mvccVersion: later sorts first
       return Longs.compare(b.getMvccVersion(), a.getMvccVersion());
     } else {
@@ -88,22 +75,6 @@ public class CellComparator implements Comparator<Cell>, Serializable {
     }
   }
 
-  /**
-   * Return replay log sequence number for the cell
-   *
-   * @param c
-   * @return Long.MAX_VALUE if there is no LOG_REPLAY_TAG
-   */
-  private static long getReplaySeqNum(final Cell c) {
-    Tag tag = Tag.getTag(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength(),
-        TagType.LOG_REPLAY_TAG_TYPE);
-
-    if (tag != null) {
-      return Bytes.toLong(tag.getBuffer(), tag.getTagOffset(), tag.getTagLength());
-    }
-    return Long.MAX_VALUE;
-  }
-
   public static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
     return findCommonPrefix(left.getRowArray(), right.getRowArray(), left.getRowLength()
         - rowCommonPrefix, right.getRowLength() - rowCommonPrefix, left.getRowOffset()

http://git-wip-us.apache.org/repos/asf/hbase/blob/a184620f/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index 0857f12..f6cb93d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -746,6 +746,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
         c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(), 
         c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),

         c.getValueLength(), c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
+    this.seqId = c.getSequenceId();
   }
 
   /**
@@ -1020,8 +1021,8 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
     checkForTagsLength(tagsLength);
     // Allocate right-sized byte array.
     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
-    byte [] bytes =
-        new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength, tagsLength)];
+    byte[] bytes = new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
+      tagsLength)];
     // Write key, value and key row length.
     int pos = 0;
     pos = Bytes.putInt(bytes, pos, keyLength);
@@ -2551,8 +2552,8 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
      * Compare two keys assuming that the first n bytes are the same.
      * @param commonPrefix How many bytes are the same.
      */
-    int compareIgnoringPrefix(
-      int commonPrefix, byte[] left, int loffset, int llength, byte[] right, int roffset, int rlength
+    int compareIgnoringPrefix(int commonPrefix, byte[] left, int loffset, int llength,
+        byte[] right, int roffset, int rlength
     );
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a184620f/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index b113516..f088dcf 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -26,7 +26,7 @@ public final class TagType {
   // Please declare new Tag Types here to avoid step on pre-existing tag types.
   public static final byte ACL_TAG_TYPE = (byte) 1;
   public static final byte VISIBILITY_TAG_TYPE = (byte) 2;
-  public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3;
+  // public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3; // deprecated
   public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
   // String based tag type used in replication
   public static final byte STRING_VIS_TAG_TYPE = (byte) 7;

http://git-wip-us.apache.org/repos/asf/hbase/blob/a184620f/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
index 0f8baa3..a9ab9c0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
@@ -774,8 +774,8 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
   public void setRecoveryMode(boolean isForInitialization) throws IOException {
     synchronized(this) {
       if (this.isDrainingDone) {
-        // when there is no outstanding splitlogtask after master start up, we already have up to date
-        // recovery mode
+        // when there is no outstanding splitlogtask after master start up, we already have up to
+        // date recovery mode
         return;
       }
     }
@@ -867,12 +867,10 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
     boolean dlr =
         conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
           HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
-    int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version);
+      LOG.debug("Distributed log replay=" + dlr);
     }
-    // For distributed log replay, hfile version must be 3 at least; we need tag support.
-    return dlr && (version >= 3);
+    return dlr;
   }
 
   private boolean resubmit(ServerName serverName, String path, int version) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/a184620f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 95bed0e..288825f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2688,7 +2688,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           continue;
         }
         doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
-        addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells);
+        addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells, isInReplay);
       }
 
       // ------------------------------------
@@ -3191,12 +3191,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
    *        If null, then this method internally creates a mvcc transaction.
    * @param output newly added KVs into memstore
+   * @param isInReplay true when adding replayed KVs into memstore
    * @return the additional memory usage of the memstore caused by the
    * new entries.
    * @throws IOException
    */
   private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
-    long mvccNum, List<Cell> memstoreCells) throws IOException {
+    long mvccNum, List<Cell> memstoreCells, boolean isInReplay) throws IOException {
     long size = 0;
 
     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
@@ -3211,6 +3212,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         Pair<Long, Cell> ret = store.add(cell);
         size += ret.getFirst();
         memstoreCells.add(ret.getSecond());
+        if(isInReplay) {
+          // set memstore newly added cells with replay mvcc number
+          CellUtil.setSequenceId(ret.getSecond(), mvccNum);
+        }
       }
     }
 
@@ -3409,7 +3414,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       }
 
       try {
-        // replay the edits. Replay can return -1 if everything is skipped, only update if seqId is greater
+        // replay the edits. Replay can return -1 if everything is skipped, only update
+        // if seqId is greater
         seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
       } catch (IOException e) {
         boolean skipErrors = conf.getBoolean(

http://git-wip-us.apache.org/repos/asf/hbase/blob/a184620f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 9a26617..7d291bf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1445,8 +1445,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         entries.get(0).getKey().getEncodedRegionName().toStringUtf8());
       RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
       List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
-      // when tag is enabled, we need tag replay edits with log sequence number
-      boolean needAddReplayTag = (HFile.getFormatVersion(regionServer.conf) >= 3);
       for (WALEntry entry : entries) {
         if (regionServer.nonceManager != null) {
           long nonceGroup = entry.getKey().hasNonceGroup()
@@ -1457,7 +1455,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
           new Pair<HLogKey, WALEdit>();
         List<HLogSplitter.MutationReplay> edits = HLogSplitter.getMutationsFromWALEntry(entry,
-          cells, walEntry, needAddReplayTag);
+          cells, walEntry);
         if (coprocessorHost != null) {
           // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
           // KeyValue.

http://git-wip-us.apache.org/repos/asf/hbase/blob/a184620f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index cc03e09..df76073 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -92,8 +92,9 @@ public class DefaultCompactor extends Compactor {
           smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
           cleanSeqId = true;
         }
+        
         writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
-            fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
+            true, fd.maxTagsLength > 0);
         boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId);
         if (!finished) {
           writer.close();

http://git-wip-us.apache.org/repos/asf/hbase/blob/a184620f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 20af77d..4416df1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -115,13 +115,13 @@ public class StripeCompactor extends Compactor {
         smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
         cleanSeqId = true;
       }
-      final boolean needMvcc = fd.maxMVCCReadpoint >= smallestReadPoint;
+
       final Compression.Algorithm compression = store.getFamily().getCompactionCompression();
       StripeMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() {
         @Override
         public Writer createWriter() throws IOException {
           return store.createWriterInTmp(
-              fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0);
+              fd.maxKeyCount, compression, true, true, fd.maxTagsLength > 0);
         }
       };
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a184620f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index b04ae6f..79f2cbe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -1906,34 +1906,6 @@ public class HLogSplitter {
     public final long nonce;
   }
 
- /**
-  * Tag original sequence number for each edit to be replayed
-  * @param seqId
-  * @param cell
-  */
-  private static Cell tagReplayLogSequenceNumber(long seqId, Cell cell) {
-    // Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet
-    boolean needAddRecoveryTag = true;
-    if (cell.getTagsLength() > 0) {
-      Tag tmpTag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
-        TagType.LOG_REPLAY_TAG_TYPE);
-      if (tmpTag != null) {
-        // found an existing log replay tag so reuse it
-        needAddRecoveryTag = false;
-      }
-    }
-    if (needAddRecoveryTag) {
-      List<Tag> newTags = new ArrayList<Tag>();
-      Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(seqId));
-      newTags.add(replayTag);
-      if (cell.getTagsLength() > 0) {
-        newTags.addAll(Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()));
-      }
-      return new TagRewriteCell(cell, Tag.fromList(newTags));
-    }
-    return cell;
-  }
-
   /**
    * This function is used to construct mutations from a WALEntry. It also reconstructs HLogKey &
    * WALEdit from the passed in WALEntry
@@ -1941,12 +1913,11 @@ public class HLogSplitter {
    * @param cells
    * @param logEntry pair of HLogKey and WALEdit instance stores HLogKey and WALEdit instances
    *          extracted from the passed in WALEntry.
-   * @param addLogReplayTag
    * @return list of Pair<MutationType, Mutation> to be replayed
    * @throws IOException
    */
   public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
-      Pair<HLogKey, WALEdit> logEntry, boolean addLogReplayTag) throws IOException {
+      Pair<HLogKey, WALEdit> logEntry) throws IOException {
 
     if (entry == null) {
       // return an empty array
@@ -1993,11 +1964,7 @@ public class HLogSplitter {
       if (CellUtil.isDelete(cell)) {
         ((Delete) m).addDeleteMarker(cell);
       } else {
-        Cell tmpNewCell = cell;
-        if (addLogReplayTag) {
-          tmpNewCell = tagReplayLogSequenceNumber(replaySeqId, cell);
-        }
-        ((Put) m).add(tmpNewCell);
+        ((Put) m).add(cell);
       }
       previousCell = cell;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a184620f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index 1ad7bc8..cd2b752 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -1186,7 +1186,6 @@ public class TestDistributedLogSplitting {
     LOG.info("testSameVersionUpdatesRecovery");
     conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
-    conf.setInt("hfile.format.version", 3);
     startCluster(NUM_RS);
     final AtomicLong sequenceId = new AtomicLong(100);
     final int NUM_REGIONS_TO_CREATE = 40;
@@ -1278,11 +1277,10 @@ public class TestDistributedLogSplitting {
     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
     conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024);
     conf.setInt("hbase.hstore.compactionThreshold", 3);
-    conf.setInt("hfile.format.version", 3);
     startCluster(NUM_RS);
     final AtomicLong sequenceId = new AtomicLong(100);
     final int NUM_REGIONS_TO_CREATE = 40;
-    final int NUM_LOG_LINES = 1000;
+    final int NUM_LOG_LINES = 2000;
     // turn off load balancing to prevent regions from moving around otherwise
     // they will consume recovered.edits
     master.balanceSwitch(false);


Mime
View raw message