hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject [2/2] hbase git commit: HBASE-9465 Push entries to peer clusters serially
Date Tue, 09 Aug 2016 07:33:41 GMT
HBASE-9465 Push entries to peer clusters serially

Signed-off-by: zhangduo <zhangduo@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/441bc050
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/441bc050
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/441bc050

Branch: refs/heads/branch-1
Commit: 441bc050b991c14c048617bc443b97f46e21b76f
Parents: 25c4ff5
Author: Phil Yang <ud1937@gmail.com>
Authored: Wed Aug 3 19:37:27 2016 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Tue Aug 9 15:26:15 2016 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/HTableDescriptor.java   |  46 ++-
 .../apache/hadoop/hbase/MetaTableAccessor.java  | 244 +++++++++--
 .../client/replication/ReplicationAdmin.java    |  14 +-
 .../org/apache/hadoop/hbase/HConstants.java     |  26 ++
 .../src/main/resources/hbase-default.xml        |  14 +
 .../hbase/protobuf/generated/WALProtos.java     |  16 +-
 hbase-protocol/src/main/protobuf/WAL.proto      |   1 +
 .../org/apache/hadoop/hbase/master/HMaster.java |   9 +
 .../hadoop/hbase/master/RegionStateStore.java   |  43 +-
 .../master/cleaner/ReplicationMetaCleaner.java  | 187 +++++++++
 .../RegionMergeTransactionImpl.java             |   3 +-
 .../hbase/regionserver/ReplicationService.java  |   1 +
 .../regionserver/SplitTransactionImpl.java      |   2 +-
 .../replication/regionserver/Replication.java   |  15 +-
 .../regionserver/ReplicationSource.java         | 131 +++++-
 .../regionserver/ReplicationSourceManager.java  |  87 +++-
 .../java/org/apache/hadoop/hbase/wal/WAL.java   |  16 +
 .../hadoop/hbase/TestMetaTableAccessor.java     |   8 +-
 .../hadoop/hbase/client/TestMetaScanner.java    |   2 +-
 .../master/TestAssignmentManagerOnCluster.java  |   2 +-
 .../replication/TestSerialReplication.java      | 401 +++++++++++++++++++
 21 files changed, 1193 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/441bc050/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index 31a880a..1750b24 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -34,13 +34,12 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.regex.Matcher;
 
-import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -52,6 +51,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.WritableComparable;
@@ -1216,6 +1216,18 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
   }
 
   /**
+   * Returns true if at least one column family has REPLICATION_SCOPE_SERIAL as its scope.
+   */
+  public boolean hasSerialReplicationScope() {
+    for (HColumnDescriptor column: getFamilies()){
+      if (column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL){
+        return true; // stop at the first serial-scope family
+      }
+    }
+    return false;
+  }
+
+  /**
    * Returns the configured replicas per region
    */
   public int getRegionReplication() {
@@ -1759,8 +1771,32 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
           .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
           // Disable blooms for meta.  Needs work.  Seems to mess w/ getClosestOrBefore.
           .setBloomFilterType(BloomType.NONE)
-          .setCacheDataInL1(true)
-         });
+            .setCacheDataInL1(true),
+          new HColumnDescriptor(HConstants.REPLICATION_BARRIER_FAMILY)
+              .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
+                  HConstants.DEFAULT_HBASE_META_VERSIONS))
+              .setInMemory(true)
+              .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
+                  HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
+              .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
+              // Disable blooms for meta.  Needs work.  Seems to mess w/ getClosestOrBefore.
+              .setBloomFilterType(BloomType.NONE)
+              // Enable cache of data blocks in L1 if more than one caching tier deployed:
+              // e.g. if using CombinedBlockCache (BucketCache).
+              .setCacheDataInL1(true),
+          new HColumnDescriptor(HConstants.REPLICATION_POSITION_FAMILY)
+              .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
+                  HConstants.DEFAULT_HBASE_META_VERSIONS))
+              .setInMemory(true)
+              .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
+                  HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
+              .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
+              // Disable blooms for meta.  Needs work.  Seems to mess w/ getClosestOrBefore.
+              .setBloomFilterType(BloomType.NONE)
+              // Enable cache of data blocks in L1 if more than one caching tier deployed:
+              // e.g. if using CombinedBlockCache (BucketCache).
+              .setCacheDataInL1(true),
+      });
     metaDescriptor.addCoprocessor(
       "org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint",
       null, Coprocessor.PRIORITY_SYSTEM, null);

http://git-wip-us.apache.org/repos/asf/hbase/blob/441bc050/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
index 6626485..46946e9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
@@ -20,6 +20,20 @@ package org.apache.hadoop.hbase;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ServiceException;
 
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -49,18 +63,6 @@ import org.apache.hadoop.hbase.util.PairOfSameType;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 /**
  * Read/write operations on region and assignment information store in
  * <code>hbase:meta</code>.
@@ -107,10 +109,27 @@ public class MetaTableAccessor {
    *
    * The actual layout of meta should be encapsulated inside MetaTableAccessor methods,
    * and should not leak out of it (through Result objects, etc)
+   *
+   * For serial replication, there are two column families, "rep_barrier" and "rep_position",
+   * whose row key is the encoded region name.
+   * rep_barrier:{seqid}      => each time an RS opens a region, it saves the region's open
+   *                             sequence id here
+   * rep_position:{peerid}    => the max sequence id we have pushed for each peer
+   * rep_position:_TABLENAME_ => a special cell saving this region's table name, used when
+   *                             cleaning old data
+   * rep_position:_DAUGHTER_  => a special cell indicating this region was split or merged; its
+   *                             value is the merged region's encoded name, or the two split
+   *                             regions' encoded names separated by ","
    */
 
   private static final Log LOG = LogFactory.getLog(MetaTableAccessor.class);
 
+  // rep_position qualifier saving a region's daughter region(s) after a split/merge
+  private static final byte[] daughterNamePosCq = Bytes.toBytes("_DAUGHTER_");
+  // rep_position qualifier saving a region's table name; rows are keyed only by encoded name
+  private static final String tableNamePeer = "_TABLENAME_";
+  private static final byte[] tableNamePosCq = Bytes.toBytes(tableNamePeer);
+
   static final byte [] META_REGION_PREFIX;
   static {
     // Copy the prefix from FIRST_META_REGIONINFO into META_REGION_PREFIX.
@@ -962,6 +981,19 @@ public class MetaTableAccessor {
     return delete;
   }
 
+  public static Put makeBarrierPut(byte[] encodedRegionName, long seq, byte[] tableName) {
+    byte[] seqBytes = Bytes.toBytes(seq); // seq is stored as both qualifier and value
+    return new Put(encodedRegionName) // barrier cell plus the table-name cell
+        .addImmutable(HConstants.REPLICATION_BARRIER_FAMILY, seqBytes, seqBytes)
+        .addImmutable(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq, tableName);
+  }
+
+  /** Builds a Put recording a region's daughter(s) in rep_position:_DAUGHTER_. */
+  public static Put makeSerialDaughterPut(byte[] encodedRegionName, byte[] value) {
+    return new Put(encodedRegionName).addImmutable(HConstants.REPLICATION_POSITION_FAMILY,
+        daughterNamePosCq, value);
+  }
+
   /**
    * Adds split daughters to the Put
    */
@@ -978,24 +1010,24 @@ public class MetaTableAccessor {
   }
 
   /**
-   * Put the passed <code>p</code> to the <code>hbase:meta</code> table.
+   * Put the passed <code>puts</code> to the <code>hbase:meta</code> table.
+   * Non-atomic for multi puts.
    * @param connection connection we're using
-   * @param p Put to add to hbase:meta
+   * @param puts Puts to add to hbase:meta, sent together as one list
    * @throws IOException
    */
-  static void putToMetaTable(final Connection connection, final Put p)
-    throws IOException {
-    put(getMetaHTable(connection), p);
+  static void putToMetaTable(final Connection connection, final Put... puts) throws IOException {
+    put(getMetaHTable(connection), Arrays.asList(puts));
   }
 
   /**
    * @param t Table to use (will be closed when done).
-   * @param p put to make
+   * @param puts puts to make
    * @throws IOException
    */
-  private static void put(final Table t, final Put p) throws IOException {
+  private static void put(final Table t, final List<Put> puts) throws IOException {
     try {
-      t.put(p);
+      t.put(puts);
     } finally {
       t.close();
     }
@@ -1121,7 +1153,7 @@ public class MetaTableAccessor {
    * Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this
    * does not add its daughter's as different rows, but adds information about the daughters
    * in the same row as the parent. Use
-   * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName, int)}
+   * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName,int,boolean)}
    * if you want to do that.
    * @param meta the Table for META
    * @param regionInfo region information
@@ -1143,7 +1175,7 @@ public class MetaTableAccessor {
    * Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this
    * does not add its daughter's as different rows, but adds information about the daughters
    * in the same row as the parent. Use
-   * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName, int)}
+   * {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName,int,boolean)}
    * if you want to do that.
    * @param connection connection we're using
    * @param regionInfo region information
@@ -1232,7 +1264,7 @@ public class MetaTableAccessor {
    */
   public static void mergeRegions(final Connection connection, HRegionInfo mergedRegion,
       HRegionInfo regionA, HRegionInfo regionB, ServerName sn, int regionReplication,
-      long masterSystemTime)
+      long masterSystemTime, boolean saveBarrier)
           throws IOException {
     Table meta = getMetaHTable(connection);
     try {
@@ -1263,7 +1295,17 @@ public class MetaTableAccessor {
 
       byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString()
         + HConstants.DELIMITER);
-      multiMutate(meta, tableRow, putOfMerged, deleteA, deleteB);
+      Mutation[] mutations;
+      if (saveBarrier) {
+        Put putBarrierA = makeSerialDaughterPut(regionA.getEncodedNameAsBytes(),
+            Bytes.toBytes(mergedRegion.getEncodedName()));
+        Put putBarrierB = makeSerialDaughterPut(regionB.getEncodedNameAsBytes(),
+            Bytes.toBytes(mergedRegion.getEncodedName()));
+        mutations = new Mutation[] { putOfMerged, deleteA, deleteB, putBarrierA, putBarrierB };
+      } else {
+        mutations = new Mutation[] { putOfMerged, deleteA, deleteB };
+      }
+      multiMutate(meta, tableRow, mutations);
     } finally {
       meta.close();
     }
@@ -1279,10 +1321,11 @@ public class MetaTableAccessor {
    * @param splitA Split daughter region A
    * @param splitB Split daughter region A
    * @param sn the location of the region
+   * @param saveBarrier true if need save replication barrier in meta, used for serial replication
    */
-  public static void splitRegion(final Connection connection,
-                                 HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
-                                 ServerName sn, int regionReplication) throws IOException {
+  public static void splitRegion(final Connection connection, HRegionInfo parent,
+      HRegionInfo splitA, HRegionInfo splitB, ServerName sn, int regionReplication,
+      boolean saveBarrier) throws IOException {
     Table meta = getMetaHTable(connection);
     try {
       HRegionInfo copyOfParent = new HRegionInfo(parent);
@@ -1307,8 +1350,17 @@ public class MetaTableAccessor {
         addEmptyLocation(putB, i);
       }
 
+      Mutation[] mutations;
+      if (saveBarrier) {
+        Put putBarrier = makeSerialDaughterPut(parent.getEncodedNameAsBytes(),
+            Bytes
+                .toBytes(splitA.getEncodedName() + HConstants.DELIMITER + splitB.getEncodedName()));
+        mutations = new Mutation[]{putParent, putA, putB, putBarrier};
+      } else {
+        mutations = new Mutation[]{putParent, putA, putB};
+      }
       byte[] tableRow = Bytes.toBytes(parent.getRegionNameAsString() + HConstants.DELIMITER);
-      multiMutate(meta, tableRow, putParent, putA, putB);
+      multiMutate(meta, tableRow, mutations);
     } finally {
       meta.close();
     }
@@ -1366,6 +1418,27 @@ public class MetaTableAccessor {
   }
 
   /**
+   * Updates the progress of pushing entries to peer cluster. Values are stored via Math.abs.
+   * @param connection connection we're using
+   * @param peerId the peerId to push
+   * @param positions encoded region name -> position (NOTE: no entry is skipped, even -1)
+   * @throws IOException
+   */
+  public static void updateReplicationPositions(Connection connection, String peerId,
+      Map<String, Long> positions) throws IOException {
+    List<Put> puts = new ArrayList<>();
+    for (Map.Entry<String, Long> entry : positions.entrySet()) {
+      long value = Math.abs(entry.getValue());
+      Put put = new Put(Bytes.toBytes(entry.getKey()));
+      put.addImmutable(HConstants.REPLICATION_POSITION_FAMILY, Bytes.toBytes(peerId),
+          Bytes.toBytes(value));
+      puts.add(put);
+    }
+    put(getMetaHTable(connection), puts); // helper closes the meta Table when done
+  }
+
+
+  /**
    * Updates the location of the specified region to be the specified server.
    * <p>
    * Connects to the specified server which should be hosting the specified
@@ -1534,4 +1607,119 @@ public class MetaTableAccessor {
     p.addImmutable(HConstants.CATALOG_FAMILY, getSeqNumColumn(replicaId), now, null);
     return p;
   }
+
+  /**
+   * Get replication position for a peer in a region, keyed by the region's encoded name.
+   * @param connection connection we're using
+   * @return the position of this peer, -1 if no position in meta.
+   */
+  public static long getReplicationPositionForOnePeer(Connection connection,
+      byte[] encodedRegionName, String peerId) throws IOException {
+    Get get = new Get(encodedRegionName);
+    get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, Bytes.toBytes(peerId));
+    Result r = get(getMetaHTable(connection), get);
+    if (r.isEmpty()) {
+      return -1;
+    }
+    Cell cell = r.rawCells()[0]; // only a single column was requested
+    return Bytes.toLong(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength());
+  }
+
+  /**
+   * Get replication positions for all peers in a region, skipping _TABLENAME_/_DAUGHTER_.
+   * @param connection connection we're using
+   * @param encodedRegionName region's encoded name
+   * @return peer id -> position; empty if the region has no rep_position cells
+   */
+  public static Map<String, Long> getReplicationPositionForAllPeer(Connection connection,
+      byte[] encodedRegionName) throws IOException {
+    Get get = new Get(encodedRegionName);
+    get.addFamily(HConstants.REPLICATION_POSITION_FAMILY);
+    Result r = get(getMetaHTable(connection), get);
+    Map<String, Long> map = new HashMap<>((int) (r.size() / 0.75 + 1));
+    for (Cell c : r.isEmpty() ? new Cell[0] : r.rawCells()) { // listCells() is null if empty
+      if (!Bytes.equals(tableNamePosCq, 0, tableNamePosCq.length, c.getQualifierArray(),
+          c.getQualifierOffset(), c.getQualifierLength()) &&
+          !Bytes.equals(daughterNamePosCq, 0, daughterNamePosCq.length, c.getQualifierArray(),
+              c.getQualifierOffset(), c.getQualifierLength())) {
+        map.put(
+            Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()),
+            Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
+      }
+    }
+    return map;
+  }
+
+  /**
+   * Get all barriers (open sequence ids) saved for one region.
+   * @return the list of barrier sequence ids of this region, empty if none
+   * @throws IOException
+   */
+  public static List<Long> getReplicationBarriers(Connection connection, byte[] encodedRegionName)
+      throws IOException {
+    Get get = new Get(encodedRegionName);
+    get.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
+    Result r = get(getMetaHTable(connection), get);
+    List<Long> list = new ArrayList<>();
+    if (!r.isEmpty()) {
+      for (Cell cell : r.rawCells()) { // the qualifier holds the barrier sequence id
+        list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(),
+            cell.getQualifierLength()));
+      }
+    }
+    return list;
+  }
+
+  public static Map<String, List<Long>> getAllBarriers(Connection connection) throws IOException {
+    Map<String, List<Long>> map = new HashMap<>(); // encoded region name -> barrier seq ids
+    Scan scan = new Scan();
+    scan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
+    try (Table t = getMetaHTable(connection);
+        ResultScanner scanner = t.getScanner(scan)) {
+      Result result;
+      while ((result = scanner.next()) != null) {
+        String key = Bytes.toString(result.getRow());
+        List<Long> list = new ArrayList<>();
+        for (Cell cell : result.rawCells()) { // the qualifier holds the barrier sequence id
+          list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(),
+              cell.getQualifierLength()));
+        }
+        map.put(key, list);
+      }
+    }
+    return map;
+  }
+
+  /**
+   * Get daughter region(s) for a region, or null if none; only used in serial replication.
+   * @throws IOException
+   */
+  public static String getSerialReplicationDaughterRegion(Connection connection, byte[] encodedName)
+      throws IOException {
+    Get get = new Get(encodedName);
+    get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, daughterNamePosCq);
+    Result result = get(getMetaHTable(connection), get);
+    if (!result.isEmpty()) {
+      Cell c = result.rawCells()[0]; // the cell written by makeSerialDaughterPut
+      return Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength());
+    }
+    return null;
+  }
+
+  /**
+   * Get the table name for a region, or null if none; only used in serial replication.
+   * @throws IOException
+   */
+  public static String getSerialReplicationTableName(Connection connection, byte[] encodedName)
+      throws IOException {
+    Get get = new Get(encodedName);
+    get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq);
+    Result result = get(getMetaHTable(connection), get);
+    if (!result.isEmpty()) {
+      Cell c = result.rawCells()[0]; // the table-name cell written by makeBarrierPut
+      return Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength());
+    }
+    return null;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/441bc050/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index 1305002..1304396 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -92,8 +92,10 @@ public class ReplicationAdmin implements Closeable {
   // only Global for now, can add other type
   // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
   public static final String REPLICATIONTYPE = "replicationType";
-  public static final String REPLICATIONGLOBAL = Integer
-      .toString(HConstants.REPLICATION_SCOPE_GLOBAL);
+  public static final String REPLICATIONGLOBAL =
+      Integer.toString(HConstants.REPLICATION_SCOPE_GLOBAL); // REPLICATIONTYPE for global CFs
+  public static final String REPLICATIONSERIAL =
+      Integer.toString(HConstants.REPLICATION_SCOPE_SERIAL); // REPLICATIONTYPE for serial CFs
 
   private final Connection connection;
   // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
@@ -517,7 +519,10 @@ public class ReplicationAdmin implements Closeable {
           HashMap<String, String> replicationEntry = new HashMap<String, String>();
           replicationEntry.put(TNAME, tableName);
           replicationEntry.put(CFNAME, column.getNameAsString());
-          replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
+          replicationEntry.put(REPLICATIONTYPE,
+              column.getScope() == HConstants.REPLICATION_SCOPE_GLOBAL ?
+                  REPLICATIONGLOBAL :
+                  REPLICATIONSERIAL);
           replicationColFams.add(replicationEntry);
         }
       }
@@ -712,7 +717,8 @@ public class ReplicationAdmin implements Closeable {
    */
   private boolean isTableRepEnabled(HTableDescriptor htd) {
     for (HColumnDescriptor hcd : htd.getFamilies()) {
-      if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
+      if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL
+          && hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
         return false;
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/441bc050/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 654c012..61b54ee 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -426,6 +426,20 @@ public final class HConstants {
   /** The catalog family */
   public static final byte [] CATALOG_FAMILY = Bytes.toBytes(CATALOG_FAMILY_STR);
 
+  /** The replication barrier family as a string */
+  public static final String REPLICATION_BARRIER_FAMILY_STR = "rep_barrier";
+
+  /** The replication barrier family */
+  public static final byte [] REPLICATION_BARRIER_FAMILY =
+      Bytes.toBytes(REPLICATION_BARRIER_FAMILY_STR);
+
+  /** The replication position family as a string */
+  public static final String REPLICATION_POSITION_FAMILY_STR = "rep_position";
+
+  /** The replication position family */
+  public static final byte [] REPLICATION_POSITION_FAMILY =
+      Bytes.toBytes(REPLICATION_POSITION_FAMILY_STR);
+
   /** The RegionInfo qualifier as a string */
   public static final String REGIONINFO_QUALIFIER_STR = "regioninfo";
 
@@ -621,6 +635,12 @@ public final class HConstants {
   public static final int REPLICATION_SCOPE_GLOBAL = 1;
 
   /**
+   * Scope tag for serially scoped data.
+   * This data will be replicated to all peers in the order of sequence id.
+   */
+  public static final int REPLICATION_SCOPE_SERIAL = 2;
+
+  /**
    * Default cluster ID, cannot be used to identify a cluster so a key with
    * this value means it wasn't meant for replication.
    */
@@ -854,6 +874,12 @@ public final class HConstants {
   public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false;
   /** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */
   public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id";
+  /** Wait (ms) before re-checking whether a serial-scope log can be pushed. */
+  public static final String
+      REPLICATION_SERIALLY_WAITING_KEY = "hbase.serial.replication.waitingMs";
+  public static final long
+      REPLICATION_SERIALLY_WAITING_DEFAULT = 10000; // ms, same as hbase-default.xml
+
   /**
    * Directory where the source cluster file system client configuration are placed which is used by
    * sink cluster to copy HFiles from source cluster file system

http://git-wip-us.apache.org/repos/asf/hbase/blob/441bc050/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 97fa0b2..0659dec 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1466,6 +1466,20 @@ possible configurations would overwhelm and obscure the important.
         slave clusters. The default of 10 will rarely need to be changed.
     </description>
   </property>
+  <property>
+    <name>hbase.serial.replication.waitingMs</name>
+    <value>10000</value>
+    <description>
+      By default, replication does not guarantee that operations are applied in the slave cluster
+      in the same order as in the master. If REPLICATION_SCOPE is set to 2, edits are pushed in the
+      order they were written. This setting controls how long (in ms) to wait before checking again
+      when a log cannot be pushed yet because logs written before it have not been pushed.
+      A larger value reduces the number of queries on hbase:meta but increases replication delay.
+      This feature relies on zk-less assignment and conflicts with distributed log replay, so
+      users must set hbase.assignment.usezk and hbase.master.distributed.log.replay to false to
+      use it.
+    </description>
+  </property>
   <!-- Static Web User Filter properties. -->
   <property>
     <description>

http://git-wip-us.apache.org/repos/asf/hbase/blob/441bc050/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
index 4da6fa2..9bae06f 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
@@ -21,6 +21,10 @@ public final class WALProtos {
      * <code>REPLICATION_SCOPE_GLOBAL = 1;</code>
      */
     REPLICATION_SCOPE_GLOBAL(1, 1),
+    /**
+     * <code>REPLICATION_SCOPE_SERIAL = 2;</code>
+     */
+    REPLICATION_SCOPE_SERIAL(2, 2),
     ;
 
     /**
@@ -31,6 +35,10 @@ public final class WALProtos {
      * <code>REPLICATION_SCOPE_GLOBAL = 1;</code>
      */
     public static final int REPLICATION_SCOPE_GLOBAL_VALUE = 1;
+    /**
+     * <code>REPLICATION_SCOPE_SERIAL = 2;</code>
+     */
+    public static final int REPLICATION_SCOPE_SERIAL_VALUE = 2;
 
 
     public final int getNumber() { return value; }
@@ -39,6 +47,7 @@ public final class WALProtos {
       switch (value) {
         case 0: return REPLICATION_SCOPE_LOCAL;
         case 1: return REPLICATION_SCOPE_GLOBAL;
+        case 2: return REPLICATION_SCOPE_SERIAL;
         default: return null;
       }
     }
@@ -12014,10 +12023,11 @@ public final class WALProtos {
       "criptor\022$\n\006server\030\006 \001(\0132\024.hbase.pb.Serve" +
       "rName\022\023\n\013region_name\030\007 \001(\014\".\n\tEventType\022" +
       "\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\nWA" +
-      "LTrailer*F\n\tScopeType\022\033\n\027REPLICATION_SCO" +
+      "LTrailer*d\n\tScopeType\022\033\n\027REPLICATION_SCO" +
       "PE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL\020\001" +
-      "B?\n*org.apache.hadoop.hbase.protobuf.gen" +
-      "eratedB\tWALProtosH\001\210\001\000\240\001\001"
+      "\022\034\n\030REPLICATION_SCOPE_SERIAL\020\002B?\n*org.ap" +
+      "ache.hadoop.hbase.protobuf.generatedB\tWA" +
+      "LProtosH\001\210\001\000\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/441bc050/hbase-protocol/src/main/protobuf/WAL.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto
index 2061b22..5d91c4c 100644
--- a/hbase-protocol/src/main/protobuf/WAL.proto
+++ b/hbase-protocol/src/main/protobuf/WAL.proto
@@ -76,6 +76,7 @@ message WALKey {
 enum ScopeType {
   REPLICATION_SCOPE_LOCAL = 0;
   REPLICATION_SCOPE_GLOBAL = 1;
+  REPLICATION_SCOPE_SERIAL = 2;  // replicated to all peers in sequence-id order
 }
 
 message FamilyScope {

http://git-wip-us.apache.org/repos/asf/hbase/blob/441bc050/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 7dc5df2..70c8ecb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -97,6 +97,7 @@ import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
+import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
 import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
 import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
@@ -315,6 +316,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
 
   CatalogJanitor catalogJanitorChore;
   private ReplicationZKLockCleanerChore replicationZKLockCleanerChore;
+  private ReplicationMetaCleaner replicationMetaCleaner;
   private LogCleaner logCleaner;
   private HFileCleaner hfileCleaner;
 
@@ -1160,6 +1162,12 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
         LOG.error("start replicationZKLockCleanerChore failed", e);
       }
     }
+    try {
+      replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval);
+      getChoreService().scheduleChore(replicationMetaCleaner);
+    } catch (Exception e) {
+      LOG.error("start ReplicationMetaCleaner failed", e);
+    }
   }
 
   @Override
@@ -1194,6 +1202,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     if (this.logCleaner != null) this.logCleaner.cancel(true);
     if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
     if (this.replicationZKLockCleanerChore != null) this.replicationZKLockCleanerChore.cancel(true);
+    if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true);
     if (this.quotaManager != null) this.quotaManager.stop();
     if (this.activeMasterManager != null) this.activeMasterManager.stop();
     if (this.serverManager != null) this.serverManager.stop();

http://git-wip-us.apache.org/repos/asf/hbase/blob/441bc050/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
index 476b4d5..2d445e2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
@@ -17,11 +17,15 @@
  */
 package org.apache.hadoop.hbase.master;
 
+import com.google.common.base.Preconditions;
+
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -45,8 +49,6 @@ import org.apache.hadoop.hbase.util.MultiHConnection;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.zookeeper.KeeperException;
 
-import com.google.common.base.Preconditions;
-
 /**
  * A helper to persist region state in meta. We may change this class
  * to StateStore later if we also use it to store other states in meta
@@ -63,7 +65,7 @@ public class RegionStateStore {
   private volatile boolean initialized;
 
   private final boolean noPersistence;
-  private final Server server;
+  private final MasterServices server;
 
   /**
    * Returns the {@link ServerName} from catalog table {@link Result}
@@ -133,7 +135,7 @@ public class RegionStateStore {
           State.SPLITTING_NEW, State.MERGED));
   }
 
-  RegionStateStore(final Server server) {
+  RegionStateStore(final MasterServices server) {
     Configuration conf = server.getConfiguration();
     // No need to persist if using ZK but not migrating
     noPersistence = ConfigUtil.useZKForAssignment(conf)
@@ -198,31 +200,41 @@ public class RegionStateStore {
     State state = newState.getState();
 
       int replicaId = hri.getReplicaId();
-      Put put = new Put(MetaTableAccessor.getMetaKeyForRegion(hri));
+      Put metaPut = new Put(MetaTableAccessor.getMetaKeyForRegion(hri));
       StringBuilder info = new StringBuilder("Updating hbase:meta row ");
       info.append(hri.getRegionNameAsString()).append(" with state=").append(state);
       if (serverName != null && !serverName.equals(oldServer)) {
-        put.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId),
+        metaPut.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId),
           Bytes.toBytes(serverName.getServerName()));
         info.append(", sn=").append(serverName);
       }
       if (openSeqNum >= 0) {
         Preconditions.checkArgument(state == State.OPEN
           && serverName != null, "Open region should be on a server");
-        MetaTableAccessor.addLocation(put, serverName, openSeqNum, -1, replicaId);
+        MetaTableAccessor.addLocation(metaPut, serverName, openSeqNum, -1, replicaId);
         info.append(", openSeqNum=").append(openSeqNum);
         info.append(", server=").append(serverName);
       }
-      put.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId),
+      metaPut.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId),
         Bytes.toBytes(state.name()));
       LOG.info(info);
-
+      HTableDescriptor descriptor = server.getTableDescriptors().get(hri.getTable());
+      boolean serial = false;
+      if (descriptor != null) {
+        serial = server.getTableDescriptors().get(hri.getTable()).hasSerialReplicationScope();
+      }
+      boolean shouldPutBarrier = serial && state == State.OPEN;
       // Persist the state change to meta
       if (metaRegion != null) {
         try {
           // Assume meta is pinned to master.
           // At least, that's what we want.
-          metaRegion.put(put);
+          metaRegion.put(metaPut);
+          if (shouldPutBarrier) {
+            Put barrierPut = MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(),
+                openSeqNum, hri.getTable().getName());
+            metaRegion.put(barrierPut);
+          }
           return; // Done here
         } catch (Throwable t) {
           // In unit tests, meta could be moved away by intention
@@ -241,7 +253,10 @@ public class RegionStateStore {
         }
       }
       // Called when meta is not on master
-      multiHConnection.processBatchCallback(Arrays.asList(put), TableName.META_TABLE_NAME, null, null);
+      List<Put> list = shouldPutBarrier ?
+          Arrays.asList(metaPut, MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(),
+              openSeqNum, hri.getTable().getName())) : Arrays.asList(metaPut);
+      multiHConnection.processBatchCallback(list, TableName.META_TABLE_NAME, null, null);
 
     } catch (IOException ioe) {
       LOG.error("Failed to persist region state " + newState, ioe);
@@ -251,12 +266,14 @@ public class RegionStateStore {
 
   void splitRegion(HRegionInfo p,
       HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException {
-    MetaTableAccessor.splitRegion(server.getConnection(), p, a, b, sn, regionReplication);
+    MetaTableAccessor.splitRegion(server.getConnection(), p, a, b, sn, regionReplication,
+        server.getTableDescriptors().get(p.getTable()).hasSerialReplicationScope());
   }
 
   void mergeRegions(HRegionInfo p,
       HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException {
     MetaTableAccessor.mergeRegions(server.getConnection(), p, a, b, sn, regionReplication,
-    		EnvironmentEdgeManager.currentTime());
+        EnvironmentEdgeManager.currentTime(),
+        server.getTableDescriptors().get(p.getTable()).hasSerialReplicationScope());
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/441bc050/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
new file mode 100644
index 0000000..41864b9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This chore is to clean up the useless data in hbase:meta which is used by serial replication.
+ * <p>
+ * For every region that has replication barriers in hbase:meta, the chore either:
+ * <ul>
+ * <li>deletes all of the region's barrier and position data, when the region's table no longer
+ * has a column family with serial replication scope, or when the region was split/merged and
+ * every daughter that has barriers records a position for each peer replicating the table; or</li>
+ * <li>deletes only the oldest barriers, keeping the barriers whose sequence id is at least the
+ * minimum position of all peers plus the last barrier below that minimum.</li>
+ * </ul>
+ * Regions for which no peer has a position yet, or for which some peer replicating the table has
+ * no position, are left untouched.
+ */
+@InterfaceAudience.Private
+public class ReplicationMetaCleaner extends ScheduledChore {
+
+  private static final Log LOG = LogFactory.getLog(ReplicationMetaCleaner.class);
+
+  private final ReplicationAdmin replicationAdmin;
+  private final MasterServices master;
+
+  /**
+   * @param master supplies the table descriptors, configuration and connection used by the chore
+   * @param stoppable passed to {@link ScheduledChore}
+   * @param period passed to {@link ScheduledChore}
+   * @throws IOException propagated from the {@link ReplicationAdmin} constructor
+   */
+  public ReplicationMetaCleaner(MasterServices master, Stoppable stoppable, int period)
+      throws IOException {
+    super("ReplicationMetaCleaner", stoppable, period);
+    this.master = master;
+    this.replicationAdmin = new ReplicationAdmin(master.getConfiguration());
+  }
+
+  @Override
+  protected void chore() {
+    try {
+      Map<String, Set<String>> serialTables = getSerialTables();
+      if (serialTables.isEmpty()) {
+        return;
+      }
+      addConfiguredPeers(serialTables);
+
+      Map<String, List<Long>> barrierMap = MetaTableAccessor.getAllBarriers(master.getConnection());
+      for (Map.Entry<String, List<Long>> entry : barrierMap.entrySet()) {
+        cleanRegion(entry.getKey(), entry.getValue(), serialTables);
+      }
+    } catch (IOException e) {
+      LOG.error("Exception during cleaning up.", e);
+    }
+  }
+
+  /**
+   * Collects the tables that have at least one column family with serial replication scope.
+   * @return table name mapped to an empty, mutable set that will hold the ids of its peers
+   */
+  private Map<String, Set<String>> getSerialTables() throws IOException {
+    Map<String, Set<String>> serialTables = new HashMap<>();
+    for (HTableDescriptor table : master.getTableDescriptors().getAll().values()) {
+      for (HColumnDescriptor column : table.getFamilies()) {
+        if (column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL) {
+          serialTables.put(table.getTableName().getNameAsString(), new HashSet<String>());
+          break;
+        }
+      }
+    }
+    return serialTables;
+  }
+
+  /**
+   * Adds to each serial table's set the ids of the peers whose peer data uses that table name as
+   * a key. A peer is recorded for EVERY serial table it names, not only the first one; otherwise
+   * the other tables would treat that peer as absent and could be cleared too early.
+   * NOTE(review): this reads table names from {@link ReplicationPeerConfig#getPeerData()};
+   * confirm that is where the table-to-peer mapping lives, and how peers without an explicit
+   * table list should be handled.
+   */
+  private void addConfiguredPeers(Map<String, Set<String>> serialTables) throws IOException {
+    Map<String, ReplicationPeerConfig> peers = replicationAdmin.listPeerConfigs();
+    for (Map.Entry<String, ReplicationPeerConfig> peer : peers.entrySet()) {
+      for (byte[] key : peer.getValue().getPeerData().keySet()) {
+        Set<String> peerIds = serialTables.get(Bytes.toString(key));
+        if (peerIds != null) {
+          peerIds.add(peer.getKey());
+        }
+      }
+    }
+  }
+
+  /**
+   * Deletes the serial-replication data of a single region that is no longer needed.
+   * @param encodedName encoded name of the region, used as the hbase:meta row key of its barrier
+   *          and position data
+   * @param barriers the region's barrier sequence ids; assumed sorted ascending, as
+   *          {@link Collections#binarySearch} requires (TODO confirm against
+   *          {@link MetaTableAccessor#getAllBarriers})
+   * @param serialTables serial table name mapped to the ids of the peers replicating it
+   */
+  private void cleanRegion(String encodedName, List<Long> barriers,
+      Map<String, Set<String>> serialTables) throws IOException {
+    byte[] encodedBytes = Bytes.toBytes(encodedName);
+    Map<String, Long> posMap = MetaTableAccessor.getReplicationPositionForAllPeer(
+        master.getConnection(), encodedBytes);
+    if (posMap.isEmpty()) {
+      return;
+    }
+
+    String tableName = MetaTableAccessor.getSerialReplicationTableName(
+        master.getConnection(), encodedBytes);
+    Set<String> confPeers = serialTables.get(tableName);
+    boolean canClearRegion;
+    if (confPeers == null) {
+      // This table doesn't exist or all cf's scope is not serial any more, we can clear meta.
+      canClearRegion = true;
+    } else if (!allPeersHavePosition(confPeers, posMap)) {
+      // Some peer replicating this table has no position for this region yet: keep everything.
+      return;
+    } else {
+      canClearRegion = allDaughtersStarted(encodedBytes, confPeers);
+    }
+
+    if (canClearRegion) {
+      Delete delete = new Delete(encodedBytes);
+      delete.addFamily(HConstants.REPLICATION_POSITION_FAMILY);
+      delete.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
+      deleteFromMeta(delete);
+    } else {
+      deleteObsoleteBarriers(encodedBytes, barriers, posMap);
+    }
+  }
+
+  /**
+   * @return true if daughter regions are recorded for this region (it was merged or split) and
+   *         every daughter that has barriers records a position for each peer in
+   *         {@code confPeers}; false if no daughter is recorded
+   */
+  private boolean allDaughtersStarted(byte[] encodedBytes, Set<String> confPeers)
+      throws IOException {
+    String daughterValue = MetaTableAccessor
+        .getSerialReplicationDaughterRegion(master.getConnection(), encodedBytes);
+    if (daughterValue == null) {
+      return false;
+    }
+    for (String daughter : daughterValue.split(",")) {
+      byte[] region = Bytes.toBytes(daughter);
+      boolean hasBarriers =
+          !MetaTableAccessor.getReplicationBarriers(master.getConnection(), region).isEmpty();
+      if (hasBarriers && !allPeersHavePosition(confPeers,
+          MetaTableAccessor.getReplicationPositionForAllPeer(master.getConnection(), region))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Barriers whose seq is not smaller than min pos of all peers, and the last barrier whose seq
+   * is smaller than min pos, are kept. All earlier barriers are deleted.
+   */
+  private void deleteObsoleteBarriers(byte[] encodedBytes, List<Long> barriers,
+      Map<String, Long> posMap) throws IOException {
+    long minPos = Long.MAX_VALUE;
+    for (long pos : posMap.values()) {
+      minPos = Math.min(minPos, pos);
+    }
+    int index = Collections.binarySearch(barriers, minPos);
+    if (index < 0) {
+      index = -index - 1;
+    }
+    if (index <= 1) {
+      // Nothing to delete. Sending a Delete with no columns would remove the WHOLE row,
+      // including the replication positions of every peer.
+      return;
+    }
+    Delete delete = new Delete(encodedBytes);
+    for (int i = 0; i < index - 1; i++) {
+      delete.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, Bytes.toBytes(barriers.get(i)));
+    }
+    deleteFromMeta(delete);
+  }
+
+  /** Applies {@code delete} to hbase:meta. */
+  private void deleteFromMeta(Delete delete) throws IOException {
+    try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) {
+      metaTable.delete(delete);
+    }
+  }
+
+  /** @return true if {@code posMap} has a position for every peer id in {@code peers} */
+  private static boolean allPeersHavePosition(Set<String> peers, Map<String, Long> posMap) {
+    return posMap.keySet().containsAll(peers);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/441bc050/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java
index 03aa059..9e31fb0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java
@@ -434,7 +434,8 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
       if (metaEntries.isEmpty()) {
         MetaTableAccessor.mergeRegions(server.getConnection(),
           mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(),
-          server.getServerName(), region_a.getTableDesc().getRegionReplication(), masterSystemTime);
+          server.getServerName(), region_a.getTableDesc().getRegionReplication(), masterSystemTime,
+            false);
       } else {
         mergeRegionsAndPutMetaEntries(server.getConnection(),
           mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(),

http://git-wip-us.apache.org/repos/asf/hbase/blob/441bc050/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
index 25a27a9..95da92a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

http://git-wip-us.apache.org/repos/asf/hbase/blob/441bc050/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java
index 958e1f3..344ea97 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java
@@ -337,7 +337,7 @@ public class SplitTransactionImpl implements SplitTransaction {
         MetaTableAccessor.splitRegion(server.getConnection(),
           parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(),
           daughterRegions.getSecond().getRegionInfo(), server.getServerName(),
-          parent.getTableDesc().getRegionReplication());
+          parent.getTableDesc().getRegionReplication(), false);
       } else {
         offlineParentInMetaAndputMetaEntries(server.getConnection(),
           parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions

http://git-wip-us.apache.org/repos/asf/hbase/blob/441bc050/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 06138a5..d9a20ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Server;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
@@ -311,8 +313,19 @@ public class Replication extends WALActionsListener.Base implements
         if (replicationForBulkLoadEnabled && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
           scopeBulkLoadEdits(htd, replicationManager, scopes, logKey.getTablename(), cell);
         } else {
-          // Skip the flush/compaction/region events
+          WALProtos.RegionEventDescriptor maybeEvent = WALEdit.getRegionEventDescriptor(cell);
+          if (maybeEvent != null && (maybeEvent.getEventType() ==
+              WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) {
+            // In serially replication, we use scopes when reading close marker.
+            for (HColumnDescriptor cf :htd.getFamilies()) {
+              if (cf.getScope() != REPLICATION_SCOPE_LOCAL) {
+                scopes.put(cf.getName(), cf.getScope());
+              }
+            }
+          }
+          // Skip the flush/compaction
           continue;
+
         }
       } else {
         family = CellUtil.cloneFamily(cell);

http://git-wip-us.apache.org/repos/asf/hbase/blob/441bc050/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 387427e..c376f18 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -18,6 +18,13 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -44,9 +51,11 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -68,10 +77,6 @@ import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALKey;
 
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
-
 /**
  * Class that handles the source of a replication stream.
  * Currently does not handle more than 1 slave
@@ -102,6 +107,8 @@ public class ReplicationSource extends Thread
   private ReplicationQueueInfo replicationQueueInfo;
   // id of the peer cluster this source replicates to
   private String peerId;
+
+  // Peer id used for the serial-replication bookkeeping in hbase:meta (position updates and
+  // waitUntilCanBePushed). The constructor derives it by parsing peerId with ReplicationQueueInfo
+  // once more; presumably this strips any suffix peerId still carries — TODO confirm.
+  String actualPeerId;
   // The manager of all sources to which we ping back our progress
   private ReplicationSourceManager manager;
   // Should we stop everything?
@@ -185,6 +192,8 @@ public class ReplicationSource extends Thread
     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
     // ReplicationQueueInfo parses the peerId out of the znode for us
     this.peerId = this.replicationQueueInfo.getPeerId();
+    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
+    this.actualPeerId = replicationQueueInfo.getPeerId();
     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
     this.replicationEndpoint = replicationEndpoint;
   }
@@ -507,6 +516,17 @@ public class ReplicationSource extends Thread
     // Current number of hfiles that we need to replicate
     private long currentNbHFiles = 0;
 
+    /**
+     * Encoded region names for which this worker has already waited in waitingUntilCanPush, so
+     * later batches of the same region are pushed without waiting again. A Guava cache is used so
+     * that each key expires one day after its last access; a key that is absent (never waited on,
+     * expired, or invalidated) loads as {@code false}.
+     */
+    private LoadingCache<String, Boolean> canSkipWaitingSet =
+        CacheBuilder.newBuilder()
+            .expireAfterAccess(1, TimeUnit.DAYS)
+            .build(new CacheLoader<String, Boolean>() {
+              @Override
+              public Boolean load(String encodedRegionName) throws Exception {
+                return Boolean.FALSE;
+              }
+            });
+
     public ReplicationSourceWorkerThread(String walGroupId, PriorityBlockingQueue<Path> queue,
         ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) {
       this.walGroupId = walGroupId;
@@ -587,9 +607,24 @@ public class ReplicationSource extends Thread
         currentNbOperations = 0;
         currentNbHFiles = 0;
         List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
+
+        Map<String, Long> lastPositionsForSerialScope = new HashMap<>();
         currentSize = 0;
         try {
-          if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
+          if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries,
+              lastPositionsForSerialScope)) {
+            for (Map.Entry<String, Long> entry : lastPositionsForSerialScope.entrySet()) {
+              waitingUntilCanPush(entry);
+            }
+            try {
+              MetaTableAccessor
+                  .updateReplicationPositions(manager.getConnection(), actualPeerId,
+                      lastPositionsForSerialScope);
+            } catch (IOException e) {
+              LOG.error("updateReplicationPositions fail", e);
+              stopper.stop("updateReplicationPositions fail");
+            }
+
             continue;
           }
         } catch (IOException ioe) {
@@ -625,15 +660,30 @@ public class ReplicationSource extends Thread
             LOG.warn("Unable to finalize the tailing of a file", e);
           }
         }
-
+        for(Map.Entry<String, Long> entry: lastPositionsForSerialScope.entrySet()) {
+          waitingUntilCanPush(entry);
+        }
         // If we didn't get anything to replicate, or if we hit a IOE,
         // wait a bit and retry.
         // But if we need to stop, don't bother sleeping
         if (isWorkerActive() && (gotIOE || entries.isEmpty())) {
           if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
-            manager.logPositionAndCleanOldLogs(this.currentPath,
-                peerClusterZnode, this.repLogReader.getPosition(),
+
+            // Save positions to meta table before zk.
+            if (!gotIOE) {
+              try {
+                MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
+                    lastPositionsForSerialScope);
+              } catch (IOException e) {
+                LOG.error("updateReplicationPositions fail", e);
+                stopper.stop("updateReplicationPositions fail");
+              }
+            }
+
+            manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
+                this.repLogReader.getPosition(),
                 this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
+
             this.lastLoggedPosition = this.repLogReader.getPosition();
           }
           // Reset the sleep multiplier if nothing has actually gone wrong
@@ -648,8 +698,7 @@ public class ReplicationSource extends Thread
           }
           continue;
         }
-        sleepMultiplier = 1;
-        shipEdits(currentWALisBeingWrittenTo, entries);
+        shipEdits(currentWALisBeingWrittenTo, entries, lastPositionsForSerialScope);
       }
       if (replicationQueueInfo.isQueueRecovered()) {
         // use synchronize to make sure one last thread will clean the queue
@@ -671,16 +720,42 @@ public class ReplicationSource extends Thread
       }
     }
 
+    /**
+     * Blocks until the entries of one region may be pushed to the peer, unless this worker has
+     * already waited for that region.
+     * <p>
+     * The entry's key is the encoded region name and its value is the last sequence id read for
+     * that region. A non-positive value means the batch stopped at a REGION_CLOSE marker. Its
+     * absolute value is the sequence id, and the region is forgotten afterwards so that the next
+     * section read for it is waited on again.
+     * <p>
+     * If waiting fails, {@code stopper} is stopped and the region is NOT recorded as waited-on,
+     * so later batches do not skip the ordering check because of a wait that never completed.
+     *
+     * @param entry encoded region name mapped to the (possibly negated) last sequence id read
+     */
+    private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
+      String key = entry.getKey();
+      long seq = entry.getValue();
+      boolean deleteKey = false;
+      if (seq <= 0) {
+        // There is a REGION_CLOSE marker, we can not continue skipping after this entry.
+        deleteKey = true;
+        seq = -seq;
+      }
+
+      if (!canSkipWaitingSet.getUnchecked(key)) {
+        try {
+          manager.waitUntilCanBePushed(Bytes.toBytes(key), seq, actualPeerId);
+        } catch (Exception e) {
+          if (e instanceof InterruptedException) {
+            // Restore the interrupt status that catching the exception cleared.
+            Thread.currentThread().interrupt();
+          }
+          LOG.error("waitUntilCanBePushed fail", e);
+          stopper.stop("waitUntilCanBePushed fail");
+          // The wait never completed: do not mark this region as safe to skip waiting.
+          return;
+        }
+        canSkipWaitingSet.put(key, true);
+      }
+      if (deleteKey) {
+        canSkipWaitingSet.invalidate(key);
+      }
+    }
+
     /**
      * Read all the entries from the current log files and retain those that need to be replicated.
      * Else, process the end of the current file.
      * @param currentWALisBeingWrittenTo is the current WAL being written to
      * @param entries resulting entries to be replicated
+     * @param lastPosition save the last sequenceid for each region if the table has
+     *                     serial-replication scope
      * @return true if we got nothing and went to the next file, false if we got entries
      * @throws IOException
      */
     protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
-        List<WAL.Entry> entries) throws IOException {
+        List<WAL.Entry> entries, Map<String, Long> lastPosition) throws IOException {
       long seenEntries = 0;
       if (LOG.isTraceEnabled()) {
         LOG.trace("Seeking in " + this.currentPath + " at position "
@@ -693,6 +768,26 @@ public class ReplicationSource extends Thread
         metrics.incrLogEditsRead();
         seenEntries++;
 
+        if (entry.hasSerialReplicationScope()) {
+          String key = Bytes.toString(entry.getKey().getEncodedRegionName());
+          lastPosition.put(key, entry.getKey().getLogSeqNum());
+          if (entry.getEdit().getCells().size() > 0) {
+            WALProtos.RegionEventDescriptor maybeEvent =
+                WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
+            if (maybeEvent != null && maybeEvent.getEventType()
+                == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) {
+              // In serially replication, if we move a region to another RS and move it back, we may
+              // read logs crossing two sections. We should break at REGION_CLOSE and push the first
+              // section first in case of missing the middle section belonging to the other RS.
+              // In a worker thread, if we can push the first log of a region, we can push all logs
+              // in the same region without waiting until we read a close marker because next time
+              // we read logs in this region, it must be a new section and not adjacent with this
+              // region. Mark it negative.
+              lastPosition.put(key, -entry.getKey().getLogSeqNum());
+              break;
+            }
+          }
+        }
         // don't replicate if the log entries have already been consumed by the cluster
         if (replicationEndpoint.canReplicateToSameCluster()
             || !entry.getKey().getClusterIds().contains(peerClusterId)) {
@@ -722,6 +817,7 @@ public class ReplicationSource extends Thread
             || entries.size() >= replicationQueueNbCapacity) {
           break;
         }
+
         try {
           entry = this.repLogReader.readNextAndSetPosition();
         } catch (IOException ie) {
@@ -996,7 +1092,8 @@ public class ReplicationSource extends Thread
      * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
      * written to when this method was called
      */
-    protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
+    protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries,
+        Map<String, Long> lastPositionsForSerialScope) {
       int sleepMultiplier = 0;
       if (entries.isEmpty()) {
         LOG.warn("Was given 0 edits to ship");
@@ -1047,6 +1144,16 @@ public class ReplicationSource extends Thread
             for (int i = 0; i < size; i++) {
               cleanUpHFileRefs(entries.get(i).getEdit());
             }
+
+            // Save positions to meta table before zk.
+            try {
+              MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
+                  lastPositionsForSerialScope);
+            } catch (IOException e) {
+              LOG.error("updateReplicationPositions fail", e);
+              stopper.stop("updateReplicationPositions fail");
+            }
+
             //Log and clean up WAL logs
             manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
               this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),

http://git-wip-us.apache.org/repos/asf/hbase/blob/441bc050/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 381b024..d4f6d9d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -48,10 +49,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@@ -64,6 +68,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 
 /**
@@ -118,6 +123,8 @@ public class ReplicationSourceManager implements ReplicationListener {
   private final Random rand;
   private final boolean replicationForBulkLoadDataEnabled;
 
+  private Connection connection;
+  private long replicationWaitTime;
 
   /**
    * Creates a replication manager and sets the watch on all the other registered region servers
@@ -134,7 +141,7 @@ public class ReplicationSourceManager implements ReplicationListener {
   public ReplicationSourceManager(final ReplicationQueues replicationQueues,
       final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
       final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
-      final Path oldLogDir, final UUID clusterId) {
+      final Path oldLogDir, final UUID clusterId) throws IOException {
     //CopyOnWriteArrayList is thread-safe.
     //Generally, reading is more than modifying.
     this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
@@ -171,6 +178,9 @@ public class ReplicationSourceManager implements ReplicationListener {
     replicationForBulkLoadDataEnabled =
         conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
           HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
+    this.replicationWaitTime = conf.getLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY,
+          HConstants.REPLICATION_SERIALLY_WAITING_DEFAULT);
+    connection = ConnectionFactory.createConnection(conf);
   }
 
   /**
@@ -778,6 +788,10 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   public ReplicationPeers getReplicationPeers() {return this.replicationPeers;}
 
+  public Connection getConnection() {
+    return connection; // opened via ConnectionFactory.createConnection(conf) in the constructor
+  }
+
   /**
    * Get a string representation of all the sources' metrics
    */
@@ -804,4 +818,75 @@ public class ReplicationSourceManager implements ReplicationListener {
   public void cleanUpHFileRefs(String peerId, List<String> files) {
     this.replicationQueues.removeHFileRefs(peerId, files);
   }
+
+  /**
+   * Blocks until an entry of a serially-replicated region may be pushed to the given peer: an
+   * entry must wait until all entries of its region with smaller sequence ids have been pushed.
+   * A ReplicationSource only needs to check the first entry of each region it reads.
+   * @param encodedName encoded name of the entry's region
+   * @param seq sequence id of the entry
+   * @param peerId id of the peer being replicated to
+   * @throws IOException if reading barriers or positions from meta fails
+   * @throws InterruptedException if interrupted while sleeping between meta polls
+   */
+  void waitUntilCanBePushed(byte[] encodedName, long seq, String peerId)
+      throws IOException, InterruptedException {
+    /*
+     * There are barriers for this region and a position for this peer. N barriers form N
+     * intervals: (b1,b2) (b2,b3) ... (bn,max). Generally there are no logs whose seq id is not
+     * greater than the first barrier, and the last interval starts at the last barrier.
+     *
+     * We may push now in any of the following cases; otherwise we block:
+     * 1) Serial replication is not enabled: all logs can be pushed as before. Callers should
+     *    not invoke this method in that case.
+     * 2) There are no barriers for this region, or the seq id is not greater than the first
+     *    barrier. This mainly happens after altering REPLICATION_SCOPE to 2; we cannot
+     *    guarantee the order of logs written before the alteration.
+     * 3) The entry is in the first interval of barriers, i.e. the start of the region, so we
+     *    can push. Split/merged regions are also fine because the first section of a daughter
+     *    region is on the same RS as its parents and the order within one RS is guaranteed.
+     * 4) The entry's seq id and the position are in the same section, or the position is the
+     *    last seq id of the previous section (when a region is opened, its barrier is set to
+     *    the last log's id + 1).
+     * 5) The entry's seq id is not greater than the position in meta: we are retrying. This
+     *    can happen if a RS crashes after saving replication meta but before the zk offset.
+     */
+    List<Long> barriers = MetaTableAccessor.getReplicationBarriers(connection, encodedName);
+    if (barriers.isEmpty() || seq <= barriers.get(0)) {
+      // Case 2
+      return;
+    }
+    int interval = Collections.binarySearch(barriers, seq);
+    if (interval < 0) {
+      interval = -interval - 1; // get the insertion point if not found
+    }
+    if (interval == 1) {
+      // Case 3
+      return;
+    }
+
+    while (true) {
+      long pos =
+          MetaTableAccessor.getReplicationPositionForOnePeer(connection, encodedName, peerId);
+      if (seq <= pos) {
+        return; // Case 5: already recorded as pushed, we are retrying
+      }
+      if (pos >= 0) {
+        // Case 4
+        int posInterval = Collections.binarySearch(barriers, pos);
+        if (posInterval < 0) {
+          posInterval = -posInterval - 1; // get the insertion point if not found
+        }
+        if (posInterval == interval || pos == barriers.get(interval - 1) - 1) {
+          return;
+        }
+      }
+
+      LOG.info(Bytes.toString(encodedName) + " can not start pushing to peer " + peerId
+          + " because previous log has not been pushed: sequence=" + seq + " pos=" + pos
+          + " barriers=" + Arrays.toString(barriers.toArray()));
+      Thread.sleep(replicationWaitTime);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/441bc050/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index d2b336e..76b19f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -21,8 +21,11 @@ package org.apache.hadoop.hbase.wal;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -35,6 +38,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -269,6 +273,18 @@ public interface WAL {
       key.setCompressionContext(compressionContext);
     }
 
+    /** @return true if any column family scope in this entry's key is REPLICATION_SCOPE_SERIAL */
+    public boolean hasSerialReplicationScope() {
+      if (getKey().getScopes() != null) {
+        for (int scope : getKey().getScopes().values()) {
+          if (scope == HConstants.REPLICATION_SCOPE_SERIAL) {
+            return true;
+          }
+        }
+      }
+      return false;
+    }
+
     @Override
     public String toString() {
       return this.key + "=" + this.edit;

http://git-wip-us.apache.org/repos/asf/hbase/blob/441bc050/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
index 3955a4c..5857f5d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
@@ -443,7 +443,7 @@ public class TestMetaTableAccessor {
       List<HRegionInfo> regionInfos = Lists.newArrayList(parent);
       MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
 
-      MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
+      MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3, false);
 
       assertEmptyMetaLocation(meta, splitA.getRegionName(), 1);
       assertEmptyMetaLocation(meta, splitA.getRegionName(), 2);
@@ -472,7 +472,7 @@ public class TestMetaTableAccessor {
       MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
 
       MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3,
-          HConstants.LATEST_TIMESTAMP);
+          HConstants.LATEST_TIMESTAMP, false);
 
       assertEmptyMetaLocation(meta, merged.getRegionName(), 1);
       assertEmptyMetaLocation(meta, merged.getRegionName(), 2);
@@ -556,7 +556,7 @@ public class TestMetaTableAccessor {
 
       // now merge the regions, effectively deleting the rows for region a and b.
       MetaTableAccessor.mergeRegions(connection, mergedRegionInfo,
-        regionInfoA, regionInfoB, sn, 1, masterSystemTime);
+        regionInfoA, regionInfoB, sn, 1, masterSystemTime, false);
 
       result = meta.get(get);
       serverCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY,
@@ -639,7 +639,7 @@ public class TestMetaTableAccessor {
       }
       SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler();
       long prevCalls = scheduler.numPriorityCalls;
-      MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1);
+      MetaTableAccessor.splitRegion(connection, parent, splitA, splitB,loc.getServerName(),1,false);
 
       assertTrue(prevCalls < scheduler.numPriorityCalls);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/441bc050/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java
index bff9c78..bead7e9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaScanner.java
@@ -165,7 +165,7 @@ public class TestMetaScanner {
               end);
 
             MetaTableAccessor.splitRegion(connection,
-              parent, splita, splitb, ServerName.valueOf("fooserver", 1, 0), 1);
+              parent, splita, splitb, ServerName.valueOf("fooserver", 1, 0), 1, false);
 
             Threads.sleep(random.nextInt(200));
           } catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/441bc050/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
index 7be46f5..4843155 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
@@ -1214,7 +1214,7 @@ public class TestAssignmentManagerOnCluster {
     }
     conf.setInt("hbase.regionstatestore.meta.connection", 3);
     final RegionStateStore rss =
-        new RegionStateStore(new MyRegionServer(conf, new ZkCoordinatedStateManager()));
+        new RegionStateStore(new MyMaster(conf, new ZkCoordinatedStateManager()));
     rss.start();
     // Create 10 threads and make each do 10 puts related to region state update
     Thread[] th = new Thread[10];


Mime
View raw message