hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject git commit: HBASE-11475 Distributed log replay should also replay compaction events
Date Thu, 10 Jul 2014 17:50:42 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 95ef3acdd -> 09fcdb50a


HBASE-11475 Distributed log replay should also replay compaction events


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/09fcdb50
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/09fcdb50
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/09fcdb50

Branch: refs/heads/master
Commit: 09fcdb50af38e0e25a5bdab95b34577bb4d38f49
Parents: 95ef3ac
Author: Enis Soztutar <enis@apache.org>
Authored: Thu Jul 10 10:50:22 2014 -0700
Committer: Enis Soztutar <enis@apache.org>
Committed: Thu Jul 10 10:50:22 2014 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/protobuf/ProtobufUtil.java     |   1 +
 .../hbase/protobuf/generated/WALProtos.java     | 138 ++++++++++++++++++-
 hbase-protocol/src/main/protobuf/WAL.proto      |   1 +
 .../hadoop/hbase/regionserver/HRegion.java      |  10 +-
 .../hbase/regionserver/RSRpcServices.java       |  29 +++-
 .../hbase/regionserver/wal/HLogSplitter.java    |  53 +++++--
 .../hadoop/hbase/regionserver/wal/HLogUtil.java |   2 +-
 .../hadoop/hbase/regionserver/wal/WALEdit.java  |  26 +++-
 .../regionserver/wal/WALEditsReplaySink.java    |   3 -
 .../org/apache/hadoop/hbase/TestIOFencing.java  |  48 +++++--
 .../hadoop/hbase/regionserver/TestHRegion.java  |   7 +-
 .../replication/TestReplicationSmallTests.java  |   7 +-
 12 files changed, 270 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/09fcdb50/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index c58d1e5..7277429 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -2495,6 +2495,7 @@ public final class ProtobufUtil {
     for (Path outputPath : outputPaths) {
       builder.addCompactionOutput(outputPath.getName());
     }
+    builder.setRegionName(HBaseZeroCopyByteString.wrap(info.getRegionName()));
     return builder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/09fcdb50/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
index efea2ba..c6d6a19 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
@@ -3725,6 +3725,24 @@ public final class WALProtos {
      */
     com.google.protobuf.ByteString
         getStoreHomeDirBytes();
+
+    // optional bytes region_name = 7;
+    /**
+     * <code>optional bytes region_name = 7;</code>
+     *
+     * <pre>
+     * full region name
+     * </pre>
+     */
+    boolean hasRegionName();
+    /**
+     * <code>optional bytes region_name = 7;</code>
+     *
+     * <pre>
+     * full region name
+     * </pre>
+     */
+    com.google.protobuf.ByteString getRegionName();
   }
   /**
    * Protobuf type {@code CompactionDescriptor}
@@ -3821,6 +3839,11 @@ public final class WALProtos {
               storeHomeDir_ = input.readBytes();
               break;
             }
+            case 58: {
+              bitField0_ |= 0x00000010;
+              regionName_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -4026,6 +4049,30 @@ public final class WALProtos {
       }
     }
 
+    // optional bytes region_name = 7;
+    public static final int REGION_NAME_FIELD_NUMBER = 7;
+    private com.google.protobuf.ByteString regionName_;
+    /**
+     * <code>optional bytes region_name = 7;</code>
+     *
+     * <pre>
+     * full region name
+     * </pre>
+     */
+    public boolean hasRegionName() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>optional bytes region_name = 7;</code>
+     *
+     * <pre>
+     * full region name
+     * </pre>
+     */
+    public com.google.protobuf.ByteString getRegionName() {
+      return regionName_;
+    }
+
     private void initFields() {
       tableName_ = com.google.protobuf.ByteString.EMPTY;
       encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
@@ -4033,6 +4080,7 @@ public final class WALProtos {
       compactionInput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
       compactionOutput_ = com.google.protobuf.LazyStringArrayList.EMPTY;
       storeHomeDir_ = "";
+      regionName_ = com.google.protobuf.ByteString.EMPTY;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4080,6 +4128,9 @@ public final class WALProtos {
       if (((bitField0_ & 0x00000008) == 0x00000008)) {
         output.writeBytes(6, getStoreHomeDirBytes());
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeBytes(7, regionName_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -4123,6 +4174,10 @@ public final class WALProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(6, getStoreHomeDirBytes());
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(7, regionName_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -4170,6 +4225,11 @@ public final class WALProtos {
         result = result && getStoreHomeDir()
             .equals(other.getStoreHomeDir());
       }
+      result = result && (hasRegionName() == other.hasRegionName());
+      if (hasRegionName()) {
+        result = result && getRegionName()
+            .equals(other.getRegionName());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -4207,6 +4267,10 @@ public final class WALProtos {
         hash = (37 * hash) + STORE_HOME_DIR_FIELD_NUMBER;
         hash = (53 * hash) + getStoreHomeDir().hashCode();
       }
+      if (hasRegionName()) {
+        hash = (37 * hash) + REGION_NAME_FIELD_NUMBER;
+        hash = (53 * hash) + getRegionName().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -4336,6 +4400,8 @@ public final class WALProtos {
         bitField0_ = (bitField0_ & ~0x00000010);
         storeHomeDir_ = "";
         bitField0_ = (bitField0_ & ~0x00000020);
+        regionName_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
 
@@ -4392,6 +4458,10 @@ public final class WALProtos {
           to_bitField0_ |= 0x00000008;
         }
         result.storeHomeDir_ = storeHomeDir_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.regionName_ = regionName_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -4442,6 +4512,9 @@ public final class WALProtos {
           storeHomeDir_ = other.storeHomeDir_;
           onChanged();
         }
+        if (other.hasRegionName()) {
+          setRegionName(other.getRegionName());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -4869,6 +4942,58 @@ public final class WALProtos {
         return this;
       }
 
+      // optional bytes region_name = 7;
+      private com.google.protobuf.ByteString regionName_ = com.google.protobuf.ByteString.EMPTY;
+      /**
+       * <code>optional bytes region_name = 7;</code>
+       *
+       * <pre>
+       * full region name
+       * </pre>
+       */
+      public boolean hasRegionName() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      /**
+       * <code>optional bytes region_name = 7;</code>
+       *
+       * <pre>
+       * full region name
+       * </pre>
+       */
+      public com.google.protobuf.ByteString getRegionName() {
+        return regionName_;
+      }
+      /**
+       * <code>optional bytes region_name = 7;</code>
+       *
+       * <pre>
+       * full region name
+       * </pre>
+       */
+      public Builder setRegionName(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000040;
+        regionName_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bytes region_name = 7;</code>
+       *
+       * <pre>
+       * full region name
+       * </pre>
+       */
+      public Builder clearRegionName() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        regionName_ = getDefaultInstance().getRegionName();
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:CompactionDescriptor)
     }
 
@@ -5275,15 +5400,16 @@ public final class WALProtos {
       "ster_ids\030\010 \003(\0132\005.UUID\022\022\n\nnonceGroup\030\t \001(" +
       "\004\022\r\n\005nonce\030\n \001(\004\022\034\n\024orig_sequence_number",
       "\030\013 \001(\004\"=\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022\036\n" +
-      "\nscope_type\030\002 \002(\0162\n.ScopeType\"\251\001\n\024Compac" +
+      "\nscope_type\030\002 \002(\0162\n.ScopeType\"\276\001\n\024Compac" +
       "tionDescriptor\022\022\n\ntable_name\030\001 \002(\014\022\033\n\023en" +
       "coded_region_name\030\002 \002(\014\022\023\n\013family_name\030\003" +
       " \002(\014\022\030\n\020compaction_input\030\004 \003(\t\022\031\n\021compac" +
       "tion_output\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(" +
-      "\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027REPLICAT" +
-      "ION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_G" +
-      "LOBAL\020\001B?\n*org.apache.hadoop.hbase.proto" +
-      "buf.generatedB\tWALProtosH\001\210\001\000\240\001\001"
+      "\t\022\023\n\013region_name\030\007 \001(\014\"\014\n\nWALTrailer*F\n\t" +
+      "ScopeType\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034" +
+      "\n\030REPLICATION_SCOPE_GLOBAL\020\001B?\n*org.apac" +
+      "he.hadoop.hbase.protobuf.generatedB\tWALP",
+      "rotosH\001\210\001\000\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5313,7 +5439,7 @@ public final class WALProtos {
           internal_static_CompactionDescriptor_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_CompactionDescriptor_descriptor,
-              new java.lang.String[] { "TableName", "EncodedRegionName", "FamilyName", "CompactionInput", "CompactionOutput", "StoreHomeDir", });
+              new java.lang.String[] { "TableName", "EncodedRegionName", "FamilyName", "CompactionInput", "CompactionOutput", "StoreHomeDir", "RegionName", });
           internal_static_WALTrailer_descriptor =
             getDescriptor().getMessageTypes().get(4);
           internal_static_WALTrailer_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hbase/blob/09fcdb50/hbase-protocol/src/main/protobuf/WAL.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto
index 0ae65ec..88e94f4 100644
--- a/hbase-protocol/src/main/protobuf/WAL.proto
+++ b/hbase-protocol/src/main/protobuf/WAL.proto
@@ -92,6 +92,7 @@ message CompactionDescriptor {
   repeated string compaction_input = 4;
   repeated string compaction_output = 5;
   required string store_home_dir = 6;
+  optional bytes  region_name = 7; // full region name
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/09fcdb50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 93eada8..7c940ad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1625,7 +1625,7 @@ public class HRegion implements HeapSize { // , Writable{
    */
   boolean shouldFlush() {
     // This is a rough measure.
-    if (this.lastFlushSeqId > 0 
+    if (this.lastFlushSeqId > 0
           && (this.lastFlushSeqId + this.flushPerChanges < this.sequenceId.get())) {
       return true;
     }
@@ -2195,7 +2195,7 @@ public class HRegion implements HeapSize { // , Writable{
     public boolean isInReplay() {
       return false;
     }
-    
+
     @Override
     public long getReplaySequenceId() {
       return 0;
@@ -2234,7 +2234,7 @@ public class HRegion implements HeapSize { // , Writable{
     public boolean isInReplay() {
       return true;
     }
-    
+
     @Override
     public long getReplaySequenceId() {
       return this.replaySeqId;
@@ -3295,7 +3295,7 @@ public class HRegion implements HeapSize { // , Writable{
             firstSeqIdInLog = key.getLogSeqNum();
           }
           currentEditSeqId = key.getLogSeqNum();
-          currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ? 
+          currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
             key.getOrigLogSeqNum() : currentEditSeqId;
           boolean flush = false;
           for (KeyValue kv: val.getKeyValues()) {
@@ -6243,7 +6243,7 @@ public class HRegion implements HeapSize { // , Writable{
       WALEdit.EMPTY_WALEDIT, this.sequenceId, false, cells);
     return key;
   }
-  
+
   /**
    * Explictly sync wal
    * @throws IOException

http://git-wip-us.apache.org/repos/asf/hbase/blob/09fcdb50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index b84f9a2..26233b5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -26,8 +26,10 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
@@ -140,6 +142,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
@@ -640,25 +643,37 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    */
   private OperationStatus [] doReplayBatchOp(final HRegion region,
       final List<HLogSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
-    HLogSplitter.MutationReplay[] mArray = new HLogSplitter.MutationReplay[mutations.size()];
 
     long before = EnvironmentEdgeManager.currentTimeMillis();
     boolean batchContainsPuts = false, batchContainsDelete = false;
     try {
-      int i = 0;
-      for (HLogSplitter.MutationReplay m : mutations) {
+      for (Iterator<HLogSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
+        HLogSplitter.MutationReplay m = it.next();
+
         if (m.type == MutationType.PUT) {
           batchContainsPuts = true;
         } else {
           batchContainsDelete = true;
         }
-        mArray[i++] = m;
+
+        NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
+        List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
+        if (metaCells != null && !metaCells.isEmpty()) {
+          for (Cell metaCell : metaCells) {
+            CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
+            if (compactionDesc != null) {
+              region.completeCompactionMarker(compactionDesc);
+            }
+          }
+          it.remove();
+        }
       }
       requestCount.add(mutations.size());
       if (!region.getRegionInfo().isMetaTable()) {
         regionServer.cacheFlusher.reclaimMemStoreMemory();
       }
-      return region.batchReplay(mArray, replaySeqId);
+      return region.batchReplay(mutations.toArray(
+        new HLogSplitter.MutationReplay[mutations.size()]), replaySeqId);
     } finally {
       if (regionServer.metricsRegionServer != null) {
         long after = EnvironmentEdgeManager.currentTimeMillis();
@@ -1355,7 +1370,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           walEntries.add(walEntry);
         }
         if(edits!=null && !edits.isEmpty()) {
-          long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ? 
+          long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
             entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
           OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
           // check if it's a partial success
@@ -1366,7 +1381,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           }
         }
       }
-      
+
       //sync wal at the end because ASYNC_WAL is used above
       region.syncWal();
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/09fcdb50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index 873e863..de31f24 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -80,7 +80,6 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
 import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -92,6 +91,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRespo
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
@@ -241,7 +241,7 @@ public class HLogSplitter {
     List<Path> splits = new ArrayList<Path>();
     if (logfiles != null && logfiles.length > 0) {
       for (FileStatus logfile: logfiles) {
-        HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null, 
+        HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null,
           RecoveryMode.LOG_SPLITTING);
         if (s.splitLogFile(logfile, null)) {
           finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
@@ -811,6 +811,7 @@ public class HLogSplitter {
       k.internEncodedRegionName(this.encodedRegionName);
     }
 
+    @Override
     public long heapSize() {
       return heapInBuffer;
     }
@@ -825,6 +826,7 @@ public class HLogSplitter {
       outputSink = sink;
     }
 
+    @Override
     public void run()  {
       try {
         doRun();
@@ -1060,6 +1062,7 @@ public class HLogSplitter {
         TimeUnit.SECONDS, new ThreadFactory() {
           private int count = 1;
 
+          @Override
           public Thread newThread(Runnable r) {
             Thread t = new Thread(r, "split-log-closeStream-" + count++);
             return t;
@@ -1070,6 +1073,7 @@ public class HLogSplitter {
       for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
         LOG.debug("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
         completionService.submit(new Callable<Void>() {
+          @Override
           public Void call() throws Exception {
             WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
             LOG.debug("Closing " + wap.p);
@@ -1242,6 +1246,7 @@ public class HLogSplitter {
       return (new WriterAndPath(regionedits, w));
     }
 
+    @Override
     void append(RegionEntryBuffer buffer) throws IOException {
       List<Entry> entries = buffer.entryBuffer;
       if (entries.isEmpty()) {
@@ -1280,6 +1285,7 @@ public class HLogSplitter {
     /**
      * @return a map from encoded region ID to the number of edits written out for that region.
      */
+    @Override
     Map<byte[], Long> getOutputCounts() {
       TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
       synchronized (writers) {
@@ -1368,6 +1374,7 @@ public class HLogSplitter {
       this.logRecoveredEditsOutputSink.setReporter(reporter);
     }
 
+    @Override
     void append(RegionEntryBuffer buffer) throws IOException {
       List<Entry> entries = buffer.entryBuffer;
       if (entries.isEmpty()) {
@@ -1449,19 +1456,40 @@ public class HLogSplitter {
         HConnection hconn = this.getConnectionByTableName(table);
 
         for (KeyValue kv : kvs) {
-          // filtering HLog meta entries
-          // We don't handle HBASE-2231 because we may or may not replay a compaction event.
-          // Details at https://issues.apache.org/jira/browse/HBASE-2231?focusedCommentId=13647143&
-          // page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13647143
+          byte[] row = kv.getRow();
+          byte[] family = kv.getFamily();
+          boolean isCompactionEntry = false;
           if (CellUtil.matchingFamily(kv, WALEdit.METAFAMILY)) {
-            skippedKVs.add(kv);
-            continue;
+            CompactionDescriptor compaction = WALEdit.getCompaction(kv);
+            if (compaction != null && compaction.hasRegionName()) {
+              try {
+                byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName()
+                  .toByteArray());
+                row = regionName[1]; // startKey of the region
+                family = compaction.getFamilyName().toByteArray();
+                isCompactionEntry = true;
+              } catch (Exception ex) {
+                LOG.warn("Unexpected exception received, ignoring " + ex);
+                skippedKVs.add(kv);
+                continue;
+              }
+            } else {
+              skippedKVs.add(kv);
+              continue;
+            }
           }
 
           try {
             loc =
-                locateRegionAndRefreshLastFlushedSequenceId(hconn, table, kv.getRow(),
+                locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row,
                   encodeRegionNameStr);
+            // skip replaying the compaction if the region is gone
+            if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase(
+              loc.getRegionInfo().getEncodedName())) {
+              LOG.info("Not replaying a compaction marker for an older region: "
+                  + encodeRegionNameStr);
+              needSkip = true;
+            }
           } catch (TableNotFoundException ex) {
             // table has been deleted so skip edits of the table
             LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
@@ -1490,7 +1518,7 @@ public class HLogSplitter {
                   regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
             }
             if (maxStoreSequenceIds != null) {
-              Long maxStoreSeqId = maxStoreSequenceIds.get(kv.getFamily());
+              Long maxStoreSeqId = maxStoreSequenceIds.get(family);
               if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
                 // skip current kv if column family doesn't exist anymore or already flushed
                 skippedKVs.add(kv);
@@ -1768,6 +1796,7 @@ public class HLogSplitter {
       return result;
     }
 
+    @Override
     Map<byte[], Long> getOutputCounts() {
       TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
       synchronized (writers) {
@@ -1922,7 +1951,7 @@ public class HLogSplitter {
       return new ArrayList<MutationReplay>();
     }
 
-    long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ? 
+    long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
       entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
     int count = entry.getAssociatedCellCount();
     List<MutationReplay> mutations = new ArrayList<MutationReplay>();
@@ -1979,7 +2008,7 @@ public class HLogSplitter {
         clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
       }
       key = new HLogKey(walKey.getEncodedRegionName().toByteArray(), TableName.valueOf(walKey
-              .getTableName().toByteArray()), replaySeqId, walKey.getWriteTime(), clusterIds, 
+              .getTableName().toByteArray()), replaySeqId, walKey.getWriteTime(), clusterIds,
               walKey.getNonceGroup(), walKey.getNonce());
       logEntry.setFirst(key);
       logEntry.setSecond(val);

http://git-wip-us.apache.org/repos/asf/hbase/blob/09fcdb50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
index 6809aad..a0707f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
@@ -262,7 +262,7 @@ public class HLogUtil {
       final CompactionDescriptor c, AtomicLong sequenceId) throws IOException {
     TableName tn = TableName.valueOf(c.getTableName().toByteArray());
     HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
-    log.appendNoSync(htd, info, key, WALEdit.createCompaction(c), sequenceId, false, null);
+    log.appendNoSync(htd, info, key, WALEdit.createCompaction(info, c), sequenceId, false, null);
     log.sync();
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));

http://git-wip-us.apache.org/repos/asf/hbase/blob/09fcdb50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index 8ecb4b3..85e2d7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.io.HeapSize;
@@ -145,6 +146,7 @@ public class WALEdit implements Writable, HeapSize {
     return result;
   }
 
+  @Override
   public void readFields(DataInput in) throws IOException {
     kvs.clear();
     if (scopes != null) {
@@ -180,6 +182,7 @@ public class WALEdit implements Writable, HeapSize {
     }
   }
 
+  @Override
   public void write(DataOutput out) throws IOException {
     LOG.warn("WALEdit is being serialized to writable - only expected in test code");
     out.writeInt(VERSION_2);
@@ -222,6 +225,7 @@ public class WALEdit implements Writable, HeapSize {
     return kvs.size();
   }
 
+  @Override
   public long heapSize() {
     long ret = ClassSize.ARRAYLIST;
     for (KeyValue kv : kvs) {
@@ -235,6 +239,7 @@ public class WALEdit implements Writable, HeapSize {
     return ret;
   }
 
+  @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
 
@@ -249,25 +254,36 @@ public class WALEdit implements Writable, HeapSize {
     sb.append(">]");
     return sb.toString();
   }
-  
+
   /**
    * Create a compacion WALEdit
    * @param c
    * @return A WALEdit that has <code>c</code> serialized as its value
    */
-  public static WALEdit createCompaction(final CompactionDescriptor c) {
+  public static WALEdit createCompaction(final HRegionInfo hri, final CompactionDescriptor c) {
     byte [] pbbytes = c.toByteArray();
-    KeyValue kv = new KeyValue(METAROW, METAFAMILY, COMPACTION, System.currentTimeMillis(), pbbytes);
+    KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, COMPACTION,
+      System.currentTimeMillis(), pbbytes);
     return new WALEdit().add(kv); //replication scope null so that this won't be replicated
   }
 
+  private static byte[] getRowForRegion(HRegionInfo hri) {
+    byte[] startKey = hri.getStartKey();
+    if (startKey.length == 0) {
+      // empty row key is not allowed in mutations because it is both the start key and the end key
+      // we return the smallest byte[] that is bigger (in lex comparison) than byte[0].
+      return new byte[] {0};
+    }
+    return startKey;
+  }
+
   /**
    * Deserialized and returns a CompactionDescriptor is the KeyValue contains one.
    * @param kv the key value
    * @return deserialized CompactionDescriptor or null.
    */
-  public static CompactionDescriptor getCompaction(KeyValue kv) throws IOException {
-    if (CellUtil.matchingRow(kv, METAROW) && CellUtil.matchingColumn(kv, METAFAMILY, COMPACTION)) {
+  public static CompactionDescriptor getCompaction(Cell kv) throws IOException {
+    if (CellUtil.matchingColumn(kv, METAFAMILY, COMPACTION)) {
       return CompactionDescriptor.parseFrom(kv.getValue());
     }
     return null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/09fcdb50/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
index 1c08fac..97e9b86 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
@@ -30,7 +30,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -234,8 +233,6 @@ public class WALEditsReplaySink {
         List<KeyValue> kvs = edit.getKeyValues();
         for (KeyValue kv : kvs) {
           // filtering HLog meta entries
-          if (CellUtil.matchingFamily(kv, WALEdit.METAFAMILY)) continue;
-
           setLocation(conn.locateRegion(tableName, kv.getRow()));
           skip = true;
           break;

http://git-wip-us.apache.org/repos/asf/hbase/blob/09fcdb50/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index b5848a2..7dec203 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,6 +34,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -42,6 +45,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hdfs.DFSClient;
@@ -52,6 +56,8 @@ import org.apache.log4j.Level;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Lists;
+
 /**
  * Test for the case where a regionserver going down has enough cycles to do damage to regions
  * that have actually been assigned elsehwere.
@@ -215,7 +221,8 @@ public class TestIOFencing {
    */
   @Test
   public void testFencingAroundCompaction() throws Exception {
-    doTest(BlockCompactionsInPrepRegion.class);
+    doTest(BlockCompactionsInPrepRegion.class, false);
+    doTest(BlockCompactionsInPrepRegion.class, true);
   }
 
   /**
@@ -226,12 +233,13 @@ public class TestIOFencing {
    */
   @Test
   public void testFencingAroundCompactionAfterWALSync() throws Exception {
-    doTest(BlockCompactionsInCompletionRegion.class);
+    doTest(BlockCompactionsInCompletionRegion.class, false);
+    doTest(BlockCompactionsInCompletionRegion.class, true);
   }
 
-  public void doTest(Class<?> regionClass) throws Exception {
+  public void doTest(Class<?> regionClass, boolean distributedLogReplay) throws Exception {
     Configuration c = TEST_UTIL.getConfiguration();
-    c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
+    c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);
     // Insert our custom region
     c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
     c.setBoolean("dfs.support.append", true);
@@ -264,6 +272,16 @@ public class TestIOFencing {
       // Load some rows
       TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);
 
+      // add a compaction from an older (non-existing) region to see whether we successfully skip
+      // those entries
+      HRegionInfo oldHri = new HRegionInfo(table.getName(),
+        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+      CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri,
+        FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
+        new Path("store_dir"));
+      HLogUtil.writeCompactionMarker(compactingRegion.getLog(), table.getTableDescriptor(),
+        oldHri, compactionDescriptor, new AtomicLong(Long.MAX_VALUE-100));
+
       // Wait till flush has happened, otherwise there won't be multiple store files
       long startWaitTime = System.currentTimeMillis();
       while (compactingRegion.getLastFlushTime() <= lastFlushTime ||
@@ -281,18 +299,24 @@ public class TestIOFencing {
       compactingRegion.waitForCompactionToBlock();
       LOG.info("Starting a new server");
       RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
-      HRegionServer newServer = newServerThread.getRegionServer();
+      final HRegionServer newServer = newServerThread.getRegionServer();
       LOG.info("Killing region server ZK lease");
       TEST_UTIL.expireRegionServerSession(0);
       CompactionBlockerRegion newRegion = null;
       startWaitTime = System.currentTimeMillis();
-      while (newRegion == null) {
-        LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));
-        Thread.sleep(1000);
-        newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);
-        assertTrue("Timed out waiting for new server to open region",
-          System.currentTimeMillis() - startWaitTime < 300000);
-      }
+      LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));
+
+      // wait for region to be assigned and to go out of log replay if applicable
+      Waiter.waitFor(c, 60000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          HRegion newRegion = newServer.getOnlineRegion(REGION_NAME);
+          return newRegion != null && !newRegion.isRecovering();
+        }
+      });
+
+      newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);
+
       LOG.info("Allowing compaction to proceed");
       compactingRegion.allowCompactions();
       while (compactingRegion.compactCount == 0) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/09fcdb50/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 8a588e7..15e530a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -657,11 +657,13 @@ public class TestHRegion {
         long time = System.nanoTime();
         WALEdit edit = null;
         if (i == maxSeqId) {
-          edit = WALEdit.createCompaction(CompactionDescriptor.newBuilder()
+          edit = WALEdit.createCompaction(region.getRegionInfo(),
+          CompactionDescriptor.newBuilder()
           .setTableName(ByteString.copyFrom(tableName.getName()))
           .setFamilyName(ByteString.copyFrom(regionName))
           .setEncodedRegionName(ByteString.copyFrom(regionName))
           .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString())))
+          .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName()))
           .build());
         } else {
           edit = new WALEdit();
@@ -753,7 +755,8 @@ public class TestHRegion {
       long time = System.nanoTime();
 
       writer.append(new HLog.Entry(new HLogKey(regionName, tableName, 10, time,
-          HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(compactionDescriptor)));
+          HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(),
+          compactionDescriptor)));
       writer.close();
 
       // close the region now, and reopen again

http://git-wip-us.apache.org/repos/asf/hbase/blob/09fcdb50/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index 5c0f710..f67ab1f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.LargeTests;
 import org.apache.hadoop.hbase.TableName;
@@ -491,10 +492,12 @@ public class TestReplicationSmallTests extends TestReplicationBase {
   public void testCompactionWALEdits() throws Exception {
     WALProtos.CompactionDescriptor compactionDescriptor =
         WALProtos.CompactionDescriptor.getDefaultInstance();
-    WALEdit edit = WALEdit.createCompaction(compactionDescriptor);
+    HRegionInfo hri = new HRegionInfo(htable1.getName(),
+      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+    WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
     Replication.scopeWALEdits(htable1.getTableDescriptor(), new HLogKey(), edit);
   }
-  
+
   /**
    * Test for HBASE-8663
    * Create two new Tables with colfamilies enabled for replication then run


Mime
View raw message