hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chenh...@apache.org
Subject hbase git commit: HBASE-16086 TableCfWALEntryFilter and ScopeWALEntryFilter should not redundantly iterate over cells (Vincent Poon)
Date Sun, 11 Sep 2016 02:11:58 GMT
Repository: hbase
Updated Branches:
  refs/heads/master cc2a40a78 -> 80d8b2100


HBASE-16086 TableCfWALEntryFilter and ScopeWALEntryFilter should not redundantly iterate over
cells (Vincent Poon)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/80d8b210
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/80d8b210
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/80d8b210

Branch: refs/heads/master
Commit: 80d8b2100d9f4dc2a01ea6bdbded6ec52d7e4263
Parents: cc2a40a
Author: chenheng <chenheng@apache.org>
Authored: Sun Sep 11 09:55:08 2016 +0800
Committer: chenheng <chenheng@apache.org>
Committed: Sun Sep 11 09:55:08 2016 +0800

----------------------------------------------------------------------
 .../hbase/replication/BulkLoadCellFilter.java   |  81 ++++++++++++
 .../hbase/replication/ChainWALEntryFilter.java  |  38 +++++-
 .../hbase/replication/ScopeWALEntryFilter.java  |  94 ++++----------
 .../replication/TableCfWALEntryFilter.java      | 124 +++++++------------
 .../hadoop/hbase/replication/WALCellFilter.java |  41 ++++++
 .../TestReplicationWALEntryFilters.java         |  12 +-
 6 files changed, 231 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/80d8b210/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BulkLoadCellFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BulkLoadCellFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BulkLoadCellFilter.java
new file mode 100644
index 0000000..3599d10
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BulkLoadCellFilter.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+import com.google.common.base.Predicate;
+
+public class BulkLoadCellFilter {
+  private static final Log LOG = LogFactory.getLog(BulkLoadCellFilter.class);
+
+  /**
+   * Filters the bulk load cell using the supplied predicate.
+   * @param cell The WAL cell to filter.
+   * @param famPredicate Returns true if the given family should be removed.
+   * @return The filtered cell.
+   */
+  public Cell filterCell(Cell cell, Predicate<byte[]> famPredicate) {
+    byte[] fam;
+    BulkLoadDescriptor bld = null;
+    try {
+      bld = WALEdit.getBulkLoadDescriptor(cell);
+    } catch (IOException e) {
+      LOG.warn("Failed to get bulk load events information from the WAL file.", e);
+      return cell;
+    }
+    List<StoreDescriptor> storesList = bld.getStoresList();
+    // Copy the StoreDescriptor list and update it as storesList is an unmodifiable list
+    List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
+    Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
+    boolean anyStoreRemoved = false;
+    while (copiedStoresListIterator.hasNext()) {
+      StoreDescriptor sd = copiedStoresListIterator.next();
+      fam = sd.getFamilyName().toByteArray();
+      if (famPredicate.apply(fam)) {
+        copiedStoresListIterator.remove();
+        anyStoreRemoved = true;
+      }
+    }
+
+    if (!anyStoreRemoved) {
+      return cell;
+    } else if (copiedStoresList.isEmpty()) {
+      return null;
+    }
+    BulkLoadDescriptor.Builder newDesc =
+        BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
+            .setEncodedRegionName(bld.getEncodedRegionName())
+            .setBulkloadSeqNum(bld.getBulkloadSeqNum());
+    newDesc.addAllStores(copiedStoresList);
+    BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
+    return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
+        cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/80d8b210/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
index 6a3981a..1d67faa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
@@ -34,9 +35,11 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
 public class ChainWALEntryFilter implements WALEntryFilter {
 
   private final WALEntryFilter[] filters;
+  private WALCellFilter[] cellFilters;
 
   public ChainWALEntryFilter(WALEntryFilter...filters) {
     this.filters = filters;
+    initCellFilters();
   }
 
   public ChainWALEntryFilter(List<WALEntryFilter> filters) {
@@ -49,8 +52,18 @@ public class ChainWALEntryFilter implements WALEntryFilter {
         rawFilters.add(filter);
       }
     }
-
     this.filters = rawFilters.toArray(new WALEntryFilter[rawFilters.size()]);
+    initCellFilters();
+  }
+
+  public void initCellFilters() {
+    ArrayList<WALCellFilter> cellFilters = new ArrayList<>(filters.length);
+    for (WALEntryFilter filter : filters) {
+      if (filter instanceof WALCellFilter) {
+        cellFilters.add((WALCellFilter) filter);
+      }
+    }
+    this.cellFilters = cellFilters.toArray(new WALCellFilter[cellFilters.size()]);
   }
 
   @Override
@@ -61,7 +74,30 @@ public class ChainWALEntryFilter implements WALEntryFilter {
       }
       entry = filter.filter(entry);
     }
+    filterCells(entry);
     return entry;
   }
 
+  private void filterCells(Entry entry) {
+    if (entry == null || cellFilters.length == 0) {
+      return;
+    }
+    ArrayList<Cell> cells = entry.getEdit().getCells();
+    int size = cells.size();
+    for (int i = size - 1; i >= 0; i--) {
+      Cell cell = cells.get(i);
+      for (WALCellFilter filter : cellFilters) {
+        cell = filter.filterCell(entry, cell);
+        if (cell != null) {
+          cells.set(i, cell);
+        } else {
+          cells.remove(i);
+          break;
+        }
+      }
+    }
+    if (cells.size() < size / 2) {
+      cells.trimToSize();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/80d8b210/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
index 28a83dd..b084a04 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
@@ -18,29 +18,24 @@
 
 package org.apache.hadoop.hbase.replication;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
 import java.util.NavigableMap;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
+import com.google.common.base.Predicate;
+
 /**
  * Keeps KVs that are scoped other than local
  */
 @InterfaceAudience.Private
-public class ScopeWALEntryFilter implements WALEntryFilter {
-  private static final Log LOG = LogFactory.getLog(ScopeWALEntryFilter.class);
+public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter {
+
+  BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
 
   @Override
   public Entry filter(Entry entry) {
@@ -48,72 +43,27 @@ public class ScopeWALEntryFilter implements WALEntryFilter {
     if (scopes == null || scopes.isEmpty()) {
       return null;
     }
-    ArrayList<Cell> cells = entry.getEdit().getCells();
-    int size = cells.size();
-    byte[] fam;
-    for (int i = size - 1; i >= 0; i--) {
-      Cell cell = cells.get(i);
-      // If a bulk load entry has a scope then that means user has enabled replication for bulk load
-      // hfiles.
-      // TODO There is a similar logic in TableCfWALEntryFilter but data structures are different so
-      // cannot refactor into one now, can revisit and see if any way to unify them.
+    return entry;
+  }
+
+  @Override
+  public Cell filterCell(Entry entry, Cell cell) {
+    final NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
+      // The scope will be null or empty if
+      // there's nothing to replicate in that WALEdit
+      byte[] fam = CellUtil.cloneFamily(cell);
       if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
-        Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(scopes, cell);
-        if (filteredBulkLoadEntryCell != null) {
-          cells.set(i, filteredBulkLoadEntryCell);
-        } else {
-          cells.remove(i);
-        }
+        cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
+          @Override
+          public boolean apply(byte[] fam) {
+            return !scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL;
+          }
+        });
       } else {
-        // The scope will be null or empty if
-        // there's nothing to replicate in that WALEdit
-        fam = CellUtil.cloneFamily(cell);
         if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
-          cells.remove(i);
+          return null;
         }
       }
-    }
-    if (cells.size() < size / 2) {
-      cells.trimToSize();
-    }
-    return entry;
-  }
-
-  private Cell filterBulkLoadEntries(NavigableMap<byte[], Integer> scopes, Cell cell) {
-    byte[] fam;
-    BulkLoadDescriptor bld = null;
-    try {
-      bld = WALEdit.getBulkLoadDescriptor(cell);
-    } catch (IOException e) {
-      LOG.warn("Failed to get bulk load events information from the WAL file.", e);
-      return cell;
-    }
-    List<StoreDescriptor> storesList = bld.getStoresList();
-    // Copy the StoreDescriptor list and update it as storesList is a unmodifiableList
-    List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
-    Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
-    boolean anyStoreRemoved = false;
-    while (copiedStoresListIterator.hasNext()) {
-      StoreDescriptor sd = copiedStoresListIterator.next();
-      fam = sd.getFamilyName().toByteArray();
-      if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
-        copiedStoresListIterator.remove();
-        anyStoreRemoved = true;
-      }
-    }
-
-    if (!anyStoreRemoved) {
-      return cell;
-    } else if (copiedStoresList.isEmpty()) {
-      return null;
-    }
-    BulkLoadDescriptor.Builder newDesc =
-        BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
-            .setEncodedRegionName(bld.getEncodedRegionName())
-            .setBulkloadSeqNum(bld.getBulkloadSeqNum());
-    newDesc.addAllStores(copiedStoresList);
-    BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
-    return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
-      cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
+    return cell;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/80d8b210/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
index f10849b..d890e3e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
@@ -18,9 +18,6 @@
 
 package org.apache.hadoop.hbase.replication;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -29,16 +26,17 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
-public class TableCfWALEntryFilter implements WALEntryFilter {
+import com.google.common.base.Predicate;
+
+public class TableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {
 
   private static final Log LOG = LogFactory.getLog(TableCfWALEntryFilter.class);
-  private final ReplicationPeer peer;
+  private ReplicationPeer peer;
+  private BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
 
   public TableCfWALEntryFilter(ReplicationPeer peer) {
     this.peer = peer;
@@ -47,91 +45,57 @@ public class TableCfWALEntryFilter implements WALEntryFilter {
   @Override
   public Entry filter(Entry entry) {
     TableName tabName = entry.getKey().getTablename();
-    ArrayList<Cell> cells = entry.getEdit().getCells();
-    Map<TableName, List<String>> tableCFs = null;
-
-    try {
-      tableCFs = this.peer.getTableCFs();
-    } catch (IllegalArgumentException e) {
-      LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() +
-          ", degenerate as if it's not configured by keeping tableCFs==null");
-    }
-    int size = cells.size();
+    Map<TableName, List<String>> tableCFs = getTableCfs();
 
     // If null means user has explicitly not configured any table CFs so all the tables data are
     // applicable for replication
-    if (tableCFs == null) {
-      return entry;
-    }
-    // return null(prevent replicating) if logKey's table isn't in this peer's
-    // replicable table list
+    if (tableCFs == null) return entry;
+
     if (!tableCFs.containsKey(tabName)) {
       return null;
-    } else {
-      List<String> cfs = tableCFs.get(tabName);
-      for (int i = size - 1; i >= 0; i--) {
-        Cell cell = cells.get(i);
-        // TODO There is a similar logic in ScopeWALEntryFilter but data structures are different so
-        // cannot refactor into one now, can revisit and see if any way to unify them.
-        // Filter bulk load entries separately
-        if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
-          Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(cfs, cell);
-          if (filteredBulkLoadEntryCell != null) {
-            cells.set(i, filteredBulkLoadEntryCell);
-          } else {
-            cells.remove(i);
-          }
-        } else {
-          // ignore(remove) kv if its cf isn't in the replicable cf list
-          // (empty cfs means all cfs of this table are replicable)
-          if ((cfs != null) && !cfs.contains(Bytes.toString(cell.getFamilyArray(),
-            cell.getFamilyOffset(), cell.getFamilyLength()))) {
-            cells.remove(i);
-          }
-        }
-      }
-    }
-    if (cells.size() < size/2) {
-      cells.trimToSize();
     }
+
     return entry;
   }
 
-  private Cell filterBulkLoadEntries(List<String> cfs, Cell cell) {
-    byte[] fam;
-    BulkLoadDescriptor bld = null;
-    try {
-      bld = WALEdit.getBulkLoadDescriptor(cell);
-    } catch (IOException e) {
-      LOG.warn("Failed to get bulk load events information from the WAL file.", e);
-      return cell;
-    }
-    List<StoreDescriptor> storesList = bld.getStoresList();
-    // Copy the StoreDescriptor list and update it as storesList is a unmodifiableList
-    List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
-    Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
-    boolean anyStoreRemoved = false;
-    while (copiedStoresListIterator.hasNext()) {
-      StoreDescriptor sd = copiedStoresListIterator.next();
-      fam = sd.getFamilyName().toByteArray();
-      if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
-        copiedStoresListIterator.remove();
-        anyStoreRemoved = true;
+  @Override
+  public Cell filterCell(final Entry entry, Cell cell) {
+    final Map<TableName, List<String>> tableCfs = getTableCfs();
+    if (tableCfs == null) return cell;
+    TableName tabName = entry.getKey().getTablename();
+    List<String> cfs = tableCfs.get(tabName);
+    // ignore(remove) kv if its cf isn't in the replicable cf list
+    // (empty cfs means all cfs of this table are replicable)
+    if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
+      cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
+        @Override
+        public boolean apply(byte[] fam) {
+          if (tableCfs != null) {
+            List<String> cfs = tableCfs.get(entry.getKey().getTablename());
+            if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
+              return true;
+            }
+          }
+          return false;
+        }
+      });
+    } else {
+      if ((cfs != null) && !cfs.contains(
+        Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))) {
+        return null;
       }
     }
+    return cell;
+  }
 
-    if (!anyStoreRemoved) {
-      return cell;
-    } else if (copiedStoresList.isEmpty()) {
-      return null;
+  Map<TableName, List<String>> getTableCfs() {
+    Map<TableName, List<String>> tableCFs = null;
+    try {
+      tableCFs = this.peer.getTableCFs();
+    } catch (IllegalArgumentException e) {
+      LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() +
+          ", degenerate as if it's not configured by keeping tableCFs==null");
     }
-    BulkLoadDescriptor.Builder newDesc =
-        BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
-            .setEncodedRegionName(bld.getEncodedRegionName())
-            .setBulkloadSeqNum(bld.getBulkloadSeqNum());
-    newDesc.addAllStores(copiedStoresList);
-    BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
-    return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
-      cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
+    return tableCFs;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/80d8b210/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALCellFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALCellFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALCellFilter.java
new file mode 100644
index 0000000..78b3ed4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALCellFilter.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+/**
+ * A filter for WAL entry cells before being sent over to replication.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface WALCellFilter {
+
+  /**
+   * Applies the filter, possibly returning a different Cell instance.
+   * If null is returned, the cell will be skipped.
+   * @param entry Entry which contains the cell
+   * @param cell Cell to filter
+   * @return a (possibly modified) Cell to use.  Returning null will cause the cell
+   * to be skipped for replication.
+   */
+  public Cell filterCell(Entry entry, Cell cell);
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/80d8b210/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
index c906d6a..04d9232 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
@@ -78,7 +78,7 @@ public class TestReplicationWALEntryFilters {
 
   @Test
   public void testScopeWALEntryFilter() {
-    ScopeWALEntryFilter filter = new ScopeWALEntryFilter();
+    WALEntryFilter filter = new ChainWALEntryFilter(new ScopeWALEntryFilter());
 
     Entry userEntry = createEntry(null, a, b);
     Entry userEntryA = createEntry(null, a);
@@ -201,14 +201,14 @@ public class TestReplicationWALEntryFilters {
 
     when(peer.getTableCFs()).thenReturn(null);
     Entry userEntry = createEntry(null, a, b, c);
-    TableCfWALEntryFilter filter = new TableCfWALEntryFilter(peer);
+    WALEntryFilter filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
     assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
 
     // empty map
     userEntry = createEntry(null, a, b, c);
     Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
     when(peer.getTableCFs()).thenReturn(tableCfs);
-    filter = new TableCfWALEntryFilter(peer);
+    filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
     assertEquals(null, filter.filter(userEntry));
 
     // table bar
@@ -216,7 +216,7 @@ public class TestReplicationWALEntryFilters {
     tableCfs = new HashMap<TableName, List<String>>();
     tableCfs.put(TableName.valueOf("bar"), null);
     when(peer.getTableCFs()).thenReturn(tableCfs);
-    filter = new TableCfWALEntryFilter(peer);
+    filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
     assertEquals(null, filter.filter(userEntry));
 
     // table foo:a
@@ -224,7 +224,7 @@ public class TestReplicationWALEntryFilters {
     tableCfs = new HashMap<TableName, List<String>>();
     tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
     when(peer.getTableCFs()).thenReturn(tableCfs);
-    filter = new TableCfWALEntryFilter(peer);
+    filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
     assertEquals(createEntry(null, a), filter.filter(userEntry));
 
     // table foo:a,c
@@ -232,7 +232,7 @@ public class TestReplicationWALEntryFilters {
     tableCfs = new HashMap<TableName, List<String>>();
     tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
     when(peer.getTableCFs()).thenReturn(tableCfs);
-    filter = new TableCfWALEntryFilter(peer);
+    filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
     assertEquals(createEntry(null, a,c), filter.filter(userEntry));
   }
 


Mime
View raw message