hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From syuanji...@apache.org
Subject [33/37] hbase git commit: HBASE-15205 Do not find the replication scope for every WAL#append() (Ram)
Date Fri, 26 Feb 2016 23:09:54 GMT
HBASE-15205 Do not find the replication scope for every WAL#append() (Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8f2bd060
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8f2bd060
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8f2bd060

Branch: refs/heads/hbase-12439
Commit: 8f2bd06019869a1738bcfd66066737cdb7802ca8
Parents: 538815d
Author: ramkrishna <ramkrishna.s.vasudevan@gmail.com>
Authored: Fri Feb 26 22:30:55 2016 +0530
Committer: ramkrishna <ramkrishna.s.vasudevan@gmail.com>
Committed: Fri Feb 26 22:30:55 2016 +0530

----------------------------------------------------------------------
 .../hbase/protobuf/ReplicationProtbufUtil.java  |   2 +-
 .../hadoop/hbase/regionserver/HRegion.java      |  89 +++++++++-----
 .../hadoop/hbase/regionserver/HStore.java       |   2 +-
 .../hadoop/hbase/regionserver/wal/FSHLog.java   |  13 +-
 .../hbase/regionserver/wal/FSWALEntry.java      |  10 +-
 .../hadoop/hbase/regionserver/wal/HLogKey.java  |  48 ++++++--
 .../regionserver/wal/WALActionsListener.java    |   8 +-
 .../hadoop/hbase/regionserver/wal/WALUtil.java  |  57 +++++----
 .../hbase/replication/ScopeWALEntryFilter.java  |   2 +-
 .../replication/regionserver/Replication.java   |  70 +++--------
 .../hadoop/hbase/wal/DisabledWALProvider.java   |   4 +-
 .../java/org/apache/hadoop/hbase/wal/WAL.java   |   9 +-
 .../org/apache/hadoop/hbase/wal/WALKey.java     | 121 +++++++++++++++----
 .../apache/hadoop/hbase/wal/WALSplitter.java    |   2 +-
 .../org/apache/hadoop/hbase/TestIOFencing.java  |   3 +-
 .../hbase/coprocessor/TestWALObserver.java      |  48 +++++---
 .../hbase/mapreduce/TestHLogRecordReader.java   |   7 +-
 .../hbase/mapreduce/TestImportExport.java       |  16 +--
 .../hbase/mapreduce/TestWALRecordReader.java    |  20 +--
 .../master/TestDistributedLogSplitting.java     |   9 +-
 .../hadoop/hbase/regionserver/TestBulkLoad.java |  17 +--
 .../hadoop/hbase/regionserver/TestHRegion.java  |  16 +--
 .../regionserver/TestHRegionReplayEvents.java   |   6 +-
 .../regionserver/TestHRegionServerBulkLoad.java |   3 +-
 .../hbase/regionserver/TestWALLockup.java       |  10 +-
 .../hbase/regionserver/wal/TestFSHLog.java      |  57 ++++++---
 .../regionserver/wal/TestLogRollAbort.java      |  12 +-
 .../wal/TestLogRollingNoCluster.java            |  11 +-
 .../wal/TestWALActionsListener.java             |  12 +-
 .../hbase/regionserver/wal/TestWALReplay.java   |  47 ++++---
 .../hbase/replication/TestReplicationBase.java  |   9 ++
 .../replication/TestReplicationSmallTests.java  |  13 +-
 .../TestReplicationWALEntryFilters.java         |  62 +++++-----
 .../TestReplicationSourceManager.java           |  57 +++++----
 .../TestReplicationWALReaderManager.java        |  13 +-
 .../apache/hadoop/hbase/wal/FaultyFSLog.java    |   7 +-
 .../hbase/wal/TestDefaultWALProvider.java       |  64 +++++++---
 .../wal/TestDefaultWALProviderWithHLogKey.java  |   7 +-
 .../apache/hadoop/hbase/wal/TestSecureWAL.java  |  11 +-
 .../apache/hadoop/hbase/wal/TestWALFactory.java |  74 ++++++++----
 .../hbase/wal/TestWALReaderOnSecureWAL.java     |  11 +-
 .../hbase/wal/WALPerformanceEvaluation.java     |  15 ++-
 42 files changed, 685 insertions(+), 389 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index 91185af..8cb2237 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -134,7 +134,7 @@ public class ReplicationProtbufUtil {
         keyBuilder.setOrigSequenceNumber(key.getOrigLogSeqNum());
       }
       WALEdit edit = entry.getEdit();
-      NavigableMap<byte[], Integer> scopes = key.getScopes();
+      NavigableMap<byte[], Integer> scopes = key.getReplicationScopes();
       if (scopes != null && !scopes.isEmpty()) {
         for (Map.Entry<byte[], Integer> scope: scopes.entrySet()) {
           scopeBuilder.setFamily(ByteStringer.wrap(scope.getKey()));

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index b70a4c3..406850e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -17,19 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Closeables;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-import com.google.protobuf.TextFormat;
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
 
 import java.io.EOFException;
 import java.io.FileNotFoundException;
@@ -195,6 +183,20 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import com.google.protobuf.TextFormat;
+
 @SuppressWarnings("deprecation")
 @InterfaceAudience.Private
 public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
@@ -583,6 +585,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   private final MetricsRegionWrapperImpl metricsRegionWrapper;
   private final Durability durability;
   private final boolean regionStatsEnabled;
+  // Stores the replication scope of the various column families of the table
+  // that has non-default scope
+  private final NavigableMap<byte[], Integer> replicationScope = new TreeMap<byte[], Integer>(
+      Bytes.BYTES_COMPARATOR);
 
   /**
    * HRegion constructor. This constructor should only be used for testing and
@@ -661,6 +667,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
     this.htableDescriptor = htd;
+    Set<byte[]> families = this.htableDescriptor.getFamiliesKeys();
+    for (byte[] family : families) {
+      if (!replicationScope.containsKey(family)) {
+        int scope = htd.getFamily(family).getScope();
+        // Only store those families that has NON-DEFAULT scope
+        if (scope != REPLICATION_SCOPE_LOCAL) {
+          // Do a copy before storing it here.
+          replicationScope.put(Bytes.copy(family), scope);
+        }
+      }
+    }
     this.rsServices = rsServices;
     this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
     setHTableSpecificConf();
@@ -971,7 +988,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
       RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
       getRegionServerServices().getServerName(), storeFiles);
-    WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc, mvcc);
+    WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionOpenDesc,
+        mvcc);
   }
 
   private void writeRegionCloseMarker(WAL wal) throws IOException {
@@ -979,7 +997,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
       RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(),
       getRegionServerServices().getServerName(), storeFiles);
-    WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc, mvcc);
+    WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionEventDesc,
+        mvcc);
 
     // Store SeqId in HDFS when a region closes
     // checking region folder exists is due to many tests which delete the table folder while a
@@ -2285,7 +2304,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
             getRegionInfo(), flushOpSeqId, committedFiles);
         // No sync. Sync is below where no updates lock and we do FlushAction.COMMIT_FLUSH
-        WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false, mvcc);
+        WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false,
+            mvcc);
       }
 
       // Prepare flush (take a snapshot)
@@ -2334,7 +2354,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     try {
       FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
           getRegionInfo(), flushOpSeqId, committedFiles);
-      WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false,
+      WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false,
           mvcc);
     } catch (Throwable t) {
       LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
@@ -2379,7 +2399,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
         getRegionInfo(), -1, new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR));
       try {
-        WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, true, mvcc);
+        WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true,
+            mvcc);
         return true;
       } catch (IOException e) {
         LOG.warn(getRegionInfo().getEncodedName() + " : "
@@ -2449,7 +2470,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         // write flush marker to WAL. If fail, we should throw DroppedSnapshotException
         FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
           getRegionInfo(), flushOpSeqId, committedFiles);
-        WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, true, mvcc);
+        WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true,
+            mvcc);
       }
     } catch (Throwable t) {
       // An exception here means that the snapshot was not persisted.
@@ -2462,7 +2484,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         try {
           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
             getRegionInfo(), flushOpSeqId, committedFiles);
-          WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false, mvcc);
+          WALUtil.writeFlushMarker(wal, this.replicationScope, getRegionInfo(), desc, false, mvcc);
         } catch (Throwable ex) {
           LOG.warn(getRegionInfo().getEncodedName() + " : "
               + "failed writing ABORT_FLUSH marker to WAL", ex);
@@ -3139,13 +3161,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         if (!replay) {
           // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
           walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
-            this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
-            mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
+              this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
+              mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc,
+              this.getReplicationScope());
         }
         // TODO: Use the doAppend methods below... complicated by the replay stuff above.
         try {
-          long txid =
-            this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
+          long txid = this.wal.append(this.getRegionInfo(), walKey,
+              walEdit, true);
           if (txid != 0) sync(txid, durability);
           writeEntry = walKey.getWriteEntry();
         } catch (IOException ioe) {
@@ -3271,8 +3294,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     if (!replay) throw new IOException("Multiple nonces per batch and not in replay");
     WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
         this.htableDescriptor.getTableName(), now, mutation.getClusterIds(),
-        currentNonceGroup, currentNonce, mvcc);
-    this.wal.append(this.htableDescriptor,  this.getRegionInfo(), walKey, walEdit, true);
+        currentNonceGroup, currentNonce, mvcc, this.getReplicationScope());
+    this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
     // Complete the mvcc transaction started down in append else it will block others
     this.mvcc.complete(walKey.getWriteEntry());
   }
@@ -5389,7 +5412,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(
               this.getRegionInfo().getTable(),
               ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);
-          WALUtil.writeBulkLoadMarkerAndSync(this.wal, getTableDesc(), getRegionInfo(),
+          WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
               loadDescriptor, mvcc);
         } catch (IOException ioe) {
           if (this.rsServices != null) {
@@ -6319,6 +6342,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return r.openHRegion(reporter);
   }
 
+  @VisibleForTesting
+  public NavigableMap<byte[], Integer> getReplicationScope() {
+    return this.replicationScope;
+  }
 
   /**
    * Useful when reopening a closed region (normally for unit tests)
@@ -7069,10 +7096,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // here instead of WALKey directly to support legacy coprocessors.
     WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
       this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds,
-      nonceGroup, nonce, mvcc);
+      nonceGroup, nonce, mvcc, this.getReplicationScope());
     try {
       long txid =
-        this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
+        this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
       // Call sync on our edit.
       if (txid != 0) sync(txid, durability);
       writeEntry = walKey.getWriteEntry();
@@ -7362,7 +7389,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      46 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+      47 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
       (14 * Bytes.SIZEOF_LONG) +
       5 * Bytes.SIZEOF_BOOLEAN);
 
@@ -7385,7 +7412,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
       (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
       MultiVersionConcurrencyControl.FIXED_SIZE // mvcc
-      + ClassSize.TREEMAP // maxSeqIdInStores
+      + 2 * ClassSize.TREEMAP // maxSeqIdInStores, replicationScopes
       + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
       + ClassSize.STORE_SERVICES // store services
       ;

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 22f99e9..7c71baf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1307,7 +1307,7 @@ public class HStore implements Store {
     // Fix reaching into Region to get the maxWaitForSeqId.
     // Does this method belong in Region altogether given it is making so many references up there?
     // Could be Region#writeCompactionMarker(compactionDescriptor);
-    WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getTableDesc(),
+    WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),
         this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC());
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 09da8fc..f3f869c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -62,7 +62,6 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -1083,8 +1082,8 @@ public class FSHLog implements WAL {
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
       justification="Will never be null")
   @Override
-  public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key,
-      final WALEdit edits, final boolean inMemstore) throws IOException {
+  public long append(final HRegionInfo hri,
+      final WALKey key, final WALEdit edits, final boolean inMemstore) throws IOException {
     if (this.closed) throw new IOException("Cannot append; log is closed");
     // Make a trace scope for the append.  It is closed on other side of the ring buffer by the
     // single consuming thread.  Don't have to worry about it.
@@ -1100,7 +1099,7 @@ public class FSHLog implements WAL {
       // Construction of FSWALEntry sets a latch.  The latch is thrown just after we stamp the
       // edit with its edit/sequence id.
       // TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
-      entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore);
+      entry = new FSWALEntry(sequence, key, edits, hri, inMemstore);
       truck.loadPayload(entry, scope.detach());
     } finally {
       this.disruptor.getRingBuffer().publish(sequence);
@@ -1878,14 +1877,12 @@ public class FSHLog implements WAL {
             entry.getEdit())) {
           if (entry.getEdit().isReplay()) {
             // Set replication scope null so that this won't be replicated
-            entry.getKey().setScopes(null);
+            entry.getKey().serializeReplicationScope(false);
           }
         }
         if (!listeners.isEmpty()) {
           for (WALActionsListener i: listeners) {
-            // TODO: Why does listener take a table description and CPs take a regioninfo?  Fix.
-            i.visitLogEntryBeforeWrite(entry.getHTableDescriptor(), entry.getKey(),
-              entry.getEdit());
+            i.visitLogEntryBeforeWrite(entry.getKey(), entry.getEdit());
           }
         }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index 86a3c3d..06318f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -26,7 +26,6 @@ import java.util.Set;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -51,15 +50,13 @@ class FSWALEntry extends Entry {
   // they are only in memory and held here while passing over the ring buffer.
   private final transient long sequence;
   private final transient boolean inMemstore;
-  private final transient HTableDescriptor htd;
   private final transient HRegionInfo hri;
   private final Set<byte[]> familyNames;
 
   FSWALEntry(final long sequence, final WALKey key, final WALEdit edit,
-      final HTableDescriptor htd, final HRegionInfo hri, final boolean inMemstore) {
+      final HRegionInfo hri, final boolean inMemstore) {
     super(key, edit);
     this.inMemstore = inMemstore;
-    this.htd = htd;
     this.hri = hri;
     this.sequence = sequence;
     if (inMemstore) {
@@ -71,6 +68,7 @@ class FSWALEntry extends Entry {
         Set<byte[]> familySet = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
         for (Cell cell : cells) {
           if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+            // TODO: Avoid this clone?
             familySet.add(CellUtil.cloneFamily(cell));
           }
         }
@@ -89,10 +87,6 @@ class FSWALEntry extends Entry {
     return this.inMemstore;
   }
 
-  HTableDescriptor getHTableDescriptor() {
-    return this.htd;
-  }
-
   HRegionInfo getHRegionInfo() {
     return this.hri;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
index 7c40323..d7bf4a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
@@ -24,6 +24,7 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NavigableMap;
 import java.util.UUID;
 
 import org.apache.commons.logging.Log;
@@ -67,7 +68,7 @@ public class HLogKey extends WALKey implements Writable {
   }
 
   public HLogKey(final byte[] encodedRegionName, final TableName tablename) {
-    super(encodedRegionName, tablename);
+    super(encodedRegionName, tablename, null);
   }
 
   @VisibleForTesting
@@ -75,11 +76,15 @@ public class HLogKey extends WALKey implements Writable {
     super(encodedRegionName, tablename, now);
   }
 
-  public HLogKey(final byte[] encodedRegionName,
-                 final TableName tablename,
-                 final long now,
-                 final MultiVersionConcurrencyControl mvcc) {
-    super(encodedRegionName, tablename, now, mvcc);
+  @VisibleForTesting
+  public HLogKey(final byte[] encodedRegionName, final TableName tablename, final long now,
+      final NavigableMap<byte[], Integer> replicationScope) {
+    super(encodedRegionName, tablename, now, replicationScope);
+  }
+
+  public HLogKey(final byte[] encodedRegionName, final TableName tablename, final long now,
+      final MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> scopes) {
+    super(encodedRegionName, tablename, now, mvcc, scopes);
   }
 
   /**
@@ -111,6 +116,35 @@ public class HLogKey extends WALKey implements Writable {
    * Create the log key for writing to somewhere.
    * We maintain the tablename mainly for debugging purposes.
    * A regionName is always a sub-table object.
+   * <p>Used by log splitting and snapshots.
+   *
+   * @param encodedRegionName Encoded name of the region as returned by
+   * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+   * @param tablename   - name of table
+   * @param logSeqNum   - log sequence number
+   * @param now Time at which this edit was written.
+   * @param clusterIds the clusters that have consumed the change(used in Replication)
+   * @param nonceGroup the noncegroup
+   * @param nonce      the nonce
+   * @param replicationScope the replicationScope of the non-default column families' of the region
+   */
+  public HLogKey(
+      final byte[] encodedRegionName,
+      final TableName tablename,
+      long logSeqNum,
+      final long now,
+      List<UUID> clusterIds,
+      long nonceGroup,
+      long nonce,
+      MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> replicationScope) {
+    super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc,
+        replicationScope);
+  }
+
+  /**
+   * Create the log key for writing to somewhere.
+   * We maintain the tablename mainly for debugging purposes.
+   * A regionName is always a sub-table object.
    *
    * @param encodedRegionName Encoded name of the region as returned by
    * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
@@ -192,7 +226,7 @@ public class HLogKey extends WALKey implements Writable {
     // encodes the length of encodedRegionName.
     // If < 0 we just read the version and the next vint is the length.
     // @see Bytes#readByteArray(DataInput)
-    setScopes(null); // writable HLogKey does not contain scopes
+    serializeReplicationScope(false); // writable HLogKey does not contain scopes
     int len = WritableUtils.readVInt(in);
     byte[] tablenameBytes = null;
     if (len < 0) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
index db98083..a6452e2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
 
 import org.apache.hadoop.hbase.wal.WALKey;
 
@@ -85,7 +84,6 @@ public interface WALActionsListener {
   );
 
   /**
-   * @param htd
    * @param logKey
    * @param logEdit TODO: Retire this in favor of
    *          {@link #visitLogEntryBeforeWrite(HRegionInfo, WALKey, WALEdit)} It only exists to get
@@ -93,8 +91,7 @@ public interface WALActionsListener {
    *          <code>htd</code>.
    * @throws IOException If failed to parse the WALEdit
    */
-  void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
-      throws IOException;
+  void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException;
 
   /**
    * For notification post append to the writer.  Used by metrics system at least.
@@ -135,8 +132,7 @@ public interface WALActionsListener {
     public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey, WALEdit logEdit) {}
 
     @Override
-    public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
-        throws IOException {
+    public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
index f268422..197144d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -20,11 +20,11 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.IOException;
+import java.util.NavigableMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -58,10 +58,11 @@ public class WALUtil {
    * <p>This write is for internal use only. Not for external client consumption.
    * @param mvcc Used by WAL to get sequence Id for the waledit.
    */
-  public static WALKey writeCompactionMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
-      final CompactionDescriptor c, MultiVersionConcurrencyControl mvcc)
+  public static WALKey writeCompactionMarker(WAL wal,
+      NavigableMap<byte[], Integer> replicationScope, HRegionInfo hri, final CompactionDescriptor c,
+      MultiVersionConcurrencyControl mvcc)
   throws IOException {
-    WALKey walKey = writeMarker(wal, htd, hri, WALEdit.createCompaction(hri, c), mvcc);
+    WALKey walKey = writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
     }
@@ -73,11 +74,11 @@ public class WALUtil {
    *
    * <p>This write is for internal use only. Not for external client consumption.
    */
-  public static WALKey writeFlushMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
-      final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
-  throws IOException {
-    WALKey walKey =
-      doFullAppendTransaction(wal, htd, hri, WALEdit.createFlushWALEdit(hri, f), mvcc, sync);
+  public static WALKey writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope,
+      HRegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
+          throws IOException {
+    WALKey walKey = doFullAppendTransaction(wal, replicationScope, hri,
+        WALEdit.createFlushWALEdit(hri, f), mvcc, sync);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
     }
@@ -88,10 +89,12 @@ public class WALUtil {
    * Write a region open marker indicating that the region is opened.
    * This write is for internal use only. Not for external client consumption.
    */
-  public static WALKey writeRegionEventMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
+  public static WALKey writeRegionEventMarker(WAL wal,
+      NavigableMap<byte[], Integer> replicationScope, HRegionInfo hri,
       final RegionEventDescriptor r, final MultiVersionConcurrencyControl mvcc)
   throws IOException {
-    WALKey walKey = writeMarker(wal, htd, hri, WALEdit.createRegionEventWALEdit(hri, r), mvcc);
+    WALKey walKey = writeMarker(wal, replicationScope, hri,
+        WALEdit.createRegionEventWALEdit(hri, r), mvcc);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
     }
@@ -102,28 +105,30 @@ public class WALUtil {
    * Write a log marker that a bulk load has succeeded and is about to be committed.
    * This write is for internal use only. Not for external client consumption.
    * @param wal The log to write into.
-   * @param htd A description of the table that we are bulk loading into.
+   * @param replicationScope The replication scope of the families in the HRegion
    * @param hri A description of the region in the table that we are bulk loading into.
    * @param desc A protocol buffers based description of the client's bulk loading request
    * @return walKey with sequenceid filled out for this bulk load marker
    * @throws IOException We will throw an IOException if we can not append to the HLog.
    */
-  public static WALKey writeBulkLoadMarkerAndSync(final WAL wal, final HTableDescriptor htd,
-      final HRegionInfo hri, final WALProtos.BulkLoadDescriptor desc,
-      final MultiVersionConcurrencyControl mvcc)
-  throws IOException {
-    WALKey walKey = writeMarker(wal, htd, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc);
+  public static WALKey writeBulkLoadMarkerAndSync(final WAL wal,
+      final NavigableMap<byte[], Integer> replicationScope, final HRegionInfo hri,
+      final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
+          throws IOException {
+    WALKey walKey = writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc),
+        mvcc);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
     }
     return walKey;
   }
 
-  private static WALKey writeMarker(final WAL wal, final HTableDescriptor htd,
-      final HRegionInfo hri, final WALEdit edit, final MultiVersionConcurrencyControl mvcc)
+  private static WALKey writeMarker(final WAL wal,
+      final NavigableMap<byte[], Integer> replicationScope, final HRegionInfo hri,
+      final WALEdit edit, final MultiVersionConcurrencyControl mvcc)
   throws IOException {
     // If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
-    return doFullAppendTransaction(wal, htd, hri, edit, mvcc, true);
+    return doFullAppendTransaction(wal, replicationScope, hri, edit, mvcc, true);
   }
 
   /**
@@ -134,16 +139,16 @@ public class WALUtil {
    * <p>This write is for internal use only. Not for external client consumption.
    * @return WALKey that was added to the WAL.
    */
-  public static WALKey doFullAppendTransaction(final WAL wal, final HTableDescriptor htd,
-      final HRegionInfo hri, final WALEdit edit, final MultiVersionConcurrencyControl mvcc,
-      final boolean sync)
+  public static WALKey doFullAppendTransaction(final WAL wal,
+      final NavigableMap<byte[], Integer> replicationScope, final HRegionInfo hri,
+      final WALEdit edit, final MultiVersionConcurrencyControl mvcc, final boolean sync)
   throws IOException {
     // TODO: Pass in current time to use?
-    WALKey walKey =
-      new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), System.currentTimeMillis(), mvcc);
+    WALKey walKey = new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(),
+        System.currentTimeMillis(), mvcc, replicationScope);
     long trx = MultiVersionConcurrencyControl.NONE;
     try {
-      trx = wal.append(htd, hri, walKey, edit, false);
+      trx = wal.append(hri, walKey, edit, false);
       if (sync) {
         wal.sync(trx);
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
index f97ec15..28a83dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
@@ -44,7 +44,7 @@ public class ScopeWALEntryFilter implements WALEntryFilter {
 
   @Override
   public Entry filter(Entry entry) {
-    NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
+    NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
     if (scopes == null || scopes.isEmpty()) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index a5d2446..bb4a5a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -20,13 +20,10 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
-import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.NavigableMap;
-import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -43,7 +40,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
@@ -61,7 +57,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.zookeeper.KeeperException;
 
@@ -257,72 +252,47 @@ public class Replication extends WALActionsListener.Base implements
   }
 
   @Override
-  public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
-      throws IOException {
-    scopeWALEdits(htd, logKey, logEdit, this.conf, this.getReplicationManager());
+  public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
+    scopeWALEdits(logKey, logEdit, this.conf, this.getReplicationManager());
   }
 
   /**
    * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from
    * compaction WAL edits and if the scope is local.
-   * @param htd Descriptor used to find the scope to use
    * @param logKey Key that may get scoped according to its edits
    * @param logEdit Edits used to lookup the scopes
    * @param replicationManager Manager used to add bulk load events hfile references
    * @throws IOException If failed to parse the WALEdit
    */
-  public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey, WALEdit logEdit,
-      Configuration conf, ReplicationSourceManager replicationManager) throws IOException {
-    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
-    byte[] family;
+  public static void scopeWALEdits(WALKey logKey,
+      WALEdit logEdit, Configuration conf, ReplicationSourceManager replicationManager)
+          throws IOException {
     boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf);
+    byte[] family;
+    boolean foundOtherEdits = false;
     for (Cell cell : logEdit.getCells()) {
       if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
         if (replicationForBulkLoadEnabled && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
-          scopeBulkLoadEdits(htd, replicationManager, scopes, logKey.getTablename(), cell);
+          try {
+            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+            for (StoreDescriptor s : bld.getStoresList()) {
+              family = s.getFamilyName().toByteArray();
+              addHFileRefsToQueue(replicationManager, logKey.getTablename(), family, s);
+            }
+          } catch (IOException e) {
+            LOG.error("Failed to get bulk load events information from the wal file.", e);
+            throw e;
+          }
         } else {
           // Skip the flush/compaction/region events
           continue;
         }
       } else {
-        family = CellUtil.cloneFamily(cell);
-        // Unexpected, has a tendency to happen in unit tests
-        assert htd.getFamily(family) != null;
-
-        if (!scopes.containsKey(family)) {
-          int scope = htd.getFamily(family).getScope();
-          if (scope != REPLICATION_SCOPE_LOCAL) {
-            scopes.put(family, scope);
-          }
-        }
+        foundOtherEdits = true;
       }
     }
-    if (!scopes.isEmpty() && !logEdit.isReplay()) {
-      logKey.setScopes(scopes);
-    }
-  }
-
-  private static void scopeBulkLoadEdits(HTableDescriptor htd,
-      ReplicationSourceManager replicationManager, NavigableMap<byte[], Integer> scopes,
-      TableName tableName, Cell cell) throws IOException {
-    byte[] family;
-    try {
-      BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
-      for (StoreDescriptor s : bld.getStoresList()) {
-        family = s.getFamilyName().toByteArray();
-        if (!scopes.containsKey(family)) {
-          int scope = htd.getFamily(family).getScope();
-          if (scope != REPLICATION_SCOPE_LOCAL) {
-            scopes.put(family, scope);
-            addHFileRefsToQueue(replicationManager, tableName, family, s);
-          }
-        } else {
-          addHFileRefsToQueue(replicationManager, tableName, family, s);
-        }
-      }
-    } catch (IOException e) {
-      LOG.error("Failed to get bulk load events information from the wal file.", e);
-      throw e;
+    if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) {
+      logKey.serializeReplicationScope(false);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 0c41e77..c3d4b2c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.util.FSUtils;
 
 // imports for things that haven't moved from regionserver.wal yet.
@@ -154,8 +153,7 @@ class DisabledWALProvider implements WALProvider {
     }
 
     @Override
-    public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
-                       boolean inMemstore) {
+    public long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) {
       if (!this.listeners.isEmpty()) {
         final long start = System.nanoTime();
         long len = 0;

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index d2b336e..0b83528 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -25,7 +25,6 @@ import java.util.Set;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 // imports we use from yet-to-be-moved regionsever.wal
@@ -106,21 +105,17 @@ public interface WAL {
    * completes BUT on return this edit must have its region edit/sequence id assigned
    * else it messes up our unification of mvcc and sequenceid.  On return <code>key</code> will
    * have the region edit/sequence id filled in.
-   * @param info
+   * @param info the regioninfo associated with append
    * @param key Modified by this call; we add to it this edits region edit/sequence id.
    * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
    * sequence id that is after all currently appended edits.
-   * @param htd used to give scope for replication TODO refactor out in favor of table name and
-   * info
    * @param inMemstore Always true except for case where we are writing a compaction completion
    * record into the WAL; in this case the entry is just so we can finish an unfinished compaction
    * -- it is not an edit for memstore.
    * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
    * in it.
    */
-  long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
-    boolean inMemstore)
-  throws IOException;
+  long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) throws IOException;
 
   /**
    * Sync what we have in the WAL.

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index 09096fe..86fdfbd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -193,7 +193,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
   @InterfaceAudience.Private
   protected List<UUID> clusterIds;
 
-  private NavigableMap<byte[], Integer> scopes;
+  private NavigableMap<byte[], Integer> replicationScope;
 
   private long nonceGroup = HConstants.NO_NONCE;
   private long nonce = HConstants.NO_NONCE;
@@ -210,7 +210,12 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
 
   public WALKey() {
     init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
-        new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null);
+        new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, null);
+  }
+
+  public WALKey(final NavigableMap<byte[], Integer> replicationScope) {
+    init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
+        new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, replicationScope);
   }
 
   @VisibleForTesting
@@ -220,15 +225,16 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
     List<UUID> clusterIds = new ArrayList<UUID>();
     clusterIds.add(clusterId);
     init(encodedRegionName, tablename, logSeqNum, now, clusterIds,
-        HConstants.NO_NONCE, HConstants.NO_NONCE, null);
+        HConstants.NO_NONCE, HConstants.NO_NONCE, null, null);
   }
 
   /**
    * @deprecated Remove. Useless.
    */
   @Deprecated // REMOVE
-  public WALKey(final byte[] encodedRegionName, final TableName tablename) {
-    this(encodedRegionName, tablename, System.currentTimeMillis());
+  public WALKey(final byte[] encodedRegionName, final TableName tablename,
+      final NavigableMap<byte[], Integer> replicationScope) {
+    this(encodedRegionName, tablename, System.currentTimeMillis(), replicationScope);
   }
 
   // TODO: Fix being able to pass in sequenceid.
@@ -240,7 +246,20 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
         EMPTY_UUIDS,
         HConstants.NO_NONCE,
         HConstants.NO_NONCE,
-        null);
+        null, null);
+  }
+
+  // TODO: Fix being able to pass in sequenceid.
+  public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now,
+      final NavigableMap<byte[], Integer> replicationScope) {
+    init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
+        HConstants.NO_NONCE, null, replicationScope);
+  }
+
+  public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now,
+      MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) {
+    init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
+        HConstants.NO_NONCE, mvcc, replicationScope);
   }
 
   public WALKey(final byte[] encodedRegionName,
@@ -254,7 +273,33 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
         EMPTY_UUIDS,
         HConstants.NO_NONCE,
         HConstants.NO_NONCE,
-        mvcc);
+        mvcc, null);
+  }
+
+  /**
+   * Create the log key for writing to somewhere.
+   * We maintain the tablename mainly for debugging purposes.
+   * A regionName is always a sub-table object.
+   * <p>Used by log splitting and snapshots.
+   *
+   * @param encodedRegionName Encoded name of the region as returned by
+   *                         <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+   * @param tablename         - name of table
+   * @param logSeqNum         - log sequence number
+   * @param now               Time at which this edit was written.
+   * @param clusterIds        the clusters that have consumed the change(used in Replication)
+   * @param nonceGroup        the nonceGroup
+   * @param nonce             the nonce
+   * @param mvcc              the mvcc associate the WALKey
+   * @param replicationScope  the non-default replication scope
+   *                          associated with the region's column families
+   */
+  // TODO: Fix being able to pass in sequenceid.
+  public WALKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
+      final long now, List<UUID> clusterIds, long nonceGroup, long nonce,
+      MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) {
+    init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc,
+        replicationScope);
   }
 
   /**
@@ -279,7 +324,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
                 long nonceGroup,
                 long nonce,
                 MultiVersionConcurrencyControl mvcc) {
-    init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc);
+    init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc, null);
   }
 
   /**
@@ -289,7 +334,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
    *
    * @param encodedRegionName Encoded name of the region as returned by
    *                          <code>HRegionInfo#getEncodedNameAsBytes()</code>.
-   * @param tablename
+   * @param tablename         the tablename
    * @param now               Time at which this edit was written.
    * @param clusterIds        the clusters that have consumed the change(used in Replication)
    * @param nonceGroup
@@ -299,7 +344,31 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
   public WALKey(final byte[] encodedRegionName, final TableName tablename,
                 final long now, List<UUID> clusterIds, long nonceGroup,
                 final long nonce, final MultiVersionConcurrencyControl mvcc) {
-    init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc);
+    init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc,
+        null);
+  }
+
+  /**
+   * Create the log key for writing to somewhere.
+   * We maintain the tablename mainly for debugging purposes.
+   * A regionName is always a sub-table object.
+   *
+   * @param encodedRegionName Encoded name of the region as returned by
+   *                          <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+   * @param tablename
+   * @param now               Time at which this edit was written.
+   * @param clusterIds        the clusters that have consumed the change(used in Replication)
+   * @param nonceGroup        the nonceGroup
+   * @param nonce             the nonce
+   * @param mvcc mvcc control used to generate sequence numbers and control read/write points
+   * @param replicationScope  the non-default replication scope of the column families
+   */
+  public WALKey(final byte[] encodedRegionName, final TableName tablename,
+                final long now, List<UUID> clusterIds, long nonceGroup,
+                final long nonce, final MultiVersionConcurrencyControl mvcc,
+                NavigableMap<byte[], Integer> replicationScope) {
+    init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc,
+        replicationScope);
   }
 
   /**
@@ -328,7 +397,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
         EMPTY_UUIDS,
         nonceGroup,
         nonce,
-        mvcc);
+        mvcc, null);
   }
 
   @InterfaceAudience.Private
@@ -339,7 +408,8 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
                       List<UUID> clusterIds,
                       long nonceGroup,
                       long nonce,
-                      MultiVersionConcurrencyControl mvcc) {
+                      MultiVersionConcurrencyControl mvcc,
+                      NavigableMap<byte[], Integer> replicationScope) {
     this.sequenceId = logSeqNum;
     this.writeTime = now;
     this.clusterIds = clusterIds;
@@ -351,6 +421,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
     if (logSeqNum != NO_SEQUENCE_ID) {
       setSequenceId(logSeqNum);
     }
+    this.replicationScope = replicationScope;
   }
 
   // For HLogKey and deserialization. DO NOT USE. See setWriteEntry below.
@@ -418,8 +489,8 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
     return this.writeTime;
   }
 
-  public NavigableMap<byte[], Integer> getScopes() {
-    return scopes;
+  public NavigableMap<byte[], Integer> getReplicationScopes() {
+    return replicationScope;
   }
 
   /** @return The nonce group */
@@ -432,8 +503,14 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
     return nonce;
   }
 
-  public void setScopes(NavigableMap<byte[], Integer> scopes) {
-    this.scopes = scopes;
+  private void setReplicationScope(NavigableMap<byte[], Integer> replicationScope) {
+    this.replicationScope = replicationScope;
+  }
+
+  public void serializeReplicationScope(boolean serialize) {
+    if (!serialize) {
+      setReplicationScope(null);
+    }
   }
 
   public void readOlderScopes(NavigableMap<byte[], Integer> scopes) {
@@ -450,7 +527,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
         }
       }
       if (scopes.size() > 0) {
-        this.scopes = scopes;
+        this.replicationScope = scopes;
       }
     }
   }
@@ -598,8 +675,8 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
       uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
       builder.addClusterIds(uuidBuilder.build());
     }
-    if (scopes != null) {
-      for (Map.Entry<byte[], Integer> e : scopes.entrySet()) {
+    if (replicationScope != null) {
+      for (Map.Entry<byte[], Integer> e : replicationScope.entrySet()) {
         ByteString family = (compressionContext == null) ? ByteStringer.wrap(e.getKey())
             : compressor.compress(e.getKey(), compressionContext.familyDict);
         builder.addScopes(FamilyScope.newBuilder()
@@ -638,13 +715,13 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
     if (walKey.hasNonce()) {
       this.nonce = walKey.getNonce();
     }
-    this.scopes = null;
+    this.replicationScope = null;
     if (walKey.getScopesCount() > 0) {
-      this.scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+      this.replicationScope = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
       for (FamilyScope scope : walKey.getScopesList()) {
         byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() :
           uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict);
-        this.scopes.put(family, scope.getScopeType().getNumber());
+        this.replicationScope.put(family, scope.getScopeType().getNumber());
       }
     }
     setSequenceId(walKey.getLogSequenceNumber());

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 010fd37..ad5774f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -1778,7 +1778,7 @@ public class WALSplitter {
         WALEdit edit = entry.getEdit();
         TableName table = entry.getKey().getTablename();
         // clear scopes which isn't needed for recovery
-        entry.getKey().setScopes(null);
+        entry.getKey().serializeReplicationScope(false);
         String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
         // skip edits of non-existent tables
         if (nonExistentTables != null && nonExistentTables.contains(table)) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 3aae5d5..d8363d4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -281,7 +281,8 @@ public class TestIOFencing {
       CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri,
         FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
         new Path("store_dir"));
-      WALUtil.writeCompactionMarker(compactingRegion.getWAL(), table.getTableDescriptor(),
+      WALUtil.writeCompactionMarker(compactingRegion.getWAL(),
+          ((HRegion)compactingRegion).getReplicationScope(),
         oldHri, compactionDescriptor, compactingRegion.getMVCC());
 
       // Wait till flush has happened, otherwise there won't be multiple store files

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
index 75fe7a2..03760d1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
@@ -29,6 +29,8 @@ import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -184,7 +186,11 @@ public class TestWALObserver {
     HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
     final HTableDescriptor htd = createBasic3FamilyHTD(Bytes
         .toString(TEST_TABLE));
-
+    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+        Bytes.BYTES_COMPARATOR);
+    for(byte[] fam : htd.getFamiliesKeys()) {
+      scopes.put(fam, 0);
+    }
     Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE));
     deleteDir(basedir);
     fs.mkdirs(new Path(basedir, hri.getEncodedName()));
@@ -235,8 +241,8 @@ public class TestWALObserver {
     // it's where WAL write cp should occur.
     long now = EnvironmentEdgeManager.currentTime();
     // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-    long txid = log.append(htd, hri, new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
-        edit, true);
+    long txid = log.append(hri,
+        new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, scopes), edit, true);
     log.sync(txid);
 
     // the edit shall have been change now by the coprocessor.
@@ -296,10 +302,15 @@ public class TestWALObserver {
     assertFalse(oldApi.isPostWALWriteDeprecatedCalled());
 
     LOG.debug("writing to WAL with non-legacy keys.");
+    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+        Bytes.BYTES_COMPARATOR);
+    for (HColumnDescriptor hcd : htd.getFamilies()) {
+      scopes.put(hcd.getName(), 0);
+    }
     final int countPerFamily = 5;
     for (HColumnDescriptor hcd : htd.getFamilies()) {
       addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
-          EnvironmentEdgeManager.getDelegate(), wal, htd, mvcc);
+          EnvironmentEdgeManager.getDelegate(), wal, scopes, mvcc);
     }
 
     LOG.debug("Verify that only the non-legacy CP saw edits.");
@@ -323,7 +334,7 @@ public class TestWALObserver {
     final WALEdit edit = new WALEdit();
     final byte[] nonce = Bytes.toBytes("1772");
     edit.add(new KeyValue(TEST_ROW, TEST_FAMILY[0], nonce, now, nonce));
-    final long txid = wal.append(htd, hri, legacyKey, edit, true);
+    final long txid = wal.append(hri, legacyKey, edit, true);
     wal.sync(txid);
 
     LOG.debug("Make sure legacy cps can see supported edits after having been skipped.");
@@ -349,7 +360,11 @@ public class TestWALObserver {
     final HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
     final HTableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE));
     final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
-
+    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+        Bytes.BYTES_COMPARATOR);
+    for(byte[] fam : htd.getFamiliesKeys()) {
+      scopes.put(fam, 0);
+    }
     WAL log = wals.getWAL(UNSPECIFIED_REGION, null);
     try {
       SampleRegionWALObserver cp = getCoprocessor(log, SampleRegionWALObserver.class);
@@ -360,8 +375,8 @@ public class TestWALObserver {
       assertFalse(cp.isPostWALWriteCalled());
 
       final long now = EnvironmentEdgeManager.currentTime();
-      long txid = log.append(htd, hri,
-          new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc),
+      long txid = log.append(hri,
+          new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes),
           new WALEdit(), true);
       log.sync(txid);
 
@@ -400,14 +415,18 @@ public class TestWALObserver {
     // Put p = creatPutWith2Families(TEST_ROW);
     WALEdit edit = new WALEdit();
     long now = EnvironmentEdgeManager.currentTime();
-    // addFamilyMapToWALEdit(p.getFamilyMap(), edit);
     final int countPerFamily = 1000;
-    // for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
+    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+        Bytes.BYTES_COMPARATOR);
+    for (HColumnDescriptor hcd : htd.getFamilies()) {
+      scopes.put(hcd.getName(), 0);
+    }
     for (HColumnDescriptor hcd : htd.getFamilies()) {
       addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
-          EnvironmentEdgeManager.getDelegate(), wal, htd, mvcc);
+          EnvironmentEdgeManager.getDelegate(), wal, scopes, mvcc);
     }
-    wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc), edit, true);
+    wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit,
+        true);
     // sync to fs.
     wal.sync();
 
@@ -527,7 +546,8 @@ public class TestWALObserver {
 
   private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
       final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
-      final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc) throws IOException {
+      final NavigableMap<byte[], Integer> scopes, final MultiVersionConcurrencyControl mvcc)
+          throws IOException {
     String familyStr = Bytes.toString(family);
     long txid = -1;
     for (int j = 0; j < count; j++) {
@@ -537,7 +557,7 @@ public class TestWALObserver {
       edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
       // uses WALKey instead of HLogKey on purpose. will only work for tests where we don't care
       // about legacy coprocessors
-      txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
+      txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
           ee.currentTime(), mvcc), edit, true);
     }
     if (-1 != txid) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
index 5fa588b..752faa6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
@@ -18,6 +18,9 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
+
+import java.util.NavigableMap;
+
 import org.apache.hadoop.hbase.mapreduce.HLogInputFormat.HLogKeyRecordReader;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -32,8 +35,8 @@ import org.junit.experimental.categories.Category;
 public class TestHLogRecordReader extends TestWALRecordReader {
 
   @Override
-  protected WALKey getWalKey(final long time) {
-    return new HLogKey(info.getEncodedNameAsBytes(), tableName, time, mvcc);
+  protected WALKey getWalKey(final long time, NavigableMap<byte[], Integer> scopes) {
+    return new HLogKey(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
index 05f9f36..094fe1c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
@@ -34,6 +34,7 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.NavigableMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -656,9 +657,9 @@ public class TestImportExport {
       Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
 
       // Register the wal listener for the import table
-      TableWALActionListener walListener = new TableWALActionListener(importTableName);
       HRegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
           .getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
+      TableWALActionListener walListener = new TableWALActionListener(region);
       WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
       wal.registerWALActionsListener(walListener);
 
@@ -678,7 +679,7 @@ public class TestImportExport {
       region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
           .getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
       wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
-      walListener = new TableWALActionListener(importTableName);
+      walListener = new TableWALActionListener(region);
       wal.registerWALActionsListener(walListener);
       args = new String[] { importTableName, FQ_OUTPUT_DIR };
       assertTrue(runImport(args));
@@ -695,16 +696,17 @@ public class TestImportExport {
    */
   private static class TableWALActionListener extends WALActionsListener.Base {
 
-    private String tableName;
+    private HRegionInfo regionInfo;
     private boolean isVisited = false;
 
-    public TableWALActionListener(String tableName) {
-      this.tableName = tableName;
+    public TableWALActionListener(HRegionInfo region) {
+      this.regionInfo = region;
     }
 
     @Override
-    public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
-      if (tableName.equalsIgnoreCase(htd.getNameAsString()) && (!logEdit.isMetaEdit())) {
+    public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
+      if (logKey.getTablename().getNameAsString().equalsIgnoreCase(
+          this.regionInfo.getTable().getNameAsString()) && (!logEdit.isMetaEdit())) {
         isVisited = true;
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
index a4381c8..aee2a06 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -77,6 +79,8 @@ public class TestWALRecordReader {
   private static HTableDescriptor htd;
   private static Path logDir;
   protected MultiVersionConcurrencyControl mvcc;
+  protected static NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+      Bytes.BYTES_COMPARATOR);
 
   private static String getName() {
     return "TestWALRecordReader";
@@ -128,10 +132,10 @@ public class TestWALRecordReader {
     long ts = System.currentTimeMillis();
     WALEdit edit = new WALEdit();
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
-    log.append(htd, info, getWalKey(ts), edit, true);
+    log.append(info, getWalKey(ts, scopes), edit, true);
     edit = new WALEdit();
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
-    log.append(htd, info, getWalKey(ts+1), edit, true);
+    log.append(info, getWalKey(ts+1, scopes), edit, true);
     log.sync();
     LOG.info("Before 1st WAL roll " + log.toString());
     log.rollWriter();
@@ -142,10 +146,10 @@ public class TestWALRecordReader {
 
     edit = new WALEdit();
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
-    log.append(htd, info, getWalKey(ts1+1), edit, true);
+    log.append(info, getWalKey(ts1+1, scopes), edit, true);
     edit = new WALEdit();
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
-    log.append(htd, info, getWalKey(ts1+2), edit, true);
+    log.append(info, getWalKey(ts1+2, scopes), edit, true);
     log.sync();
     log.shutdown();
     walfactory.shutdown();
@@ -187,7 +191,7 @@ public class TestWALRecordReader {
     WALEdit edit = new WALEdit();
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
         System.currentTimeMillis(), value));
-    long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true);
+    long txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true);
     log.sync(txid);
 
     Thread.sleep(1); // make sure 2nd log gets a later timestamp
@@ -197,7 +201,7 @@ public class TestWALRecordReader {
     edit = new WALEdit();
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
         System.currentTimeMillis(), value));
-    txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true);
+    txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true);
     log.sync(txid);
     log.shutdown();
     walfactory.shutdown();
@@ -236,8 +240,8 @@ public class TestWALRecordReader {
     testSplit(splits.get(1));
   }
 
-  protected WALKey getWalKey(final long time) {
-    return new WALKey(info.getEncodedNameAsBytes(), tableName, time, mvcc);
+  protected WALKey getWalKey(final long time, NavigableMap<byte[], Integer> scopes) {
+    return new WALKey(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes);
   }
 
   protected WALRecordReader getReader() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index c5728cf..cff8db0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -1301,9 +1301,8 @@ public class TestDistributedLogSplitting {
         WALEdit e = new WALEdit();
         value++;
         e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
-        wal.append(htd, curRegionInfo,
-            new HLogKey(curRegionInfo.getEncodedNameAsBytes(), tableName, System.currentTimeMillis()),
-            e, true);
+        wal.append(curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(), tableName,
+            System.currentTimeMillis(), null), e, true);
       }
       wal.sync();
       wal.shutdown();
@@ -1397,7 +1396,7 @@ public class TestDistributedLogSplitting {
         WALEdit e = new WALEdit();
         value++;
         e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
-        wal.append(htd, curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(),
+        wal.append(curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(),
             tableName, System.currentTimeMillis()), e, true);
       }
       wal.sync();
@@ -1609,7 +1608,7 @@ public class TestDistributedLogSplitting {
         // key
         byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
         e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value));
-        log.append(htd, curRegionInfo,
+        log.append(curRegionInfo,
             new HLogKey(curRegionInfo.getEncodedNameAsBytes(), fullTName,
                 System.currentTimeMillis()), e, true);
         if (0 == i % syncEvery) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
index 3a7aff0..d0633a8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
@@ -87,6 +87,7 @@ public class TestBulkLoad {
   private final byte[] randomBytes = new byte[100];
   private final byte[] family1 = Bytes.toBytes("family1");
   private final byte[] family2 = Bytes.toBytes("family2");
+
   @Rule
   public TestName name = new TestName();
 
@@ -105,12 +106,12 @@ public class TestBulkLoad {
     storeFileName = (new Path(storeFileName)).getName();
     List<String> storeFileNames = new ArrayList<String>();
     storeFileNames.add(storeFileName);
-    when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class), any(WALKey.class),
+    when(log.append(any(HRegionInfo.class), any(WALKey.class),
             argThat(bulkLogWalEdit(WALEdit.BULK_LOAD, tableName.toBytes(),
                     familyName, storeFileNames)),
             any(boolean.class))).thenAnswer(new Answer() {
       public Object answer(InvocationOnMock invocation) {
-        WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
+        WALKey walKey = invocation.getArgumentAt(1, WALKey.class);
         MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
         if (mvcc != null) {
           MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
@@ -132,11 +133,11 @@ public class TestBulkLoad {
 
   @Test
   public void shouldBulkLoadSingleFamilyHLog() throws IOException {
-    when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class),
+    when(log.append(any(HRegionInfo.class),
             any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
             any(boolean.class))).thenAnswer(new Answer() {
       public Object answer(InvocationOnMock invocation) {
-        WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
+        WALKey walKey = invocation.getArgumentAt(1, WALKey.class);
         MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
         if (mvcc != null) {
           MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
@@ -151,11 +152,11 @@ public class TestBulkLoad {
 
   @Test
   public void shouldBulkLoadManyFamilyHLog() throws IOException {
-    when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class),
+    when(log.append(any(HRegionInfo.class),
             any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
             any(boolean.class))).thenAnswer(new Answer() {
               public Object answer(InvocationOnMock invocation) {
-                WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
+                WALKey walKey = invocation.getArgumentAt(1, WALKey.class);
                 MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
                 if (mvcc != null) {
                   MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
@@ -171,11 +172,11 @@ public class TestBulkLoad {
 
   @Test
   public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException {
-    when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class),
+    when(log.append(any(HRegionInfo.class),
             any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
             any(boolean.class))).thenAnswer(new Answer() {
       public Object answer(InvocationOnMock invocation) {
-        WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
+        WALKey walKey = invocation.getArgumentAt(1, WALKey.class);
         MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
         if (mvcc != null) {
           MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index a5574d3..ed7623c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -896,7 +896,7 @@ public class TestHRegion {
             storeFiles, Lists.newArrayList(newFile),
             region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
 
-      WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
+      WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(),
           this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
 
       Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
@@ -4796,7 +4796,7 @@ public class TestHRegion {
 
     //verify append called or not
     verify(wal, expectAppend ? times(1) : never())
-      .append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(),
+      .append((HRegionInfo)any(), (WALKey)any(),
           (WALEdit)any(), Mockito.anyBoolean());
 
     // verify sync called or not
@@ -5998,7 +5998,7 @@ public class TestHRegion {
       region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
         TEST_UTIL.getConfiguration(), rss, null);
 
-      verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
+      verify(wal, times(1)).append((HRegionInfo)any(), (WALKey)any()
         , editCaptor.capture(), anyBoolean());
 
       WALEdit edit = editCaptor.getValue();
@@ -6111,7 +6111,7 @@ public class TestHRegion {
 
       // verify that we have not appended region open event to WAL because this region is still
       // recovering
-      verify(wal, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
+      verify(wal, times(0)).append((HRegionInfo)any(), (WALKey)any()
         , editCaptor.capture(), anyBoolean());
 
       // not put the region out of recovering state
@@ -6119,7 +6119,7 @@ public class TestHRegion {
         .prepare().process();
 
       // now we should have put the entry
-      verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
+      verify(wal, times(1)).append((HRegionInfo)any(), (WALKey)any()
         , editCaptor.capture(), anyBoolean());
 
       WALEdit edit = editCaptor.getValue();
@@ -6163,12 +6163,12 @@ public class TestHRegion {
    */
   private WAL mockWAL() throws IOException {
     WAL wal = mock(WAL.class);
-    Mockito.when(wal.append((HTableDescriptor)Mockito.any(), (HRegionInfo)Mockito.any(),
+    Mockito.when(wal.append((HRegionInfo)Mockito.any(),
         (WALKey)Mockito.any(), (WALEdit)Mockito.any(), Mockito.anyBoolean())).
       thenAnswer(new Answer<Long>() {
         @Override
         public Long answer(InvocationOnMock invocation) throws Throwable {
-          WALKey key = invocation.getArgumentAt(2, WALKey.class);
+          WALKey key = invocation.getArgumentAt(1, WALKey.class);
           MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
           key.setWriteEntry(we);
           return 1L;
@@ -6206,7 +6206,7 @@ public class TestHRegion {
     region.close(false);
 
     // 2 times, one for region open, the other close region
-    verify(wal, times(2)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(),
+    verify(wal, times(2)).append((HRegionInfo)any(), (WALKey)any(),
       editCaptor.capture(), anyBoolean());
 
     WALEdit edit = editCaptor.getAllValues().get(1);

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index 4d5d7d8..9183e18 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -1126,7 +1126,7 @@ public class TestHRegionReplayEvents {
 
     // test for region open and close
     secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
-    verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
+    verify(walSecondary, times(0)).append((HRegionInfo)any(),
       (WALKey)any(), (WALEdit)any(),  anyBoolean());
 
     // test for replay prepare flush
@@ -1140,11 +1140,11 @@ public class TestHRegionReplayEvents {
       .setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
       .build());
 
-    verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
+    verify(walSecondary, times(0)).append((HRegionInfo)any(),
       (WALKey)any(), (WALEdit)any(), anyBoolean());
 
     secondaryRegion.close();
-    verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
+    verify(walSecondary, times(0)).append((HRegionInfo)any(),
       (WALKey)any(), (WALEdit)any(),  anyBoolean());
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
index 87cbab7..76b4134 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -411,7 +412,7 @@ public class TestHRegionServerBulkLoad {
     private boolean found = false;
 
     @Override
-    public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
+    public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
       for (Cell cell : logEdit.getCells()) {
         KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
         for (Map.Entry entry : kv.toStringMap().entrySet()) {


Mime
View raw message