hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ashishsin...@apache.org
Subject hbase git commit: HBASE-17290 Potential loss of data for replication of bulk loaded hfiles
Date Fri, 06 Jan 2017 11:26:40 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 629b04f44 -> 5f631b965


HBASE-17290 Potential loss of data for replication of bulk loaded hfiles


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5f631b96
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5f631b96
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5f631b96

Branch: refs/heads/master
Commit: 5f631b9653a4bf86a2bebed58abed747c04b704f
Parents: 629b04f
Author: Ashish Singhi <ashishsinghi@apache.org>
Authored: Fri Jan 6 16:15:49 2017 +0530
Committer: Ashish Singhi <ashishsinghi@apache.org>
Committed: Fri Jan 6 16:18:20 2017 +0530

----------------------------------------------------------------------
 .../hbase/replication/ReplicationQueues.java    |  6 +-
 .../replication/ReplicationQueuesZKImpl.java    | 11 ++--
 .../TableBasedReplicationQueuesImpl.java        |  4 +-
 .../hbase/regionserver/HRegionServer.java       |  4 ++
 .../regionserver/HFileReplicator.java           |  2 +-
 .../replication/regionserver/Replication.java   | 55 +++++++----------
 .../regionserver/ReplicationObserver.java       | 62 ++++++++++++++++++++
 .../regionserver/ReplicationSource.java         | 11 ++--
 .../ReplicationSourceInterface.java             |  6 +-
 .../regionserver/ReplicationSourceManager.java  |  4 +-
 .../cleaner/TestReplicationHFileCleaner.java    |  9 +--
 .../replication/ReplicationSourceDummy.java     |  3 +-
 .../replication/TestReplicationStateBasic.java  | 33 ++++++-----
 13 files changed, 140 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
index 0ae27d0..be5a590 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.replication;
 import java.util.List;
 import java.util.SortedSet;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Pair;
 
@@ -144,10 +145,11 @@ public interface ReplicationQueues {
   /**
    * Add new hfile references to the queue.
    * @param peerId peer cluster id to which the hfiles need to be replicated
-   * @param files list of hfile references to be added
+   * @param pairs list of pairs of { HFile location in staging dir, HFile path in region
dir which
+   *          will be added in the queue }
    * @throws ReplicationException if fails to add a hfile reference
    */
-  void addHFileRefs(String peerId, List<String> files) throws ReplicationException;
+  void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) throws ReplicationException;
 
   /**
    * Remove hfile references from the queue.

http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 7c548d9..1de1315 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -319,16 +320,18 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase
implements R
   }
 
   @Override
-  public void addHFileRefs(String peerId, List<String> files) throws ReplicationException
{
+  public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
+      throws ReplicationException {
     String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
     boolean debugEnabled = LOG.isDebugEnabled();
     if (debugEnabled) {
-      LOG.debug("Adding hfile references " + files + " in queue " + peerZnode);
+      LOG.debug("Adding hfile references " + pairs + " in queue " + peerZnode);
     }
     List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
-    int size = files.size();
+    int size = pairs.size();
     for (int i = 0; i < size; i++) {
-      listOfOps.add(ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i)),
+      listOfOps.add(ZKUtilOp.createAndFailSilent(
+        ZKUtil.joinZNode(peerZnode, pairs.get(i).getSecond().getName()),
         HConstants.EMPTY_BYTE_ARRAY));
     }
     if (debugEnabled) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
index 28b9bdf..1023e0d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesImpl.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
@@ -307,7 +308,8 @@ public class TableBasedReplicationQueuesImpl extends ReplicationTableBase
   }
 
   @Override
-  public void addHFileRefs(String peerId, List<String> files) throws ReplicationException
{
+  public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
+      throws ReplicationException {
     // TODO
     throw new NotImplementedException();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 853d699..3c9d54f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -125,6 +125,7 @@ import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFa
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.UserProvider;
@@ -524,6 +525,9 @@ public class HRegionServer extends HasThread implements
     checkCodecs(this.conf);
     this.userProvider = UserProvider.instantiate(conf);
     FSUtils.setupShortCircuitRead(this.conf);
+
+    Replication.decorateRegionServerConfiguration(this.conf);
+
     // Disable usage of meta replicas in the regionserver
     this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
index 256c24c..35aa1fb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
@@ -380,7 +380,7 @@ public class HFileReplicator {
           } catch (FileNotFoundException e1) {
             // This will mean that the hfile does not exists any where in source cluster
FS. So we
             // cannot do anything here just log and continue.
-            LOG.error("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
+            LOG.debug("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
                 + ". Hence ignoring this hfile from replication..",
               e1);
             continue;

http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 5f87690..d3f9ba2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -23,7 +23,6 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.NavigableMap;
 import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -32,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -44,8 +44,7 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
 import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -236,34 +235,6 @@ public class Replication extends WALActionsListener.Base implements
     scopeWALEdits(logKey, logEdit, this.conf, this.getReplicationManager());
   }
 
-  @Override
-  public void postAppend(long entryLen, long elapsedTimeMillis, final WALKey logKey,
-      final WALEdit edit) throws IOException {
-    NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
-    if (this.replicationForBulkLoadData && scopes != null && !scopes.isEmpty())
{
-      TableName tableName = logKey.getTablename();
-      for (Cell c : edit.getCells()) {
-        // Only check for bulk load events
-        if (CellUtil.matchingQualifier(c, WALEdit.BULK_LOAD)) {
-          BulkLoadDescriptor bld = null;
-          try {
-            bld = WALEdit.getBulkLoadDescriptor(c);
-          } catch (IOException e) {
-            LOG.error("Failed to get bulk load events information from the wal file.", e);
-            throw e;
-          }
-
-          for (StoreDescriptor s : bld.getStoresList()) {
-            byte[] fam = s.getFamilyName().toByteArray();
-            if (scopes.containsKey(fam)) {
-              addHFileRefsToQueue(this.getReplicationManager(), tableName, fam, s);
-            }
-          }
-        }
-      }
-    }
-  }
-
   /**
    * Utility method used to set the correct scopes on each log key. Doesn't set a scope on
keys from
    * compaction WAL edits and if the scope is local.
@@ -298,10 +269,10 @@ public class Replication extends WALActionsListener.Base implements
     }
   }
 
-  private static void addHFileRefsToQueue(ReplicationSourceManager replicationManager,
-      TableName tableName, byte[] family, StoreDescriptor s) throws IOException {
+  void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>>
pairs)
+      throws IOException {
     try {
-      replicationManager.addHFileRefs(tableName, family, s.getStoreFileList());
+      this.replicationManager.addHFileRefs(tableName, family, pairs);
     } catch (ReplicationException e) {
       LOG.error("Failed to add hfile references in the replication queue.", e);
       throw new IOException(e);
@@ -337,6 +308,22 @@ public class Replication extends WALActionsListener.Base implements
     }
   }
 
+  /**
+   * This method modifies the region server's configuration in order to inject replication-related
+   * features
+   * @param conf region server configurations
+   */
+  public static void decorateRegionServerConfiguration(Configuration conf) {
+    if (isReplicationForBulkLoadDataEnabled(conf)) {
+      String plugins = conf.get(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, "");
+      String rsCoprocessorClass = ReplicationObserver.class.getCanonicalName();
+      if (!plugins.contains(rsCoprocessorClass)) {
+        conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
+          plugins + "," + rsCoprocessorClass);
+      }
+    }
+  }
+
   /*
    * Statistics thread. Periodically prints the cache statistics to the log.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
new file mode 100644
index 0000000..03046b4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * An Observer to facilitate replication operations
+ */
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class ReplicationObserver extends BaseRegionObserver {
+  private static final Log LOG = LogFactory.getLog(ReplicationObserver.class);
+
+  @Override
+  public void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment>
ctx,
+      final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException
{
+    RegionCoprocessorEnvironment env = ctx.getEnvironment();
+    Configuration c = env.getConfiguration();
+    if (pairs == null || pairs.isEmpty()
+        || !c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+          HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) {
+      LOG.debug("Skipping recording bulk load entries in preCommitStoreFile for bulkloaded
"
+          + "data replication.");
+      return;
+    }
+    HRegionServer rs = (HRegionServer) env.getRegionServerServices();
+    Replication rep = (Replication) rs.getReplicationSourceService();
+    rep.addHFileRefsToQueue(env.getRegionInfo().getTable(), family, pairs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 3eeb4b8..7a229eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -253,7 +254,7 @@ public class ReplicationSource extends Thread
   }
 
   @Override
-  public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>>
pairs)
       throws ReplicationException {
     String peerId = peerClusterZnode;
     if (peerId.contains("-")) {
@@ -266,8 +267,8 @@ public class ReplicationSource extends Thread
       List<String> tableCfs = tableCFMap.get(tableName);
       if (tableCFMap.containsKey(tableName)
           && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
-        this.replicationQueues.addHFileRefs(peerId, files);
-        metrics.incrSizeOfHFileRefsQueue(files.size());
+        this.replicationQueues.addHFileRefs(peerId, pairs);
+        metrics.incrSizeOfHFileRefsQueue(pairs.size());
       } else {
         LOG.debug("HFiles will not be replicated belonging to the table " + tableName + "
family "
             + Bytes.toString(family) + " to peer id " + peerId);
@@ -275,8 +276,8 @@ public class ReplicationSource extends Thread
     } else {
       // user has explicitly not defined any table cfs for replication, means replicate all
the
       // data
-      this.replicationQueues.addHFileRefs(peerId, files);
-      metrics.incrSizeOfHFileRefsQueue(files.size());
+      this.replicationQueues.addHFileRefs(peerId, pairs);
+      metrics.incrSizeOfHFileRefsQueue(pairs.size());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 7f4a9f7..8d5451c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Interface that defines a replication source
@@ -112,10 +113,11 @@ public interface ReplicationSourceInterface {
    * Add hfile names to the queue to be replicated.
    * @param tableName Name of the table these files belongs to
    * @param family Name of the family these files belong to
-   * @param files files whose names needs to be added to the queue to be replicated
+   * @param pairs list of pairs of { HFile location in staging dir, HFile path in region
dir which
+   *          will be added in the queue for replication}
    * @throws ReplicationException If failed to add hfile references
    */
-  void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+  void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>>
pairs)
       throws ReplicationException;
 
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index ef4093e..5b574da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -846,10 +846,10 @@ public class ReplicationSourceManager implements ReplicationListener
{
     return stats.toString();
   }
 
-  public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>>
pairs)
       throws ReplicationException {
     for (ReplicationSourceInterface source : this.sources) {
-      source.addHFileRefs(tableName, family, files);
+      source.addHFileRefs(tableName, family, pairs);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index fc3e516..817cfb4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -138,8 +139,8 @@ public class TestReplicationHFileCleaner {
         + "for it in the queue.",
       cleaner.isFileDeletable(fs.getFileStatus(file)));
 
-    List<String> files = new ArrayList<String>(1);
-    files.add(file.getName());
+    List<Pair<Path, Path>> files = new ArrayList<>(1);
+    files.add(new Pair<Path, Path>(null, file));
     // 4. Add the file to hfile-refs queue
     rq.addHFileRefs(peerId, files);
     // 5. Assert file should not be deletable
@@ -166,8 +167,8 @@ public class TestReplicationHFileCleaner {
     f.setPath(notDeletablefile);
     files.add(f);
 
-    List<String> hfiles = new ArrayList<>(1);
-    hfiles.add(notDeletablefile.getName());
+    List<Pair<Path, Path>> hfiles = new ArrayList<>(1);
+    hfiles.add(new Pair<Path, Path>(null, notDeletablefile));
     // 2. Add one file to hfile-refs queue
     rq.addHFileRefs(peerId, hfiles);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index abe484e..57e54d7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Source that does nothing at all, helpful to test ReplicationSourceManager
@@ -93,7 +94,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface
{
   }
 
   @Override
-  public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>>
files)
       throws ReplicationException {
     return;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5f631b96/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index fcab105..f8be9a7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -25,7 +25,9 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.zookeeper.KeeperException;
 import org.junit.Before;
@@ -202,10 +204,10 @@ public abstract class TestReplicationStateBasic {
     rq1.init(server1);
     rqc.init();
 
-    List<String> files1 = new ArrayList<String>(3);
-    files1.add("file_1");
-    files1.add("file_2");
-    files1.add("file_3");
+    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
+    files1.add(new Pair<Path, Path>(null, new Path("file_1")));
+    files1.add(new Pair<Path, Path>(null, new Path("file_2")));
+    files1.add(new Pair<Path, Path>(null, new Path("file_3")));
     assertNull(rqc.getReplicableHFiles(ID_ONE));
     assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
     rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
@@ -213,13 +215,16 @@ public abstract class TestReplicationStateBasic {
     rq1.addHFileRefs(ID_ONE, files1);
     assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
     assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
-    List<String> files2 = new ArrayList<>(files1);
-    String removedString = files2.remove(0);
-    rq1.removeHFileRefs(ID_ONE, files2);
+    List<String> hfiles2 = new ArrayList<>();
+    for (Pair<Path, Path> p : files1) {
+      hfiles2.add(p.getSecond().getName());
+    }
+    String removedString = hfiles2.remove(0);
+    rq1.removeHFileRefs(ID_ONE, hfiles2);
     assertEquals(1, rqc.getReplicableHFiles(ID_ONE).size());
-    files2 = new ArrayList<>(1);
-    files2.add(removedString);
-    rq1.removeHFileRefs(ID_ONE, files2);
+    hfiles2 = new ArrayList<>(1);
+    hfiles2.add(removedString);
+    rq1.removeHFileRefs(ID_ONE, hfiles2);
     assertEquals(0, rqc.getReplicableHFiles(ID_ONE).size());
     rp.unregisterPeer(ID_ONE);
   }
@@ -235,10 +240,10 @@ public abstract class TestReplicationStateBasic {
     rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
     rq1.addPeerToHFileRefs(ID_TWO);
 
-    List<String> files1 = new ArrayList<String>(3);
-    files1.add("file_1");
-    files1.add("file_2");
-    files1.add("file_3");
+    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
+    files1.add(new Pair<Path, Path>(null, new Path("file_1")));
+    files1.add(new Pair<Path, Path>(null, new Path("file_2")));
+    files1.add(new Pair<Path, Path>(null, new Path("file_3")));
     rq1.addHFileRefs(ID_ONE, files1);
     rq1.addHFileRefs(ID_TWO, files1);
     assertEquals(2, rqc.getAllPeersFromHFileRefsQueue().size());


Mime
View raw message