From commits-return-23568-archive-asf-public=cust-asf.ponee.io@accumulo.apache.org Sat Nov 23 00:14:44 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 943AA180661 for ; Sat, 23 Nov 2019 01:14:43 +0100 (CET) Received: (qmail 63998 invoked by uid 500); 23 Nov 2019 00:14:42 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 63984 invoked by uid 99); 23 Nov 2019 00:14:42 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 23 Nov 2019 00:14:42 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 84FBB8B690; Sat, 23 Nov 2019 00:14:42 +0000 (UTC) Date: Sat, 23 Nov 2019 00:14:42 +0000 To: "commits@accumulo.apache.org" Subject: [accumulo] branch master updated: Improve Upgrader9to10 code (#1441) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <157446808239.24113.17170838025942663514@gitbox.apache.org> From: ctubbsii@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: accumulo X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 84fb59e2b0e660653e9236cbe700d85474abbe68 X-Git-Newrev: b0bbff007d372fb3a9e3c4c4f900e063a65a40b1 X-Git-Rev: b0bbff007d372fb3a9e3c4c4f900e063a65a40b1 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
ctubbsii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/master by this push: new b0bbff0 Improve Upgrader9to10 code (#1441) b0bbff0 is described below commit b0bbff007d372fb3a9e3c4c4f900e063a65a40b1 Author: Christopher Tubbs AuthorDate: Fri Nov 22 19:14:35 2019 -0500 Improve Upgrader9to10 code (#1441) This changes the batching strategy for the Upgrader9to10 class. The previous strategy was to fill a data structure until JVM memory was sufficiently used up (50% of heap size), then process the upgrade for those entries, and repeat until no more entries are left to upgrade. This new strategy batches candidates for upgrade in batches of size 4 million characters (approx. 8MB batches), regardless of memory. This stabilizes the testing, as it is much simpler to reproduce and test fixed-size batches than it is to manipulate the JVM heap size during testing. As a result, many improvements to the GCUpgrade9to10TestIT were made here. This also fixes that IT, which began failing with the use of the G1GC instead of CMS (#1427), because G1GC didn't work well with a master configured with a 16MB JVM heap and crashed frequently with OOMEs. 
--- .../accumulo/master/upgrade/Upgrader9to10.java | 39 +++---- .../test/upgrade/GCUpgrade9to10TestIT.java | 115 +++++++++++---------- 2 files changed, 75 insertions(+), 79 deletions(-) diff --git a/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader9to10.java b/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader9to10.java index 89e7454..2f0d7d3 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader9to10.java +++ b/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader9to10.java @@ -22,7 +22,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.accumulo.core.metadata.RootTable.ZROOT_TABLET; import static org.apache.accumulo.core.metadata.RootTable.ZROOT_TABLET_GC_CANDIDATES; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN; -import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; import static org.apache.accumulo.server.util.MetadataTableUtil.EMPTY_TEXT; import java.io.IOException; @@ -34,7 +33,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; import java.util.stream.StreamSupport; import org.apache.accumulo.core.Constants; @@ -102,11 +100,8 @@ public class Upgrader9to10 implements Upgrader { public static final Value UPGRADED = MetadataSchema.DeletesSection.SkewedKeyValue.NAME; public static final String OLD_DELETE_PREFIX = "~del"; - /** - * This percentage was taken from the SimpleGarbageCollector and if nothing else is going on - * during upgrade then it could be larger. 
- */ - static final float CANDIDATE_MEMORY_PERCENTAGE = 0.50f; + // effectively an 8MB batch size, since this number is the number of Chars + public static final long CANDIDATE_BATCH_SIZE = 4_000_000; @Override public void upgradeZookeeper(ServerContext ctx) { @@ -405,11 +400,8 @@ public class Upgrader9to10 implements Upgrader { try (BatchWriter writer = c.createBatchWriter(tableName, new BatchWriterConfig())) { log.info("looking for candidates in table {}", tableName); Iterator oldCandidates = getOldCandidates(ctx, tableName); - int t = 0; // no waiting first time through while (oldCandidates.hasNext()) { - // give it some time for memory to clean itself up if needed - sleepUninterruptibly(t, TimeUnit.SECONDS); - List deletes = readCandidatesThatFitInMemory(oldCandidates); + List deletes = readCandidatesInBatch(oldCandidates); log.info("found {} deletes to upgrade", deletes.size()); for (String olddelete : deletes) { // create new formatted delete @@ -428,7 +420,6 @@ public class Upgrader9to10 implements Upgrader { writer.addMutation(deleteOldDeleteMutation(olddelete)); } writer.flush(); - t = 3; } } catch (TableNotFoundException | MutationsRejectedException e) { throw new RuntimeException(e); @@ -466,14 +457,18 @@ public class Upgrader9to10 implements Upgrader { .iterator(); } - private List readCandidatesThatFitInMemory(Iterator candidates) { + private List readCandidatesInBatch(Iterator candidates) { + long candidateLength = 0; List result = new ArrayList<>(); - // Always read at least one. If memory doesn't clean up fast enough at least - // some progress is made. while (candidates.hasNext()) { - result.add(candidates.next()); - if (almostOutOfMemory(Runtime.getRuntime())) + String candidate = candidates.next(); + candidateLength += candidate.length(); + result.add(candidate); + if (candidateLength > CANDIDATE_BATCH_SIZE) { + log.trace("List of delete candidates has exceeded the batch size" + + " threshold. 
Attempting to delete what has been gathered so far."); break; + } } return result; } @@ -484,16 +479,6 @@ public class Upgrader9to10 implements Upgrader { return m; } - private boolean almostOutOfMemory(Runtime runtime) { - if (runtime.totalMemory() - runtime.freeMemory() - > CANDIDATE_MEMORY_PERCENTAGE * runtime.maxMemory()) { - log.info("List of delete candidates has exceeded the memory" - + " threshold. Attempting to delete what has been gathered so far."); - return true; - } else - return false; - } - public void upgradeDirColumns(ServerContext ctx, Ample.DataLevel level) { String tableName = level.metaTable(); AccumuloClient c = ctx; diff --git a/test/src/main/java/org/apache/accumulo/test/upgrade/GCUpgrade9to10TestIT.java b/test/src/main/java/org/apache/accumulo/test/upgrade/GCUpgrade9to10TestIT.java index 86adfa1..26a8f3a 100644 --- a/test/src/main/java/org/apache/accumulo/test/upgrade/GCUpgrade9to10TestIT.java +++ b/test/src/main/java/org/apache/accumulo/test/upgrade/GCUpgrade9to10TestIT.java @@ -20,8 +20,11 @@ package org.apache.accumulo.test.upgrade; import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.TreeMap; @@ -32,7 +35,6 @@ import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; @@ -44,20 +46,18 @@ import org.apache.accumulo.core.security.TablePermission; import org.apache.accumulo.fate.zookeeper.ZooLock; 
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.master.upgrade.Upgrader9to10; -import org.apache.accumulo.minicluster.MemoryUnit; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.miniclusterImpl.ProcessNotFoundException; import org.apache.accumulo.server.gc.GcVolumeUtil; import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.io.Text; import org.apache.zookeeper.KeeperException; import org.junit.Test; -import com.google.common.collect.Iterators; - public class GCUpgrade9to10TestIT extends ConfigurableMacBase { private static final String OUR_SECRET = "itsreallysecret"; private static final String OLDDELPREFIX = "~del"; @@ -72,14 +72,7 @@ public class GCUpgrade9to10TestIT extends ConfigurableMacBase { public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); cfg.setProperty(Property.INSTANCE_SECRET, OUR_SECRET); - cfg.setDefaultMemory(64, MemoryUnit.MEGABYTE); - cfg.setMemory(ServerType.MASTER, 16, MemoryUnit.MEGABYTE); - cfg.setMemory(ServerType.ZOOKEEPER, 32, MemoryUnit.MEGABYTE); - cfg.setProperty(Property.GC_CYCLE_START, "1"); - cfg.setProperty(Property.GC_CYCLE_DELAY, "1"); - cfg.setProperty(Property.GC_PORT, "0"); - cfg.setProperty(Property.TSERV_MAXMEM, "5K"); - cfg.setProperty(Property.TSERV_MAJC_DELAY, "1"); + cfg.setProperty(Property.GC_CYCLE_START, "1000"); // gc will be killed before it is run // use raw local file system so walogs sync and flush will work hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); @@ -118,38 +111,65 @@ public class GCUpgrade9to10TestIT extends ConfigurableMacBase { } /** - * This is really hard to make happen - the minicluster can 
only use so little memory to start up. - * The {@link org.apache.accumulo.master.upgrade.Upgrader9to10} CANDIDATE_MEMORY_PERCENTAGE can be - * adjusted. + * Ensure that the size of the candidates exceeds the {@link Upgrader9to10}'s CANDIDATE_BATCH_SIZE + * and will clean up candidates in multiple batches, without running out of memory. */ @Test public void gcUpgradeOutofMemoryTest() throws Exception { killMacGc(); // we do not want anything deleted - int somebignumber = 100000; - String longpathname = "aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeee" - + "ffffffffffgggggggggghhhhhhhhhhiiiiiiiiiijjjjjjjjjj" - + "kkkkkkkkkkkkkkkkkklllllllllllllllllllllmmmmmmmmmmmmmmmmmnnnnnnnnnnnnnnnn"; - longpathname += longpathname; // make it even longer + int numberOfEntries = 100_000; + String longpathname = StringUtils.repeat("abcde", 100); + assertEquals(500, longpathname.length()); + + // sanity check to ensure that any batch size assumptions are still valid in this test + assertEquals(4_000_000, Upgrader9to10.CANDIDATE_BATCH_SIZE); + + // ensure test quality by making sure we have enough candidates to + // exceed the batch size at least ten times + long numBatches = numberOfEntries * longpathname.length() / Upgrader9to10.CANDIDATE_BATCH_SIZE; + assertTrue("Expected numBatches between 10 and 15, but was " + numBatches, + numBatches > 10 && numBatches < 15); + Ample.DataLevel level = Ample.DataLevel.USER; log.info("Filling metadata table with lots of bogus delete flags"); try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { - addEntries(c, level.metaTable(), somebignumber, longpathname); + Map expected = addEntries(c, level.metaTable(), numberOfEntries, longpathname); + assertEquals(numberOfEntries + numberOfEntries / 10, expected.size()); + + Range range = MetadataSchema.DeletesSection.getRange(); sleepUninterruptibly(1, TimeUnit.SECONDS); + try (Scanner scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY)) { + Map 
actualOldStyle = new HashMap<>(); + scanner.setRange(range); + scanner.forEach(entry -> { + String strKey = entry.getKey().getRow().toString(); + String strValue = entry.getValue().toString(); + actualOldStyle.put(strKey, strValue); + }); + assertEquals(expected.size(), actualOldStyle.size()); + assertTrue(Collections.disjoint(expected.keySet(), actualOldStyle.keySet())); + } + upgrader.upgradeFileDeletes(getServerContext(), level); sleepUninterruptibly(1, TimeUnit.SECONDS); - Range range = MetadataSchema.DeletesSection.getRange(); - Scanner scanner; - try { - scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY); - } catch (TableNotFoundException e) { - throw new RuntimeException(e); + try (Scanner scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY)) { + Map actualNewStyle = new HashMap<>(); + scanner.setRange(range); + scanner.forEach(entry -> { + String strKey = entry.getKey().getRow().toString(); + String expectedValue = expected.get(strKey); + assertNotNull(expectedValue); + String strValue = entry.getValue().toString(); + assertEquals(expectedValue, strValue); + actualNewStyle.put(strKey, strValue); + }); + assertEquals(expected.size(), actualNewStyle.size()); + assertEquals(expected, actualNewStyle); } - scanner.setRange(range); - assertEquals(somebignumber + somebignumber / 10, Iterators.size(scanner.iterator())); } } @@ -160,41 +180,32 @@ public class GCUpgrade9to10TestIT extends ConfigurableMacBase { try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { Map expected = addEntries(c, level.metaTable(), count, "somefile"); - Map actual = new HashMap<>(); sleepUninterruptibly(1, TimeUnit.SECONDS); upgrader.upgradeFileDeletes(getServerContext(), level); sleepUninterruptibly(1, TimeUnit.SECONDS); Range range = MetadataSchema.DeletesSection.getRange(); - Scanner scanner; - try { - scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY); - } catch (TableNotFoundException e) { - throw new 
RuntimeException(e); + try (Scanner scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY)) { + Map actual = new HashMap<>(); + scanner.setRange(range); + scanner.forEach(entry -> { + actual.put(entry.getKey().getRow().toString(), entry.getValue().toString()); + }); + assertEquals(expected, actual); } - scanner.setRange(range); - scanner.iterator().forEachRemaining(entry -> { - actual.put(entry.getKey().getRow().toString(), entry.getValue().toString()); - }); - - assertEquals(expected, actual); // ENSURE IDEMPOTENCE - run upgrade again to ensure nothing is changed because there is // nothing to change upgrader.upgradeFileDeletes(getServerContext(), level); - try { - scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY); - } catch (TableNotFoundException e) { - throw new RuntimeException(e); + try (Scanner scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY)) { + Map actual = new HashMap<>(); + scanner.setRange(range); + scanner.forEach(entry -> { + actual.put(entry.getKey().getRow().toString(), entry.getValue().toString()); + }); + assertEquals(expected, actual); } - scanner.setRange(range); - actual.clear(); - scanner.iterator().forEachRemaining(entry -> { - actual.put(entry.getKey().getRow().toString(), entry.getValue().toString()); - }); - - assertEquals(expected, actual); } }