From: elserj@apache.org
To: commits@accumulo.apache.org
Date: Mon, 11 Jul 2016 21:49:51 -0000
Message-Id: <40bf4312200e44ea959eb4986387264b@git.apache.org>
In-Reply-To: <14fd9ee1e48544dda68b169f7766cffb@git.apache.org>
Subject: [5/6] accumulo git commit: Merge branch '1.7' into 1.8

Merge branch '1.7' into 1.8

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/24deb392
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/24deb392
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/24deb392

Branch: refs/heads/1.8
Commit: 24deb39232a463149498bff4df6755ffdfd4b71d
Parents: 4c4ad86 8949e3d
Author: Josh Elser
Authored: Mon Jul 11 17:49:24 2016 -0400
Committer: Josh Elser
Committed: Mon Jul 11 17:49:24 2016 -0400

----------------------------------------------------------------------
 .../test/replication/ReplicationIT.java | 75 ++++++--------------
 1 file changed, 22 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/24deb392/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 467a69b,0000000..11f0634
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@@ -1,1435 -1,0 +1,1404 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License. You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test.replication;
 +
 +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +
 +import java.net.URI;
 +import java.net.URISyntaxException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.EnumSet;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.NoSuchElementException;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.IteratorSetting.Column;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.TableOfflineException;
 +import org.apache.accumulo.core.client.ZooKeeperInstance;
 +import org.apache.accumulo.core.client.admin.TableOperations;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.data.impl.KeyExtent;
 +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 +import org.apache.accumulo.core.iterators.conf.ColumnSet;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 +import org.apache.accumulo.core.protobuf.ProtobufUtil;
 +import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 +import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 +import org.apache.accumulo.core.replication.ReplicationTable;
 +import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 +import org.apache.accumulo.core.replication.ReplicationTarget;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.TablePermission;
 +import org.apache.accumulo.core.tabletserver.log.LogEntry;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.ZooCache;
 +import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
 +import org.apache.accumulo.fate.zookeeper.ZooLock;
 +import org.apache.accumulo.gc.SimpleGarbageCollector;
 +import org.apache.accumulo.minicluster.ServerType;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.server.log.WalStateManager;
 +import org.apache.accumulo.server.log.WalStateManager.WalState;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.server.replication.ReplicaSystemFactory;
 +import org.apache.accumulo.server.replication.StatusCombiner;
 +import org.apache.accumulo.server.replication.StatusFormatter;
 +import org.apache.accumulo.server.replication.StatusUtil;
 +import org.apache.accumulo.server.replication.proto.Replication.Status;
 +import org.apache.accumulo.server.util.ReplicationTableUtil;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.test.functional.ConfigurableMacBase;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.fs.RawLocalFileSystem;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Assert;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Function;
 +import com.google.common.base.Joiner;
 +import com.google.common.collect.HashMultimap;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Iterators;
 +import com.google.common.collect.Multimap;
 +import com.google.common.collect.Sets;
 +import com.google.protobuf.TextFormat;
 +
 +/**
 + * Replication tests which verify expected functionality using a single MAC instance. A MockReplicaSystem is used to "fake" the peer instance that we're
 + * replicating to. This lets us test replication in a functional way without having to worry about two real systems.
 + */
 +public class ReplicationIT extends ConfigurableMacBase {
 +  private static final Logger log = LoggerFactory.getLogger(ReplicationIT.class);
++  private static final long MILLIS_BETWEEN_REPLICATION_TABLE_ONLINE_CHECKS = 5000l;
 +
 +  @Override
 +  public int defaultTimeoutSeconds() {
 +    return 60 * 10;
 +  }
 +
 +  @Override
 +  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
 +    // Run the master replication loop run frequently
 +    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
 +    cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
 +    cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
 +    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
 +    cfg.setProperty(Property.GC_CYCLE_START, "1s");
 +    cfg.setProperty(Property.GC_CYCLE_DELAY, "0");
 +    cfg.setProperty(Property.REPLICATION_NAME, "master");
 +    cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "1s");
 +    cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_PERIOD, "1s");
 +    cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "1M");
 +    cfg.setNumTservers(1);
 +    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
 +  }
 +
 +  private Multimap<String,String> getLogs(Connector conn) throws Exception {
 +    // Map of server to tableId
 +    Multimap<TServerInstance,String> serverToTableID = HashMultimap.create();
 +    Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    scanner.setRange(MetadataSchema.TabletsSection.getRange());
 +    scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
 +    for (Entry<Key,Value> entry : scanner) {
 +      TServerInstance key = new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier());
 +      byte[] tableId = KeyExtent.tableOfMetadataRow(entry.getKey().getRow());
 +      serverToTableID.put(key, new String(tableId, UTF_8));
 +    }
 +    // Map of logs to tableId
 +    Multimap<String,String> logs = HashMultimap.create();
 +    Instance i = conn.getInstance();
 +    ZooReaderWriter zk = new ZooReaderWriter(i.getZooKeepers(), i.getZooKeepersSessionTimeOut(), "");
 +    WalStateManager wals = new WalStateManager(conn.getInstance(), zk);
 +    for (Entry<TServerInstance,List<UUID>> entry : wals.getAllMarkers().entrySet()) {
 +      for (UUID id : entry.getValue()) {
 +        Pair<WalState,Path> state = wals.state(entry.getKey(), id);
 +        for (String tableId : serverToTableID.get(entry.getKey())) {
 +          logs.put(state.getSecond().toString(), tableId);
 +        }
 +      }
 +    }
 +    return logs;
 +  }
 +
 +  private Multimap<String,String> getAllLogs(Connector conn) throws Exception {
 +    Multimap<String,String> logs = getLogs(conn);
 +    try {
 +      Scanner scanner = conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY);
 +      StatusSection.limit(scanner);
 +      Text buff = new Text();
 +      for (Entry<Key,Value> entry : scanner) {
 +        if (Thread.interrupted()) {
 +          Thread.currentThread().interrupt();
 +          return logs;
 +        }
 +
 +        StatusSection.getFile(entry.getKey(), buff);
 +        String file = buff.toString();
 +        String tableId = StatusSection.getTableId(entry.getKey());
 +
 +        logs.put(file, tableId);
 +      }
 +    } catch (TableOfflineException e) {
 +      log.debug("Replication table isn't online yet");
 +    }
 +    return logs;
 +  }
 +
 +  private void waitForGCLock(Connector conn) throws InterruptedException {
 +    // Check if the GC process has the lock before wasting our retry attempts
 +    ZooKeeperInstance zki = (ZooKeeperInstance) conn.getInstance();
 +    ZooCacheFactory zcf = new ZooCacheFactory();
 +    ZooCache zcache = zcf.getZooCache(zki.getZooKeepers(), zki.getZooKeepersSessionTimeOut());
 +    String zkPath = ZooUtil.getRoot(conn.getInstance()) + Constants.ZGC_LOCK;
 +    log.info("Looking for GC lock at {}", zkPath);
 +    byte[] data = ZooLock.getLockData(zcache, zkPath, null);
 +    while (null == data) {
 +      log.info("Waiting for GC ZooKeeper lock to be acquired");
 +      Thread.sleep(1000);
 +      data = ZooLock.getLockData(zcache, zkPath, null);
 +    }
 +  }
 +
 +  @Test
 +  public void replicationTableCreated() throws AccumuloException, AccumuloSecurityException {
 +    Assert.assertTrue(getConnector().tableOperations().exists(ReplicationTable.NAME));
 +    Assert.assertEquals(ReplicationTable.ID, getConnector().tableOperations().tableIdMap().get(ReplicationTable.NAME));
 +  }
 +
 +  @Test
 +  public void verifyReplicationTableConfig() throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
 +    TableOperations tops = getConnector().tableOperations();
 +    Map<String,EnumSet<IteratorScope>> iterators = tops.listIterators(ReplicationTable.NAME);
 +
 +    // verify combiners are only iterators (no versioning)
 +    Assert.assertEquals(1, iterators.size());
 +
 +    // look for combiner
 +    Assert.assertTrue(iterators.containsKey(ReplicationTable.COMBINER_NAME));
 +    Assert.assertTrue(iterators.get(ReplicationTable.COMBINER_NAME).containsAll(EnumSet.allOf(IteratorScope.class)));
 +    for (IteratorScope scope : EnumSet.allOf(IteratorScope.class)) {
 +      IteratorSetting is = tops.getIteratorSetting(ReplicationTable.NAME, ReplicationTable.COMBINER_NAME, scope);
 +      Assert.assertEquals(30, is.getPriority());
 +      Assert.assertEquals(StatusCombiner.class.getName(), is.getIteratorClass());
 +      Assert.assertEquals(1, is.getOptions().size());
 +      Assert.assertTrue(is.getOptions().containsKey("columns"));
 +      String cols = is.getOptions().get("columns");
 +      Column statusSectionCol = new Column(StatusSection.NAME);
 +      Column workSectionCol = new Column(WorkSection.NAME);
 +      Assert.assertEquals(
 +          ColumnSet.encodeColumns(statusSectionCol.getColumnFamily(), statusSectionCol.getColumnQualifier()) + ","
 +              + ColumnSet.encodeColumns(workSectionCol.getColumnFamily(), workSectionCol.getColumnQualifier()), cols);
 +    }
 +
 +    boolean foundLocalityGroups = false;
 +    boolean foundLocalityGroupDef1 = false;
 +    boolean foundLocalityGroupDef2 = false;
 +    boolean foundFormatter = false;
 +    Joiner j = Joiner.on(",");
 +    Function<Text,String> textToString = new Function<Text,String>() {
 +      @Override
 +      public String apply(Text text) {
 +        return text.toString();
 +      }
 +    };
 +    for (Entry<String,String> p : tops.getProperties(ReplicationTable.NAME)) {
 +      String key = p.getKey();
 +      String val = p.getValue();
 +      // STATUS_LG_NAME, STATUS_LG_COLFAMS, WORK_LG_NAME, WORK_LG_COLFAMS
 +      if (key.equals(Property.TABLE_FORMATTER_CLASS.getKey()) && val.equals(StatusFormatter.class.getName())) {
 +        // look for formatter
 +        foundFormatter = true;
 +      } else if (key.equals(Property.TABLE_LOCALITY_GROUPS.getKey()) && val.equals(j.join(ReplicationTable.LOCALITY_GROUPS.keySet()))) {
 +        // look for locality groups enabled
 +        foundLocalityGroups = true;
 +      } else if (key.startsWith(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey())) {
 +        // look for locality group column family definitions
 +        if (key.equals(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + ReplicationTable.STATUS_LG_NAME)
 +            && val.equals(j.join(Iterables.transform(ReplicationTable.STATUS_LG_COLFAMS, textToString)))) {
 +          foundLocalityGroupDef1 = true;
 +        } else if (key.equals(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + ReplicationTable.WORK_LG_NAME)
 +            && val.equals(j.join(Iterables.transform(ReplicationTable.WORK_LG_COLFAMS, textToString)))) {
 +          foundLocalityGroupDef2 = true;
 +        }
 +      }
 +    }
 +    Assert.assertTrue(foundLocalityGroups);
 +    Assert.assertTrue(foundLocalityGroupDef1);
 +    Assert.assertTrue(foundLocalityGroupDef2);
 +    Assert.assertTrue(foundFormatter);
 +  }
 +
 +  @Test
 +  public void correctRecordsCompleteFile() throws Exception {
 +    Connector conn = getConnector();
 +    String table = "table1";
 +    conn.tableOperations().create(table);
 +    // If we have more than one tserver, this is subject to a race condition.
 +    conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
 +
 +    BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
 +    for (int i = 0; i < 10; i++) {
 +      Mutation m = new Mutation(Integer.toString(i));
 +      m.put(new byte[0], new byte[0], new byte[0]);
 +      bw.addMutation(m);
 +    }
 +
 +    bw.close();
 +
 +    // After writing data, we'll get a replication table online
-     boolean online = ReplicationTable.isOnline(conn);
-     int attempts = 10;
-     do {
-       if (!online) {
-         sleepUninterruptibly(2, TimeUnit.SECONDS);
-         online = ReplicationTable.isOnline(conn);
-         attempts--;
-       }
-     } while (!online && attempts > 0);
-     Assert.assertTrue("Replication table was not online", online);
++    while (!ReplicationTable.isOnline(conn)) {
++      sleepUninterruptibly(MILLIS_BETWEEN_REPLICATION_TABLE_ONLINE_CHECKS, TimeUnit.MILLISECONDS);
++    }
++    Assert.assertTrue("Replication table did not exist", ReplicationTable.isOnline(conn));
 +
 +    for (int i = 0; i < 5; i++) {
 +      if (conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ)) {
 +        break;
 +      }
 +      log.info("Could not read replication table, waiting and will retry");
 +      Thread.sleep(2000);
 +    }
 +
 +    Assert.assertTrue("'root' user could not read the replication table",
 +        conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ));
 +
 +    Set<String> replRows = new HashSet<>();
 +    Scanner scanner;
-     attempts = 5;
++    int attempts = 5;
 +    while (replRows.isEmpty() && attempts > 0) {
 +      scanner = ReplicationTable.getScanner(conn);
 +      StatusSection.limit(scanner);
 +      for (Entry<Key,Value> entry : scanner) {
 +        Key k = entry.getKey();
 +
 +        String fileUri = k.getRow().toString();
 +        try {
 +          new URI(fileUri);
 +        } catch (URISyntaxException e) {
 +          Assert.fail("Expected a valid URI: " + fileUri);
 +        }
 +
 +        replRows.add(fileUri);
 +      }
 +    }
 +
 +    Set<String> wals = new HashSet<>();
 +    attempts = 5;
 +    Instance i = conn.getInstance();
 +    ZooReaderWriter zk = new ZooReaderWriter(i.getZooKeepers(), i.getZooKeepersSessionTimeOut(), "");
 +    while (wals.isEmpty() && attempts > 0) {
 +      WalStateManager markers = new WalStateManager(i, zk);
 +      for (Entry<Path,WalState> entry : markers.getAllState().entrySet()) {
 +        wals.add(entry.getKey().toString());
 +      }
 +      attempts--;
 +    }
 +
 +    // We only have one file that should need replication (no trace table)
 +    // We should find an entry in tablet and in the repl row
 +    Assert.assertEquals("Rows found: " + replRows, 1, replRows.size());
 +
 +    // There should only be one extra WALog that replication doesn't know about
 +    replRows.removeAll(wals);
 +    Assert.assertEquals(2, wals.size());
 +    Assert.assertEquals(0, replRows.size());
 +  }
 +
 +  @Test
 +  public void noRecordsWithoutReplication() throws Exception {
 +    Connector conn = getConnector();
 +    List<String> tables = new ArrayList<>();
 +
 +    // replication shouldn't be online when we begin
 +    Assert.assertFalse(ReplicationTable.isOnline(conn));
 +
 +    for (int i = 0; i < 5; i++) {
 +      String name = "table" + i;
 +      tables.add(name);
 +      conn.tableOperations().create(name);
 +    }
 +
 +    // nor after we create some tables (that aren't being replicated)
 +    Assert.assertFalse(ReplicationTable.isOnline(conn));
 +
 +    for (String table : tables) {
 +      writeSomeData(conn, table, 5, 5);
 +    }
 +
 +    // After writing data, still no replication table
 +    Assert.assertFalse(ReplicationTable.isOnline(conn));
 +
 +    for (String table : tables) {
 +      conn.tableOperations().compact(table, null, null, true, true);
 +    }
 +
 +    // After compacting data, still no replication table
 +    Assert.assertFalse(ReplicationTable.isOnline(conn));
 +
 +    for (String table : tables) {
 +      conn.tableOperations().delete(table);
 +    }
 +
 +    // After deleting tables, still no replication table
 +    Assert.assertFalse(ReplicationTable.isOnline(conn));
 +  }
 +
 +  @Test
 +  public void twoEntriesForTwoTables() throws Exception {
 +    Connector conn = getConnector();
 +    String table1 = "table1", table2 = "table2";
 +
 +    // replication shouldn't exist when we begin
 +    Assert.assertFalse("Replication table already online at the beginning of the test", ReplicationTable.isOnline(conn));
 +
 +    // Create two tables
 +    conn.tableOperations().create(table1);
 +    conn.tableOperations().create(table2);
 +    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.READ);
 +    // wait for permission to propagate
 +    Thread.sleep(5000);
 +
 +    // Enable replication on table1
 +    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
 +
 +    // Despite having replication on, we shouldn't have any need to write a record to it (and bring it online)
 +    Assert.assertFalse(ReplicationTable.isOnline(conn));
 +
 +    // Write some data to table1
 +    writeSomeData(conn, table1, 50, 50);
 +
 +    // After the commit for these mutations finishes, we'll get a replication entry in accumulo.metadata for table1
 +    // Don't want to compact table1 as it ultimately cause the entry in accumulo.metadata to be removed before we can verify it's there
 +
 +    // After writing data, we'll get a replication table online
-     boolean online = ReplicationTable.isOnline(conn);
-     int attempts = 10;
-     do {
-       if (!online) {
-         sleepUninterruptibly(5, TimeUnit.SECONDS);
-         online = ReplicationTable.isOnline(conn);
-         attempts--;
-       }
-     } while (!online && attempts > 0);
-     Assert.assertTrue("Replication table did not exist", online);
-
++    while (!ReplicationTable.isOnline(conn)) {
++      sleepUninterruptibly(MILLIS_BETWEEN_REPLICATION_TABLE_ONLINE_CHECKS, TimeUnit.MILLISECONDS);
++    }
 +    Assert.assertTrue(ReplicationTable.isOnline(conn));
 +
 +    // Verify that we found a single replication record that's for table1
 +    Scanner s = ReplicationTable.getScanner(conn);
 +    StatusSection.limit(s);
 +    for (int i = 0; i < 5; i++) {
 +      if (Iterators.size(s.iterator()) == 1) {
 +        break;
 +      }
 +      Thread.sleep(1000);
 +    }
 +    Entry<Key,Value> entry = Iterators.getOnlyElement(s.iterator());
 +    // We should at least find one status record for this table, we might find a second if another log was started from ingesting the data
 +    Assert.assertEquals("Expected to find replication entry for " + table1, conn.tableOperations().tableIdMap().get(table1), entry.getKey()
 +        .getColumnQualifier().toString());
 +    s.close();
 +
 +    // Enable replication on table2
 +    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
 +
 +    // Write some data to table2
 +    writeSomeData(conn, table2, 50, 50);
 +
 +    // After the commit on these mutations, we'll get a replication entry in accumulo.metadata for table2
 +    // Don't want to compact table2 as it ultimately cause the entry in accumulo.metadata to be removed before we can verify it's there
 +
 +    Set<String> tableIds = Sets.newHashSet(conn.tableOperations().tableIdMap().get(table1), conn.tableOperations().tableIdMap().get(table2));
 +    Set<String> tableIdsForMetadata = Sets.newHashSet(tableIds);
 +
 +    List<Entry<Key,Value>> records = new ArrayList<>();
 +    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    s.setRange(MetadataSchema.ReplicationSection.getRange());
 +    for (Entry<Key,Value> metadata : s) {
 +      records.add(metadata);
 +      log.debug("Meta: {} => {}", metadata.getKey().toStringNoTruncate(), metadata.getValue().toString());
 +    }
 +
 +    Assert.assertEquals("Expected to find 2 records, but actually found " + records, 2, records.size());
 +
 +    for (Entry<Key,Value> metadata : records) {
 +      Assert.assertTrue("Expected record to be in metadata but wasn't " + metadata.getKey().toStringNoTruncate() + ", tableIds remaining "
 +          + tableIdsForMetadata, tableIdsForMetadata.remove(metadata.getKey().getColumnQualifier().toString()));
 +    }
 +
 +    Assert.assertTrue("Expected that we had removed all metadata entries " + tableIdsForMetadata, tableIdsForMetadata.isEmpty());
 +
 +    // Should be creating these records in replication table from metadata table every second
 +    Thread.sleep(5000);
 +
 +    // Verify that we found two replication records: one for table1 and one for table2
 +    s = ReplicationTable.getScanner(conn);
 +    StatusSection.limit(s);
 +    Iterator<Entry<Key,Value>> iter = s.iterator();
 +    Assert.assertTrue("Found no records in replication table", iter.hasNext());
 +    entry = iter.next();
 +    Assert.assertTrue("Expected to find element in replication table", tableIds.remove(entry.getKey().getColumnQualifier().toString()));
 +    Assert.assertTrue("Expected to find two elements in replication table, only found one ", iter.hasNext());
 +    entry = iter.next();
 +    Assert.assertTrue("Expected to find element in replication table", tableIds.remove(entry.getKey().getColumnQualifier().toString()));
 +    Assert.assertFalse("Expected to only find two elements in replication table", iter.hasNext());
 +  }
 +
 +  private void writeSomeData(Connector conn, String table, int rows, int cols) throws Exception {
 +    BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
 +    for (int row = 0; row < rows; row++) {
 +      Mutation m = new Mutation(Integer.toString(row));
 +      for (int col = 0; col < cols; col++) {
 +        String value = Integer.toString(col);
 +        m.put(value, "", value);
 +      }
 +      bw.addMutation(m);
 +    }
 +    bw.close();
 +  }
 +
 +  @Test
 +  public void replicationEntriesPrecludeWalDeletion() throws Exception {
 +    final Connector conn = getConnector();
 +    String table1 = "table1", table2 = "table2", table3 = "table3";
 +    final Multimap<String,String> logs = HashMultimap.create();
 +    final AtomicBoolean keepRunning = new AtomicBoolean(true);
 +
 +    Thread t = new Thread(new Runnable() {
 +      @Override
 +      public void run() {
 +        // Should really be able to interrupt here, but the Scanner throws a fit to the logger
 +        // when that happens
 +        while (keepRunning.get()) {
 +          try {
 +            logs.putAll(getAllLogs(conn));
 +          } catch (Exception e) {
 +            log.error("Error getting logs", e);
 +          }
 +        }
 +      }
 +
 +    });
 +
 +    t.start();
 +
 +    conn.tableOperations().create(table1);
 +    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
 +    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 +    Thread.sleep(2000);
 +
 +    // Write some data to table1
 +    writeSomeData(conn, table1, 200, 500);
 +
 +    conn.tableOperations().create(table2);
 +    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
 +    conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 +    Thread.sleep(2000);
 +
 +    writeSomeData(conn, table2, 200, 500);
 +
 +    conn.tableOperations().create(table3);
"true"); + conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1"); + Thread.sleep(2000); + + writeSomeData(conn, table3, 200, 500); + + // Force a write to metadata for the data written + for (String table : Arrays.asList(table1, table2, table3)) { + conn.tableOperations().flush(table, null, null, true); + } + + keepRunning.set(false); + t.join(5000); + + // The master is only running every second to create records in the replication table from the metadata table + // Sleep a sufficient amount of time to ensure that we get the straggling WALs that might have been created at the end + Thread.sleep(5000); + + Set replFiles = getReferencesToFilesToBeReplicated(conn); + + // We might have a WAL that was use solely for the replication table + // We want to remove that from our list as it should not appear in the replication table + String replicationTableId = conn.tableOperations().tableIdMap().get(ReplicationTable.NAME); + Iterator> observedLogs = logs.entries().iterator(); + while (observedLogs.hasNext()) { + Entry observedLog = observedLogs.next(); + if (replicationTableId.equals(observedLog.getValue())) { + log.info("Removing {} because its tableId is for the replication table", observedLog); + observedLogs.remove(); + } + } + + // We should have *some* reference to each log that was seen in the metadata table + // They might not yet all be closed though (might be newfile) + Assert.assertTrue("Metadata log distribution: " + logs + "replFiles " + replFiles, logs.keySet().containsAll(replFiles)); + Assert.assertTrue("Difference between replication entries and current logs is bigger than one", logs.keySet().size() - replFiles.size() <= 1); + + final Configuration conf = new Configuration(); + for (String replFile : replFiles) { + Path p = new Path(replFile); + FileSystem fs = p.getFileSystem(conf); + if (!fs.exists(p)) { + // double-check: the garbage collector can be fast + Set currentSet = getReferencesToFilesToBeReplicated(conn); + log.info("Current references {}", currentSet); + log.info("Looking for reference to {}", replFile); + log.info("Contains? 
{}", currentSet.contains(replFile)); + Assert.assertTrue("File does not exist anymore, it was likely incorrectly garbage collected: " + p, !currentSet.contains(replFile)); + } + } + } + + private Set getReferencesToFilesToBeReplicated(final Connector conn) throws ReplicationTableOfflineException { + Scanner s = ReplicationTable.getScanner(conn); + StatusSection.limit(s); + Set replFiles = new HashSet<>(); + for (Entry entry : s) { + replFiles.add(entry.getKey().getRow().toString()); + } + return replFiles; + } + + @Test + public void combinerWorksOnMetadata() throws Exception { + Connector conn = getConnector(); + + conn.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE); + + ReplicationTableUtil.configureMetadataTable(conn, MetadataTable.NAME); + + Status stat1 = StatusUtil.fileCreated(100); + Status stat2 = StatusUtil.fileClosed(); + + BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "file:/accumulo/wals/tserver+port/uuid"); + m.put(ReplicationSection.COLF, new Text("1"), ProtobufUtil.toValue(stat1)); + bw.addMutation(m); + bw.close(); + + Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + s.setRange(ReplicationSection.getRange()); + + Status actual = Status.parseFrom(Iterables.getOnlyElement(s).getValue().get()); + Assert.assertEquals(stat1, actual); + + bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + m = new Mutation(ReplicationSection.getRowPrefix() + "file:/accumulo/wals/tserver+port/uuid"); + m.put(ReplicationSection.COLF, new Text("1"), ProtobufUtil.toValue(stat2)); + bw.addMutation(m); + bw.close(); + + s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + s.setRange(ReplicationSection.getRange()); + + actual = Status.parseFrom(Iterables.getOnlyElement(s).getValue().get()); + Status expected = Status.newBuilder().setBegin(0).setEnd(0).setClosed(true).setInfiniteEnd(true).setCreatedTime(100).build(); + + Assert.assertEquals(expected, actual); + } + + @Test + public void noDeadlock() throws Exception { + final Connector conn = getConnector(); + + ReplicationTable.setOnline(conn); + conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE); + conn.tableOperations().deleteRows(ReplicationTable.NAME, null, null); + + String table1 = "table1", table2 = "table2", table3 = "table3"; + conn.tableOperations().create(table1); + conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true"); + conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1"); + conn.tableOperations().create(table2); + conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true"); + conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1"); + conn.tableOperations().create(table3); + conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true"); + conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1"); + + writeSomeData(conn, table1, 200, 500); + + writeSomeData(conn, table2, 200, 500); + + writeSomeData(conn, table3, 200, 500); + + // Flush everything to try to make the replication records + for (String table : Arrays.asList(table1, table2, table3)) { + conn.tableOperations().flush(table, null, null, true); + } + + // Flush everything to try to make the 
 +    // Flush everything to try to make the replication records
 +    for (String table : Arrays.asList(table1, table2, table3)) {
 +      conn.tableOperations().flush(table, null, null, true);
 +    }
 +
 +    for (String table : Arrays.asList(MetadataTable.NAME, table1, table2, table3)) {
 +      Iterators.size(conn.createScanner(table, Authorizations.EMPTY).iterator());
 +    }
 +  }
 +
 +  @Test
 +  public void filesClosedAfterUnused() throws Exception {
 +    Connector conn = getConnector();
 +
 +    String table = "table";
 +    conn.tableOperations().create(table);
 +    String tableId = conn.tableOperations().tableIdMap().get(table);
 +
 +    Assert.assertNotNull(tableId);
 +
 +    conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
 +    conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 +    // just sleep
 +    conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
 +        ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "50000"));
 +
 +    // Write a mutation to make a log file
 +    BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
 +    Mutation m = new Mutation("one");
 +    m.put("", "", "");
 +    bw.addMutation(m);
 +    bw.close();
 +
 +    // Write another to make sure the logger rolls itself?
 +    bw = conn.createBatchWriter(table, new BatchWriterConfig());
 +    m = new Mutation("three");
 +    m.put("", "", "");
 +    bw.addMutation(m);
 +    bw.close();
 +
 +    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    s.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
 +    s.setRange(TabletsSection.getRange(tableId));
 +    Set<String> wals = new HashSet<>();
 +    for (Entry<Key,Value> entry : s) {
 +      LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
 +      wals.add(new Path(logEntry.filename).toString());
 +    }
 +
 +    log.warn("Found wals {}", wals);
 +
 +    bw = conn.createBatchWriter(table, new BatchWriterConfig());
 +    m = new Mutation("three");
 +    byte[] bytes = new byte[1024 * 1024];
 +    m.put("1".getBytes(), new byte[0], bytes);
 +    m.put("2".getBytes(), new byte[0], bytes);
 +    m.put("3".getBytes(), new byte[0], bytes);
 +    m.put("4".getBytes(), new byte[0], bytes);
 +    m.put("5".getBytes(), new byte[0], bytes);
 +    bw.addMutation(m);
 +    bw.close();
 +
 +    conn.tableOperations().flush(table, null, null, true);
 +
 +    while (!ReplicationTable.isOnline(conn)) {
-       sleepUninterruptibly(2, TimeUnit.SECONDS);
++      sleepUninterruptibly(MILLIS_BETWEEN_REPLICATION_TABLE_ONLINE_CHECKS, TimeUnit.MILLISECONDS);
 +    }
 +
 +    for (int i = 0; i < 10; i++) {
 +      s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +      s.fetchColumnFamily(LogColumnFamily.NAME);
 +      s.setRange(TabletsSection.getRange(tableId));
 +      for (Entry<Key,Value> entry : s) {
 +        log.info(entry.getKey().toStringNoTruncate() + "=" + entry.getValue());
 +      }
 +
 +      try {
 +        s = ReplicationTable.getScanner(conn);
 +        StatusSection.limit(s);
 +        Text buff = new Text();
 +        boolean allReferencedLogsClosed = true;
 +        int recordsFound = 0;
 +        for (Entry<Key,Value> e : s) {
 +          recordsFound++;
 +          allReferencedLogsClosed = true;
 +          StatusSection.getFile(e.getKey(), buff);
 +          String file = buff.toString();
 +          if (wals.contains(file)) {
 +            Status stat = Status.parseFrom(e.getValue().get());
 +            if (!stat.getClosed()) {
 +              log.info("{} wasn't closed", file);
 +              allReferencedLogsClosed = false;
 +            }
 +          }
 +        }
 +
 +        if (recordsFound > 0 && allReferencedLogsClosed) {
 +          return;
 +        }
 +        Thread.sleep(2000);
 +      } catch (RuntimeException e) {
 +        Throwable cause = e.getCause();
 +        if (cause instanceof AccumuloSecurityException) {
 +          AccumuloSecurityException ase = (AccumuloSecurityException) cause;
 +          switch (ase.getSecurityErrorCode()) {
 +            case PERMISSION_DENIED:
 +              // We tried to read the replication table before the GRANT went through
 +              Thread.sleep(2000);
 +              break;
 +            default:
 +              throw e;
 +          }
 +        }
 +      }
 +    }
 +
 +    Assert.fail("We had a file that was referenced but didn't get closed");
 +  }
 +
 +  @Test
 +  public void singleTableWithSingleTarget() throws Exception {
 +    // We want to kill the GC so it doesn't come along and close Status records and mess up the comparisons
 +    // against expected Status messages.
 +    getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
 +
 +    Connector conn = getConnector();
 +    String table1 = "table1";
 +
 +    // replication shouldn't be online when we begin
 +    Assert.assertFalse(ReplicationTable.isOnline(conn));
 +
 +    // Create a table
 +    conn.tableOperations().create(table1);
 +
 +    int attempts = 10;
 +
 +    // Might think the table doesn't yet exist, retry
 +    while (attempts > 0) {
 +      try {
 +        // Enable replication on table1
 +        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
 +        // Replicate table1 to cluster1 in the table with id of '4'
 +        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "4");
 +        // Sleep for 100 seconds before saying something is replicated
 +        conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
 +            ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "100000"));
 +        break;
 +      } catch (Exception e) {
 +        attempts--;
 +        if (attempts <= 0) {
 +          throw e;
 +        }
 +        sleepUninterruptibly(2, TimeUnit.SECONDS);
 +      }
 +    }
 +
 +    // Write some data to table1
 +    writeSomeData(conn, table1, 2000, 50);
 +
 +    // Make sure the replication table is online at this point
-     boolean online = ReplicationTable.isOnline(conn);
-     attempts = 10;
-     do {
-       if (!online) {
-         sleepUninterruptibly(2, TimeUnit.SECONDS);
-         online = ReplicationTable.isOnline(conn);
-         attempts--;
-       }
-     } while (!online && attempts > 0);
-     Assert.assertTrue("Replication table was never created", online);
++    while (!ReplicationTable.isOnline(conn)) {
++      sleepUninterruptibly(MILLIS_BETWEEN_REPLICATION_TABLE_ONLINE_CHECKS, TimeUnit.MILLISECONDS);
++    }
++    Assert.assertTrue("Replication table was never created", ReplicationTable.isOnline(conn));
 +
 +    // ACCUMULO-2743 The Observer in the tserver has to be made aware of the change to get the combiner (made by the master)
 +    for (int i = 0; i < 10 && !conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME); i++) {
 +      sleepUninterruptibly(2, TimeUnit.SECONDS);
 +    }
 +
 +    Assert.assertTrue("Combiner was never set on replication table",
 +        conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME));
 +
 +    // Trigger the minor compaction, waiting for it to finish.
 +    // This should write the entry to metadata that the file has data
 +    conn.tableOperations().flush(table1, null, null, true);
 +
 +    // Make sure that we have one status element, should be a new file
 +    Scanner s = ReplicationTable.getScanner(conn);
 +    StatusSection.limit(s);
 +    Entry<Key,Value> entry = null;
 +    Status expectedStatus = StatusUtil.openWithUnknownLength();
 +    attempts = 10;
 +    // This record will move from new to new with infinite length because of the minc (flush)
 +    while (null == entry && attempts > 0) {
 +      try {
 +        entry = Iterables.getOnlyElement(s);
 +        Status actual = Status.parseFrom(entry.getValue().get());
 +        if (actual.getInfiniteEnd() != expectedStatus.getInfiniteEnd()) {
 +          entry = null;
 +          // the master process didn't yet fire and write the new mutation, wait for it to do
 +          // so and try to read it again
 +          Thread.sleep(1000);
 +        }
 +      } catch (NoSuchElementException e) {
 +        entry = null;
 +        Thread.sleep(500);
 +      } catch (IllegalArgumentException e) {
 +        // saw this contain 2 elements once
 +        s = ReplicationTable.getScanner(conn);
 +        StatusSection.limit(s);
 +        for (Entry<Key,Value> content : s) {
 +          log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
 +        }
 +        throw e;
 +      } finally {
 +        attempts--;
 +      }
 +    }
 +
 +    Assert.assertNotNull("Could not find expected entry in replication table", entry);
 +    Status actual = Status.parseFrom(entry.getValue().get());
 +    Assert.assertTrue("Expected to find a replication entry that is open with infinite length: " + ProtobufUtil.toString(actual),
 +        !actual.getClosed() && actual.getInfiniteEnd());
 +
 +    // Try a couple of times to watch for the work record to be created
 +    boolean notFound = true;
 +    for (int i = 0; i < 10 && notFound; i++) {
 +      s = ReplicationTable.getScanner(conn);
 +      WorkSection.limit(s);
 +      int elementsFound = Iterables.size(s);
 +      if (0 < elementsFound) {
 +        Assert.assertEquals(1, elementsFound);
 +        notFound = false;
 +      }
 +      Thread.sleep(500);
 +    }
 +
 +    // If we didn't find the work record, print the contents of the table
 +    if (notFound) {
 +      s = ReplicationTable.getScanner(conn);
 +      for (Entry<Key,Value> content : s) {
 +        log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
 +      }
 +      Assert.assertFalse("Did not find the work entry for the status entry", notFound);
 +    }
 +
 +    // Write some more data so that we over-run the single WAL
 +    writeSomeData(conn, table1, 3000, 50);
 +
 +    log.info("Issued compaction for table");
 +    conn.tableOperations().compact(table1, null, null, true, true);
 +    log.info("Compaction completed");
 +
 +    // Master is creating entries in the replication table from the metadata table every second.
 +    // Compaction should trigger the record to be written to metadata. Wait a bit to ensure
 +    // that the master has time to work.
 +    Thread.sleep(5000);
 +
 +    s = ReplicationTable.getScanner(conn);
 +    StatusSection.limit(s);
 +    int numRecords = 0;
 +    for (Entry<Key,Value> e : s) {
 +      numRecords++;
 +      log.info("Found status record {}\t{}", e.getKey().toStringNoTruncate(), ProtobufUtil.toString(Status.parseFrom(e.getValue().get())));
 +    }
 +
 +    Assert.assertEquals(2, numRecords);
 +
 +    // We should eventually get 2 work records recorded, need to account for a potential delay though
 +    // might see: status1 -> work1 -> status2 -> (our scans) -> work2
 +    notFound = true;
 +    for (int i = 0; i < 10 && notFound; i++) {
 +      s = ReplicationTable.getScanner(conn);
 +      WorkSection.limit(s);
 +      int elementsFound = Iterables.size(s);
 +      if (2 == elementsFound) {
 +        notFound = false;
 +      }
 +      Thread.sleep(500);
 +    }
 +
 +    // If we didn't find the work record, print the contents of the table
 +    if (notFound) {
 +      s = ReplicationTable.getScanner(conn);
 +      for (Entry<Key,Value> content : s) {
 +        log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
 +      }
 +      Assert.assertFalse("Did not find the work entries for the status entries", notFound);
 +    }
 +  }
 +
 +  @Test
 +  public void correctClusterNameInWorkEntry() throws Exception {
 +    Connector conn = getConnector();
 +    String table1 = "table1";
 +
 +    // replication shouldn't be online when we begin
 +    Assert.assertFalse(ReplicationTable.isOnline(conn));
 +
 +    // Create two tables
 +    conn.tableOperations().create(table1);
 +
 +    int attempts = 5;
 +    while (attempts > 0) {
 +      try {
 +        // Enable replication on table1
 +        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
 +        // Replicate table1 to cluster1 in the table with id of '4'
 +        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "4");
 +        attempts = 0;
 +      } catch (Exception e) {
 +        attempts--;
 +        if (attempts <= 0) {
 +          throw e;
 +        }
 +        sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
 +      }
 +    }
 +
 +    // Write some data to table1
 +    writeSomeData(conn, table1, 2000, 50);
 +    conn.tableOperations().flush(table1, null, null, true);
 +
 +    String tableId = conn.tableOperations().tableIdMap().get(table1);
 +    Assert.assertNotNull("Table ID was null", tableId);
 +
 +    // Make sure the replication table exists at this point
-     boolean online = ReplicationTable.isOnline(conn);
-     attempts = 5;
-     do {
-       if (!online) {
-         sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
-         online = ReplicationTable.isOnline(conn);
-         attempts--;
-       }
-     } while (!online && attempts > 0);
-     Assert.assertTrue("Replication table did not exist", online);
++    while (!ReplicationTable.isOnline(conn)) {
++      sleepUninterruptibly(MILLIS_BETWEEN_REPLICATION_TABLE_ONLINE_CHECKS, TimeUnit.MILLISECONDS);
++    }
++    Assert.assertTrue("Replication table did not exist", ReplicationTable.isOnline(conn));
 +
 +    for (int i = 0; i < 5 && !conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ); i++) {
 +      Thread.sleep(1000);
 +    }
 +
 +    Assert.assertTrue(conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ));
 +
 +    boolean notFound = true;
 +    Scanner s;
 +    for (int i = 0; i < 10 && notFound; i++) {
 +      s = ReplicationTable.getScanner(conn);
 +      WorkSection.limit(s);
 +      try {
 +        Entry<Key,Value> e = Iterables.getOnlyElement(s);
 +        Text expectedColqual = new ReplicationTarget("cluster1", "4", tableId).toText();
 +        Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
 +        notFound = false;
 +      } catch (NoSuchElementException e) {} catch (IllegalArgumentException e) {
 +        s = ReplicationTable.getScanner(conn);
 +        for (Entry<Key,Value> content : s) {
 +          log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
 +        }
 +        Assert.fail("Found more than one work section entry");
 +      }
 +
 +      Thread.sleep(500);
 +    }
 +
 +    if (notFound) {
 +      s = ReplicationTable.getScanner(conn);
 +      for (Entry<Key,Value> content : s) {
 +        log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
 +      }
 +      Assert.assertFalse("Did not find the work entry for the status entry", notFound);
 +    }
 +  }
 +
 +  @Test
 +  public void replicationRecordsAreClosedAfterGarbageCollection() throws Exception {
 +    getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
 +
 +    final Connector conn = getConnector();
 +
 +    ReplicationTable.setOnline(conn);
 +    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
 +    conn.tableOperations().deleteRows(ReplicationTable.NAME, null, null);
 +
 +    final AtomicBoolean keepRunning = new AtomicBoolean(true);
 +    final Set<String> metadataWals = new HashSet<>();
 +
 +    Thread t = new Thread(new Runnable() {
 +      @Override
 +      public void run() {
 +        // Should really be able to interrupt here, but the Scanner throws a fit to the logger
 +        // when that happens
 +        while (keepRunning.get()) {
 +          try {
 +            metadataWals.addAll(getLogs(conn).keySet());
 +          } catch (Exception e) {
 +            log.error("Metadata table doesn't exist");
 +          }
 +        }
 +      }
 +
 +    });
 +
 +    t.start();
 +
 +    String table1 = "table1", table2 = "table2", table3 = "table3";
 +
 +    try {
 +      conn.tableOperations().create(table1);
 +      conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
 +      conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 +      conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
 +          ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, null));
 +
 +      // Write some data to table1
 +      writeSomeData(conn, table1, 200, 500);
 +
 +      conn.tableOperations().create(table2);
 +      conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
 +      conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 +
 +      writeSomeData(conn, table2, 200, 500);
 +
 +      conn.tableOperations().create(table3);
 +      conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
 +      conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1");
 +
 +      writeSomeData(conn, table3, 200, 500);
 +
 +      // Flush everything to try to make the replication records
 +      for (String table : Arrays.asList(table1, table2, table3)) {
 +        conn.tableOperations().compact(table, null, null, true, true);
 +      }
 +    } finally {
 +      keepRunning.set(false);
 +      t.join(5000);
 +      Assert.assertFalse(t.isAlive());
 +    }
 +
 +    // Kill the tserver(s) and restart them
 +    // to ensure that the WALs we previously observed all move to closed.
 +    cluster.getClusterControl().stop(ServerType.TABLET_SERVER);
 +    cluster.getClusterControl().start(ServerType.TABLET_SERVER);
 +
 +    // Make sure we can read all the tables (recovery complete)
 +    for (String table : Arrays.asList(table1, table2, table3)) {
 +      Iterators.size(conn.createScanner(table, Authorizations.EMPTY).iterator());
 +    }
 +
 +    // Starting the gc will run CloseWriteAheadLogReferences which will first close Statuses
 +    // in the metadata table, and then in the replication table
 +    Process gc = cluster.exec(SimpleGarbageCollector.class);
 +
 +    waitForGCLock(conn);
 +
 +    Thread.sleep(1000);
 +
 +    log.info("GC is up and should have had time to run at least once by now");
 +
 +    try {
 +      boolean allClosed = true;
 +
 +      // We should either find all closed records or no records
 +      // After they're closed, they are candidates for deletion
 +      for (int i = 0; i < 10; i++) {
 +        Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +        s.setRange(Range.prefix(ReplicationSection.getRowPrefix()));
 +        Iterator<Entry<Key,Value>> iter = s.iterator();
 +
 +        long recordsFound = 0l;
 +        while (allClosed && iter.hasNext()) {
 +          Entry<Key,Value> entry = iter.next();
 +          String wal = entry.getKey().getRow().toString();
 +          if (metadataWals.contains(wal)) {
 +            Status status = Status.parseFrom(entry.getValue().get());
 +            log.info("{}={}", entry.getKey().toStringNoTruncate(), ProtobufUtil.toString(status));
 +            allClosed &= status.getClosed();
 +            recordsFound++;
 +          }
 +        }
 +
 +        log.info("Found {} records from the metadata table", recordsFound);
 +        if (allClosed) {
 +          break;
 +        }
 +
 +        sleepUninterruptibly(2, TimeUnit.SECONDS);
 +      }
 +
 +      if (!allClosed) {
 +        Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +        s.setRange(Range.prefix(ReplicationSection.getRowPrefix()));
 +        for (Entry<Key,Value> entry : s) {
 +          log.info(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
 +        }
 +        Assert.fail("Expected all replication records in the metadata table to be closed");
 +      }
 +
 +      for (int i = 0; i < 10; i++) {
 +        allClosed = true;
 +
 +        Scanner s = ReplicationTable.getScanner(conn);
 +        Iterator<Entry<Key,Value>> iter = s.iterator();
 +
 +        long recordsFound = 0l;
 +        while (allClosed && iter.hasNext()) {
 +          Entry<Key,Value> entry = iter.next();
 +          String wal = entry.getKey().getRow().toString();
 +          if (metadataWals.contains(wal)) {
 +            Status status = Status.parseFrom(entry.getValue().get());
 +            log.info("{}={}", entry.getKey().toStringNoTruncate(), ProtobufUtil.toString(status));
 +            allClosed &= status.getClosed();
 +            recordsFound++;
 +          }
 +        }
 +
 +        log.info("Found {} records from the replication table", recordsFound);
 +        if (allClosed) {
 +          break;
 +        }
 +
 +        sleepUninterruptibly(3, TimeUnit.SECONDS);
 +      }
 +
 +      if (!allClosed) {
 +        Scanner s = ReplicationTable.getScanner(conn);
 +        StatusSection.limit(s);
 +        for (Entry<Key,Value> entry : s) {
 +          log.info(entry.getKey().toStringNoTruncate() + " " + TextFormat.shortDebugString(Status.parseFrom(entry.getValue().get())));
 +        }
 +        Assert.fail("Expected all replication records in the replication table to be closed");
 +      }
 +
 +    } finally {
 +      gc.destroy();
 +      gc.waitFor();
 +    }
 +
 +  }
 +
 +  @Test
 +  public void replicatedStatusEntriesAreDeleted() throws Exception {
 +    // Just stop it now, we'll restart it after we restart the tserver
 +    getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
 +
 +    final Connector conn = getConnector();
 +    log.info("Got connector to MAC");
 +    String table1 = "table1";
 +
 +    // replication shouldn't be online when we begin
 +    Assert.assertFalse(ReplicationTable.isOnline(conn));
 +
 +    // Create two tables
 +    conn.tableOperations().create(table1);
 +
 +    int attempts = 5;
 +    while (attempts > 0) {
 +      try {
 +        // Enable replication on table1
 +        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
 +        // Replicate table1 to cluster1 in the table with id of '4'
 +        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "4");
 +        // Use the MockReplicaSystem impl and sleep for 5seconds
 +        conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
 +            ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "1000"));
 +        attempts = 0;
 +      } catch (Exception e) {
 +        attempts--;
 +        if (attempts <= 0) {
 +          throw e;
 +        }
 +        sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
 +      }
 +    }
 +
 +    String tableId = conn.tableOperations().tableIdMap().get(table1);
 +    Assert.assertNotNull("Could not determine table id for " + table1, tableId);
 +
 +    // Write some data to table1
 +    writeSomeData(conn, table1, 2000, 50);
 +    conn.tableOperations().flush(table1, null, null, true);
 +
 +    // Make sure the replication table exists at this point
-     boolean online = ReplicationTable.isOnline(conn);
-     attempts = 10;
-     do {
-       if (!online) {
-         sleepUninterruptibly(1, TimeUnit.SECONDS);
-         online = ReplicationTable.isOnline(conn);
-         attempts--;
-       }
-     } while (!online && attempts > 0);
-     Assert.assertTrue("Replication table did not exist", online);
++    while (!ReplicationTable.isOnline(conn)) {
++      sleepUninterruptibly(MILLIS_BETWEEN_REPLICATION_TABLE_ONLINE_CHECKS, TimeUnit.MILLISECONDS);
++    }
++    Assert.assertTrue("Replication table did not exist", ReplicationTable.isOnline(conn));
 +
 +    // Grant ourselves the write permission for later
 +    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
 +
 +    log.info("Checking for replication entries in replication");
 +    // Then we need to get those records over to the replication table
 +    Scanner s;
 +    Set<String> entries = new HashSet<>();
 +    for (int i = 0; i < 5; i++) {
 +      s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +      s.setRange(ReplicationSection.getRange());
 +      entries.clear();
 +      for (Entry<Key,Value> entry : s) {
 +        entries.add(entry.getKey().getRow().toString());
 +        log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
 +      }
 +      if (!entries.isEmpty()) {
 +        log.info("Replication entries {}", entries);
 +        break;
 +      }
 +      Thread.sleep(1000);
 +    }
 +
 +    Assert.assertFalse("Did not find any replication entries in the replication table", entries.isEmpty());
 +
 +    // Find the WorkSection record that will be created for that data we ingested
 +    boolean notFound = true;
 +    for (int i = 0; i < 10 && notFound; i++) {
 +      try {
 +        s = ReplicationTable.getScanner(conn);
 +        WorkSection.limit(s);
 +        Entry<Key,Value> e = Iterables.getOnlyElement(s);
 +        log.info("Found entry: " + e.getKey().toStringNoTruncate());
 +        Text expectedColqual = new ReplicationTarget("cluster1", "4", tableId).toText();
 +        Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
 +        notFound = false;
 +      } catch (NoSuchElementException e) {
 +
 +      } catch (IllegalArgumentException e) {
 +        // Somehow we got more than one element. Log what they were
 +        s = ReplicationTable.getScanner(conn);
 +        for (Entry<Key,Value> content : s) {
 +          log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
 +        }
 +        Assert.fail("Found more than one work section entry");
 +      } catch (RuntimeException e) {
 +        // Catch a propagation issue, fail if it's not what we expect
 +        Throwable cause = e.getCause();
 +        if (cause instanceof AccumuloSecurityException) {
 +          AccumuloSecurityException sec = (AccumuloSecurityException) cause;
 +          switch (sec.getSecurityErrorCode()) {
 +            case PERMISSION_DENIED:
 +              // retry -- the grant didn't happen yet
 +              log.warn("Sleeping because permission was denied");
 +              break;
 +            default:
 +              throw e;
 +          }
 +        } else {
 +          throw e;
 +        }
 +      }
 +
 +      Thread.sleep(2000);
 +    }
 +
 +    if (notFound) {
 +      s = ReplicationTable.getScanner(conn);
 +      for (Entry<Key,Value> content : s) {
 +        log.info(content.getKey().toStringNoTruncate() + " => " + ProtobufUtil.toString(Status.parseFrom(content.getValue().get())));
 +      }
 +      Assert.assertFalse("Did not find the work entry for the status entry", notFound);
 +    }
 +
 +    /**
 +     * By this point, we should have data ingested into a table, with at least one WAL as a candidate for replication. Compacting the table should close all
 +     * open WALs, which should ensure all records we're going to replicate have entries in the replication table, and nothing will exist in the metadata table
 +     * anymore
 +     */
 +
 +    log.info("Killing tserver");
 +    // Kill the tserver(s) and restart them
 +    // to ensure that the WALs we previously observed all move to closed.
 +    cluster.getClusterControl().stop(ServerType.TABLET_SERVER);
 +
 +    log.info("Starting tserver");
 +    cluster.getClusterControl().start(ServerType.TABLET_SERVER);
 +
 +    log.info("Waiting to read tables");
 +    sleepUninterruptibly(2 * 3, TimeUnit.SECONDS);
 +
 +    // Make sure we can read all the tables (recovery complete)
 +    for (String table : new String[] {MetadataTable.NAME, table1}) {
 +      Iterators.size(conn.createScanner(table, Authorizations.EMPTY).iterator());
 +    }
 +
 +    log.info("Recovered metadata:");
 +    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    for (Entry<Key,Value> entry : s) {
 +      log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
 +    }
 +
 +    cluster.getClusterControl().start(ServerType.GARBAGE_COLLECTOR);
 +
 +    // Wait for a bit since the GC has to run (should be running after a one second delay)
 +    waitForGCLock(conn);
 +
 +    Thread.sleep(1000);
 +
 +    log.info("After GC");
 +    s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    for (Entry<Key,Value> entry : s) {
 +      log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
 +    }
 +
 +    // We expect no records in the metadata table after compaction. We have to poll
 +    // because we have to wait for the StatusMaker's next iteration which will clean
 +    // up the dangling *closed* records after we create the record in the replication table.
 +    // We need the GC to close the file (CloseWriteAheadLogReferences) before we can remove the record
 +    log.info("Checking metadata table for replication entries");
 +    Set<String> remaining = new HashSet<>();
 +    for (int i = 0; i < 10; i++) {
 +      s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +      s.setRange(ReplicationSection.getRange());
 +      remaining.clear();
 +      for (Entry<Key,Value> e : s) {
 +        remaining.add(e.getKey().getRow().toString());
 +      }
 +      remaining.retainAll(entries);
 +      if (remaining.isEmpty()) {
 +        break;
 +      }
 +      log.info("remaining {}", remaining);
 +      Thread.sleep(2000);
 +      log.info("");
 +    }
 +
 +    Assert.assertTrue("Replication status messages were not cleaned up from metadata table", remaining.isEmpty());
 +
 +    /**
 +     * After we close out and subsequently delete the metadata record, this will propagate to the replication table, which will cause those records to be
 +     * deleted after replication occurs
 +     */
 +
 +    int recordsFound = 0;
 +    for (int i = 0; i < 30; i++) {
 +      s = ReplicationTable.getScanner(conn);
 +      recordsFound = 0;
 +      for (Entry<Key,Value> entry : s) {
 +        recordsFound++;
 +        log.info("{} {}", entry.getKey().toStringNoTruncate(), ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
 +      }
 +
 +      if (recordsFound <= 2) {
 +        break;
 +      } else {
 +        Thread.sleep(1000);
 +        log.info("");
 +      }
 +    }
 +
 +    Assert.assertTrue("Found unexpected replication records in the replication table", recordsFound <= 2);
 +  }
 +}