hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From la...@apache.org
Subject hbase git commit: HBASE-12091 Optionally ignore edits for dropped tables for replication.
Date Wed, 15 Nov 2017 00:33:29 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 c0639d271 -> f9833a780


HBASE-12091 Optionally ignore edits for dropped tables for replication.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f9833a78
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f9833a78
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f9833a78

Branch: refs/heads/branch-1
Commit: f9833a7802bd0d4291e8b6ec6baf2f85fe2e401f
Parents: c0639d2
Author: Lars Hofhansl <larsh@apache.org>
Authored: Tue Nov 14 16:33:12 2017 -0800
Committer: Lars Hofhansl <larsh@apache.org>
Committed: Tue Nov 14 16:33:12 2017 -0800

----------------------------------------------------------------------
 .../RetriesExhaustedWithDetailsException.java   |   9 +
 .../hadoop/hbase/protobuf/ProtobufUtil.java     |  18 ++
 .../org/apache/hadoop/hbase/HConstants.java     |   5 +
 .../hbase/protobuf/ReplicationProtbufUtil.java  |   2 +-
 .../hbase/replication/ReplicationEndpoint.java  |   6 +
 .../HBaseInterClusterReplicationEndpoint.java   |  66 ++++-
 .../regionserver/ReplicationSink.java           |   8 +
 .../regionserver/ReplicationSourceManager.java  |   2 +-
 .../TestReplicationDroppedTables.java           | 292 +++++++++++++++++++
 9 files changed, 403 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f9833a78/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
index 21ab156..e78f810 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.util.Bytes;
 
+import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.Collection;
@@ -51,6 +52,14 @@ extends RetriesExhaustedException {
   List<Row> actions;
   List<String> hostnameAndPort;
 
+  public RetriesExhaustedWithDetailsException(final String msg) {
+    super(msg);
+  }
+
+  public RetriesExhaustedWithDetailsException(final String msg, final IOException e) {
+    super(msg, e);
+  }
+
   public RetriesExhaustedWithDetailsException(List<Throwable> exceptions,
                                               List<Row> actions,
                                               List<String> hostnameAndPort) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/f9833a78/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 5945e5e..84d4a67 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -331,6 +331,24 @@ public final class ProtobufUtil {
   }
 
   /**
+   * Return the Exception thrown by the remote server wrapped in
+   * ServiceException as cause. RemoteException are left untouched.
+   *
+   * @param e ServiceException that wraps IO exception thrown by the server
+   * @return Exception wrapped in ServiceException.
+   */
+  public static IOException getServiceException(ServiceException e) {
+    Throwable t = e;
+    if (e instanceof ServiceException) {
+      t = e.getCause();
+    }
+    if (ExceptionUtil.isInterrupt(t)) {
+      return ExceptionUtil.asInterrupt(t);
+    }
+    return t instanceof IOException ? (IOException) t : new HBaseIOException(t);
+  }
+
+  /**
    * Like {@link #getRemoteException(ServiceException)} but more generic, able to handle more than
    * just {@link ServiceException}. Prefer this method to
    * {@link #getRemoteException(ServiceException)} because trying to

http://git-wip-us.apache.org/repos/asf/hbase/blob/f9833a78/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 8df7bd8..c9f9ded 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1247,6 +1247,11 @@ public final class HConstants {
   public static final String REPLICATION_SOURCE_MAXTHREADS_KEY =
       "hbase.replication.source.maxthreads";
 
+  /** Drop edits for tables that have been deleted from the replication source and target */
+  public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
+      "hbase.replication.drop.on.deleted.table";
+
+  /** Maximum number of threads used by the replication source for shipping edits to the sinks */
   public static final int REPLICATION_SOURCE_MAXTHREADS_DEFAULT = 10;
 
   /** Config for pluggable consensus provider */

http://git-wip-us.apache.org/repos/asf/hbase/blob/f9833a78/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index d13a79c..6243511 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -71,7 +71,7 @@ public class ReplicationProtbufUtil {
     try {
       admin.replicateWALEntry(controller, p.getFirst());
     } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
+      throw ProtobufUtil.getServiceException(se);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/f9833a78/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index 69db31c..6f99377 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -51,6 +51,7 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe
 
   @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
   class Context {
+    private final Configuration localConf;
     private final Configuration conf;
     private final FileSystem fs;
     private final TableDescriptors tableDescriptors;
@@ -62,6 +63,7 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe
 
     @InterfaceAudience.Private
     public Context(
+        final Configuration localConf,
         final Configuration conf,
         final FileSystem fs,
         final String peerId,
@@ -70,6 +72,7 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe
         final MetricsSource metrics,
         final TableDescriptors tableDescriptors,
         final Abortable abortable) {
+      this.localConf = localConf;
       this.conf = conf;
       this.fs = fs;
       this.clusterId = clusterId;
@@ -82,6 +85,9 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe
     public Configuration getConfiguration() {
       return conf;
     }
+    public Configuration getLocalConfiguration() {
+      return localConf;
+    }
     public FileSystem getFilesystem() {
       return fs;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f9833a78/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index cf85ffd..f1eb16d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -34,6 +34,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -43,10 +45,15 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
@@ -79,6 +86,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
 
   private HConnection conn;
+  private Configuration localConf;
   private Configuration conf;
   // How long should we sleep for each retry
   private long sleepForRetries;
@@ -102,11 +110,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   private Path hfileArchiveDir;
   private boolean replicationBulkLoadDataEnabled;
   private Abortable abortable;
+  private boolean dropOnDeletedTables;
 
   @Override
   public void init(Context context) throws IOException {
     super.init(context);
     this.conf = HBaseConfiguration.create(ctx.getConfiguration());
+    this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
     decorateConf();
     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
     this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
@@ -139,6 +149,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     // conservative for now.
     this.replicationRpcLimit = (int)(0.95 * (double)conf.getLong(RpcServer.MAX_REQUEST_SIZE,
       RpcServer.DEFAULT_MAX_REQUEST_SIZE));
+    this.dropOnDeletedTables =
+        this.conf.getBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
 
     this.replicationBulkLoadDataEnabled =
         conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
@@ -225,6 +237,37 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     return entryLists;
   }
 
+  private TableName parseTable(String msg) {
+    // ... TableNotFoundException: '<table>'\n...
+    Pattern p = Pattern.compile("TableNotFoundException: \\'([\\S]*)\\'");
+    Matcher m = p.matcher(msg);
+    if (m.find()) {
+      String table = m.group(1);
+      try {
+        // double check that table is a valid table name
+        TableName.valueOf(TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(table)));
+        return TableName.valueOf(table);
+      } catch (IllegalArgumentException ignore) {
+      }
+    }
+    return null;
+  }
+
+  // Filter a set of batches by TableName
+  private List<List<Entry>> filterBatches(final List<List<Entry>> oldEntryList, TableName table) {
+    List<List<Entry>> entryLists = new ArrayList<>();
+    for (List<Entry> entries : oldEntryList) {
+      ArrayList<Entry> thisList = new ArrayList<Entry>(entries.size());
+      entryLists.add(thisList);
+      for (Entry e : entries) {
+        if (!e.getKey().getTablename().equals(table)) {
+          thisList.add(e);
+        }
+      }
+    }
+    return entryLists;
+  }
+
   private void reconnectToPeerCluster() {
     HConnection connection = null;
     try {
@@ -325,10 +368,27 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
           ioe = ((RemoteException) ioe).unwrapRemoteException();
           LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
           if (ioe instanceof TableNotFoundException) {
-            if (sleepForRetries("A table is missing in the peer cluster. "
-                + "Replication cannot proceed without losing data.", sleepMultiplier)) {
-              sleepMultiplier++;
+            if (dropOnDeletedTables) {
+              // this is a bit fragile, but cannot change how TNFE is serialized
+              // at least check whether the table name is legal
+              TableName table = parseTable(ioe.getMessage());
+              if (table != null) {
+                try (Connection localConn =
+                    ConnectionFactory.createConnection(ctx.getLocalConfiguration())) {
+                  if (!localConn.getAdmin().tableExists(table)) {
+                    // Would potentially be better to retry in one of the outer loops
+                    // and add a table filter there; but that would break the encapsulation,
+                    // so we're doing the filtering here.
+                    LOG.info("Missing table detected at sink, local table also does not exist, filtering edits for '"+table+"'");
+                    batches = filterBatches(batches, table);
+                    continue;
+                  }
+                } catch (IOException iox) {
+                  LOG.warn("Exception checking for local table: ", iox);
+                }
+              }
             }
+            // fall through and sleep below
           } else {
             LOG.warn("Peer encountered RemoteException, rechecking all sinks: ", ioe);
             replicationSinkMgr.chooseSinks();

http://git-wip-us.apache.org/repos/asf/hbase/blob/f9833a78/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 8f262c5..769e347 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -42,12 +42,14 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
@@ -375,6 +377,12 @@ public class ReplicationSink {
       for (List<Row> rows : allRows) {
         table.batch(rows);
       }
+    } catch (RetriesExhaustedWithDetailsException rewde) {
+      for (Throwable ex : rewde.getCauses()) {
+        if (ex instanceof TableNotFoundException) {
+          throw new TableNotFoundException("'"+tableName+"'");
+        }
+      }
     } catch (InterruptedException ix) {
       throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
     } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/f9833a78/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index b50e840..77fd837 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -532,7 +532,7 @@ public class ReplicationSourceManager implements ReplicationListener {
       clusterId, replicationEndpoint, metrics);
 
     // init replication endpoint
-    replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
+    replicationEndpoint.init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(),
       fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server));
 
     return src;

http://git-wip-us.apache.org/repos/asf/hbase/blob/f9833a78/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
new file mode 100644
index 0000000..6c00047
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
@@ -0,0 +1,292 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Category(LargeTests.class)
+public class TestReplicationDroppedTables extends TestReplicationBase {
+  private static final Log LOG = LogFactory.getLog(TestReplicationDroppedTables.class);
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Starting and stopping replication can make us miss new logs,
+    // rolling like this makes sure the most recent one gets added to the queue
+    for ( JVMClusterUtil.RegionServerThread r :
+        utility1.getHBaseCluster().getRegionServerThreads()) {
+      utility1.getHBaseAdmin().rollWALWriter(r.getRegionServer().getServerName());
+    }
+    int rowCount = utility1.countRows(tableName);
+    utility1.deleteTableData(tableName);
+    // truncating the table will send one Delete per row to the slave cluster
+    // in an async fashion, which is why we cannot just call deleteTableData on
+    // utility2 since late writes could make it to the slave in some way.
+    // Instead, we truncate the first table and wait for all the Deletes to
+    // make it to the slave.
+    Scan scan = new Scan();
+    int lastCount = 0;
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for truncate");
+      }
+      ResultScanner scanner = htable2.getScanner(scan);
+      Result[] res = scanner.next(rowCount);
+      scanner.close();
+      if (res.length != 0) {
+        if (res.length < lastCount) {
+          i--; // Don't increment timeout if we make progress
+        }
+        lastCount = res.length;
+        LOG.info("Still got " + res.length + " rows");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+  }
+
+  @Test(timeout = 600000)
+  public void testEditsStuckBehindDroppedTable() throws Exception {
+    // Sanity check
+    // Make sure by default edits for dropped tables stall the replication queue, even when the
+    // table(s) in question have been deleted on both ends.
+    testEditsBehindDroppedTable(false, "test_dropped");
+  }
+
+  @Test(timeout = 600000)
+  public void testEditsDroppedWithDroppedTable() throws Exception {
+    // Make sure that, when the option is enabled, edits for dropped tables are themselves
+    // dropped once the table(s) in question have been deleted on both ends.
+    testEditsBehindDroppedTable(true, "test_dropped");
+  }
+
+  @Test(timeout = 600000)
+  public void testEditsDroppedWithDroppedTableNS() throws Exception {
+    // also try with a namespace
+    Connection connection1 = ConnectionFactory.createConnection(conf1);
+    try (Admin admin1 = connection1.getAdmin()) {
+      admin1.createNamespace(NamespaceDescriptor.create("NS").build());
+    }
+    Connection connection2 = ConnectionFactory.createConnection(conf2);
+    try (Admin admin2 = connection2.getAdmin()) {
+      admin2.createNamespace(NamespaceDescriptor.create("NS").build());
+    }
+    testEditsBehindDroppedTable(true, "NS:test_dropped");
+  }
+
+  private void testEditsBehindDroppedTable(boolean allowProceeding, String tName) throws Exception {
+    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding);
+    conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);
+
+    // make sure we have a single region server only, so that all
+    // edits for all tables go there
+    utility1.shutdownMiniHBaseCluster();
+    utility1.startMiniHBaseCluster(1, 1);
+
+    TableName tablename = TableName.valueOf(tName);
+    byte[] familyname = Bytes.toBytes("fam");
+    byte[] row = Bytes.toBytes("row");
+
+    HTableDescriptor table = new HTableDescriptor(tablename);
+    HColumnDescriptor fam = new HColumnDescriptor(familyname);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    table.addFamily(fam);
+
+    Connection connection1 = ConnectionFactory.createConnection(conf1);
+    Connection connection2 = ConnectionFactory.createConnection(conf2);
+    try (Admin admin1 = connection1.getAdmin()) {
+      admin1.createTable(table);
+    }
+    try (Admin admin2 = connection2.getAdmin()) {
+      admin2.createTable(table);
+    }
+    utility1.waitUntilAllRegionsAssigned(tablename);
+    utility2.waitUntilAllRegionsAssigned(tablename);
+
+    Table lHtable1 = utility1.getConnection().getTable(tablename);
+
+    // now suspend replication
+    admin.disablePeer(PEER_ID);
+
+    // put some data (lead with 0 so the edit gets sorted before the other table's edits
+    //   in the replication batch)
+    // write a bunch of edits, making sure we fill a batch
+    byte[] rowkey = Bytes.toBytes(0+" put on table to be dropped");
+    Put put = new Put(rowkey);
+    put.addColumn(familyname, row, row);
+    lHtable1.put(put);
+
+    rowkey = Bytes.toBytes("normal put");
+    put = new Put(rowkey);
+    put.addColumn(famName, row, row);
+    htable1.put(put);
+
+    try (Admin admin1 = connection1.getAdmin()) {
+      admin1.disableTable(tablename);
+      admin1.deleteTable(tablename);
+    }
+    try (Admin admin2 = connection2.getAdmin()) {
+      admin2.disableTable(tablename);
+      admin2.deleteTable(tablename);
+    }
+
+    admin.enablePeer(PEER_ID);
+    if (allowProceeding) {
+      // in this case we'd expect the key to make it over
+      verifyReplicationProceeded(rowkey);
+    } else {
+      verifyReplicationStuck(rowkey);
+    }
+    // just to be safe
+    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
+  }
+
+  @Test(timeout = 600000)
+  public void testEditsBehindDroppedTableTiming() throws Exception {
+    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
+    conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);
+
+    // make sure we have a single region server only, so that all
+    // edits for all tables go there
+    utility1.shutdownMiniHBaseCluster();
+    utility1.startMiniHBaseCluster(1, 1);
+
+    TableName tablename = TableName.valueOf("testdroppedtimed");
+    byte[] familyname = Bytes.toBytes("fam");
+    byte[] row = Bytes.toBytes("row");
+
+    HTableDescriptor table = new HTableDescriptor(tablename);
+    HColumnDescriptor fam = new HColumnDescriptor(familyname);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    table.addFamily(fam);
+
+    Connection connection1 = ConnectionFactory.createConnection(conf1);
+    Connection connection2 = ConnectionFactory.createConnection(conf2);
+    try (Admin admin1 = connection1.getAdmin()) {
+      admin1.createTable(table);
+    }
+    try (Admin admin2 = connection2.getAdmin()) {
+      admin2.createTable(table);
+    }
+    utility1.waitUntilAllRegionsAssigned(tablename);
+    utility2.waitUntilAllRegionsAssigned(tablename);
+
+    Table lHtable1 = utility1.getConnection().getTable(tablename);
+
+    // now suspend replication
+    admin.disablePeer(PEER_ID);
+
+    // put some data (lead with 0 so the edit gets sorted before the other table's edits
+    //   in the replication batch)
+    // write a bunch of edits, making sure we fill a batch
+    byte[] rowkey = Bytes.toBytes(0+" put on table to be dropped");
+    Put put = new Put(rowkey);
+    put.addColumn(familyname, row, row);
+    lHtable1.put(put);
+
+    rowkey = Bytes.toBytes("normal put");
+    put = new Put(rowkey);
+    put.addColumn(famName, row, row);
+    htable1.put(put);
+
+    try (Admin admin2 = connection2.getAdmin()) {
+      admin2.disableTable(tablename);
+      admin2.deleteTable(tablename);
+    }
+
+    admin.enablePeer(PEER_ID);
+    // edit should still be stuck
+
+    try (Admin admin1 = connection1.getAdmin()) {
+      // the source table still exists, replication should be stalled
+      verifyReplicationStuck(rowkey);
+
+      admin1.disableTable(tablename);
+      // still stuck, source table still exists
+      verifyReplicationStuck(rowkey);
+
+      admin1.deleteTable(tablename);
+      // now the source table is gone, replication should proceed, the
+      // offending edits be dropped
+      verifyReplicationProceeded(rowkey);
+    }
+    // just to be safe
+    conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
+  }
+
+  private void verifyReplicationProceeded(byte[] rowkey) throws Exception {
+    Get get = new Get(rowkey);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = htable2.get(get);
+      if (res.size() == 0) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(res.getRow(), rowkey);
+        break;
+      }
+    }
+  }
+
+  private void verifyReplicationStuck(byte[] rowkey) throws Exception {
+    Get get = new Get(rowkey);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        fail("Edit should have been stuck behind dropped tables");
+      } else {
+        LOG.info("Row not replicated, let's wait a bit more...");
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+  }
+}


Mime
View raw message