From commits-return-21401-archive-asf-public=cust-asf.ponee.io@accumulo.apache.org Thu Jan 25 23:30:26 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id BD140180651 for ; Thu, 25 Jan 2018 23:30:26 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id AD4B2160C17; Thu, 25 Jan 2018 22:30:26 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7DF50160C4F for ; Thu, 25 Jan 2018 23:30:25 +0100 (CET) Received: (qmail 67686 invoked by uid 500); 25 Jan 2018 22:30:24 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 67676 invoked by uid 99); 25 Jan 2018 22:30:24 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 Jan 2018 22:30:24 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 207BD81F91; Thu, 25 Jan 2018 22:30:23 +0000 (UTC) Date: Thu, 25 Jan 2018 22:30:25 +0000 To: "commits@accumulo.apache.org" Subject: [accumulo] 02/02: ACCUMULO-4777: Added a backoff mechanism for writing to the WALs as we do for creating WALs but with unlimited retries. 
MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: ibella@apache.org In-Reply-To: <151691942298.28543.11079891643665933205@gitbox.apache.org> References: <151691942298.28543.11079891643665933205@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: accumulo X-Git-Refname: refs/heads/1.7 X-Git-Reftype: branch X-Git-Rev: 79e127f2b1dd7352bdc7b30ceef74391d4930a9f X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20180125223023.207BD81F91@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. ibella pushed a commit to branch 1.7 in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 79e127f2b1dd7352bdc7b30ceef74391d4930a9f Author: Ivan Bella AuthorDate: Fri Jan 12 09:45:42 2018 -0500 ACCUMULO-4777: Added a backoff mechanism for writing to the WALs as we do for creating WALs but with unlimited retries. --- .../org/apache/accumulo/core/conf/Property.java | 8 +-- .../org/apache/accumulo/fate/zookeeper/Retry.java | 22 +++++++- .../accumulo/fate/zookeeper/RetryFactory.java | 29 ++++++++++ .../accumulo/fate/zookeeper/RetryFactoryTest.java | 12 +++++ .../apache/accumulo/fate/zookeeper/RetryTest.java | 23 ++++++++ .../org/apache/accumulo/tserver/TabletServer.java | 18 ++++--- .../accumulo/tserver/log/TabletServerLogger.java | 63 +++++++++++++--------- 7 files changed, 137 insertions(+), 38 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 800db90..5a32a83 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -258,13 +258,13 @@ public enum Property { "The maximum size for each write-ahead log. 
See comment for property tserver.memory.maps.max"), TSERV_WALOG_MAX_AGE("tserver.walog.max.age", "24h", PropertyType.TIMEDURATION, "The maximum age for each write-ahead log."), TSERV_WALOG_TOLERATED_CREATION_FAILURES("tserver.walog.tolerated.creation.failures", "50", PropertyType.COUNT, - "The maximum number of failures tolerated when creating a new WAL file within the period specified by tserver.walog.failures.period." - + " Exceeding this number of failures in the period causes the TabletServer to exit."), + "The maximum number of failures tolerated when creating a new WAL file. Values < 0 will allow unlimited creation failures." + + " Exceeding this number of failures consecutively trying to create a new WAL causes the TabletServer to exit."), TSERV_WALOG_TOLERATED_WAIT_INCREMENT("tserver.walog.tolerated.wait.increment", "1000ms", PropertyType.TIMEDURATION, - "The amount of time to wait between failures to create a WALog."), + "The amount of time to wait between failures to create or write a WALog."), // Never wait longer than 5 mins for a retry TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION("tserver.walog.maximum.wait.duration", "5m", PropertyType.TIMEDURATION, - "The maximum amount of time to wait after a failure to create a WAL file."), + "The maximum amount of time to wait after a failure to create or write a WAL file."), TSERV_MAJC_DELAY("tserver.compaction.major.delay", "30s", PropertyType.TIMEDURATION, "Time a tablet server will sleep between checking which tablets need compaction."), TSERV_MAJC_THREAD_MAXOPEN("tserver.compaction.major.thread.files.open.max", "10", PropertyType.COUNT, diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java index e84b1af..1f55d72 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java +++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java @@ -25,12 +25,14 @@ import org.slf4j.LoggerFactory; public 
class Retry { private static final Logger log = LoggerFactory.getLogger(Retry.class); + public static final long MAX_RETRY_DISABLED = -1; + private long maxRetries, maxWait, waitIncrement; private long retriesDone, currentWait; /** * @param maxRetries - * Maximum times to retry + * Maximum times to retry or MAX_RETRY_DISABLED if no maximum * @param startWait * The amount of time (ms) to wait for the initial retry * @param maxWait @@ -46,6 +48,18 @@ public class Retry { this.currentWait = startWait; } + /** + * @param startWait + * The amount of time (ms) to wait for the initial retry + * @param maxWait + * The maximum wait (ms) + * @param waitIncrement + * The amount of time (ms) to increment next wait time by + */ + public Retry(long startWait, long waitIncrement, long maxWait) { + this(MAX_RETRY_DISABLED, startWait, waitIncrement, maxWait); + } + // Visible for testing long getMaxRetries() { return maxRetries; @@ -86,8 +100,12 @@ public class Retry { this.maxWait = maxWait; } + public boolean isMaxRetryDisabled() { + return maxRetries < 0; + } + public boolean canRetry() { - return retriesDone < maxRetries; + return isMaxRetryDisabled() || (retriesDone < maxRetries); } public void useRetry() { diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java index 63a1241..aa6da20 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java +++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java @@ -25,6 +25,18 @@ public class RetryFactory { private final long maxRetries, startWait, maxWait, waitIncrement; + /** + * Create a retry factor for retries with a limit + * + * @param maxRetries + * The maximum number of retries + * @param startWait + * The wait ms for the first retry + * @param waitIncrement + * The amount of ms to increment the wait on subsequent retries + * @param maxWait + * The max amount of wait time between 
retries + */ public RetryFactory(long maxRetries, long startWait, long waitIncrement, long maxWait) { this.maxRetries = maxRetries; this.startWait = startWait; @@ -32,6 +44,23 @@ public class RetryFactory { this.waitIncrement = waitIncrement; } + /** + * Create a retry factory for retries that have no limit + * + * @param startWait + * The wait ms for the first retry + * @param waitIncrement + * The amount of ms to increment the wait on subsequent retries + * @param maxWait + * The max amount of wait time between retries + */ + public RetryFactory(long startWait, long waitIncrement, long maxWait) { + this.maxRetries = Retry.MAX_RETRY_DISABLED; + this.startWait = startWait; + this.maxWait = maxWait; + this.waitIncrement = waitIncrement; + } + public Retry create() { return new Retry(maxRetries, startWait, waitIncrement, maxWait); } diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryFactoryTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryFactoryTest.java index cb3d608..9ba19a4 100644 --- a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryFactoryTest.java +++ b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryFactoryTest.java @@ -36,4 +36,16 @@ public class RetryFactoryTest { Assert.assertEquals(waitIncrement, retry.getWaitIncrement()); } + @Test + public void properArgumentsInUnlimitedRetry() { + long startWait = 50l, maxWait = 5000l, waitIncrement = 500l; + RetryFactory factory = new RetryFactory(startWait, waitIncrement, maxWait); + Retry retry = factory.create(); + + Assert.assertEquals(Retry.MAX_RETRY_DISABLED, retry.getMaxRetries()); + Assert.assertEquals(startWait, retry.getCurrentWait()); + Assert.assertEquals(maxWait, retry.getMaxWait()); + Assert.assertEquals(waitIncrement, retry.getWaitIncrement()); + } + } diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryTest.java index e37af01..6bbd1ff 100644 --- 
a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryTest.java +++ b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryTest.java @@ -28,10 +28,14 @@ public class RetryTest { private Retry retry; long initialWait = 1000l, waitIncrement = 1000l, maxRetries = 5; + private Retry unlimitedRetry1; + private Retry unlimitedRetry2; @Before public void setup() { retry = new Retry(maxRetries, initialWait, waitIncrement, maxRetries * 1000l); + unlimitedRetry1 = new Retry(initialWait, waitIncrement, maxRetries * 1000l); + unlimitedRetry2 = new Retry(-10, initialWait, waitIncrement, maxRetries * 1000l); } @Test @@ -124,4 +128,23 @@ public class RetryTest { EasyMock.verify(retry); } + + @Test + public void testIsMaxRetryDisabled() { + Assert.assertFalse(retry.isMaxRetryDisabled()); + Assert.assertTrue(unlimitedRetry1.isMaxRetryDisabled()); + Assert.assertTrue(unlimitedRetry2.isMaxRetryDisabled()); + Assert.assertEquals(Retry.MAX_RETRY_DISABLED, unlimitedRetry1.getMaxRetries()); + Assert.assertEquals(-10, unlimitedRetry2.getMaxRetries()); + } + + @Test + public void testUnlimitedRetry() { + for (int i = 0; i < Integer.MAX_VALUE; i++) { + Assert.assertTrue(unlimitedRetry1.canRetry()); + unlimitedRetry1.useRetry(); + Assert.assertTrue(unlimitedRetry2.canRetry()); + unlimitedRetry2.useRetry(); + } + } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 852c611..a277714 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -350,14 +350,16 @@ public class TabletServer extends AccumuloServerContext implements Runnable { + minBlockSize + ". 
Either increase the " + Property.TSERV_WALOG_MAX_SIZE + " or decrease dfs.namenode.fs-limits.min-block-size in hdfs-site.xml."); final long toleratedWalCreationFailures = aconf.getCount(Property.TSERV_WALOG_TOLERATED_CREATION_FAILURES); - final long walCreationFailureRetryIncrement = aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_WAIT_INCREMENT); - final long walCreationFailureRetryMax = aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION); - // Tolerate `toleratedWalCreationFailures` failures, waiting `walCreationFailureRetryIncrement` milliseconds after the first failure, - // incrementing the next wait period by the same value, for a maximum of `walCreationFailureRetryMax` retries. - final RetryFactory walCreationRetryFactory = new RetryFactory(toleratedWalCreationFailures, walCreationFailureRetryIncrement, - walCreationFailureRetryIncrement, walCreationFailureRetryMax); - - logger = new TabletServerLogger(this, walogMaxSize, syncCounter, flushCounter, walCreationRetryFactory, walogMaxAge); + final long walFailureRetryIncrement = aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_WAIT_INCREMENT); + final long walFailureRetryMax = aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION); + // Tolerate `toleratedWalCreationFailures` failures, waiting `walFailureRetryIncrement` milliseconds after the first failure, + // incrementing the next wait period by the same value, for a maximum of `walFailureRetryMax` retries. + final RetryFactory walCreationRetryFactory = new RetryFactory(toleratedWalCreationFailures, walFailureRetryIncrement, walFailureRetryIncrement, + walFailureRetryMax); + // Tolerate infinite failures for the write, however backing off the same as for creation failures. 
+ final RetryFactory walWritingRetryFactory = new RetryFactory(walFailureRetryIncrement, walFailureRetryIncrement, walFailureRetryMax); + + logger = new TabletServerLogger(this, walogMaxSize, syncCounter, flushCounter, walCreationRetryFactory, walWritingRetryFactory, walogMaxAge); this.resourceManager = new TabletServerResourceManager(this, fs); this.security = AuditedSecurityOperation.getInstance(this); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index ef7784a..1487199 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -36,7 +36,6 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationConfigurationUtil; -import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.fate.zookeeper.Retry; import org.apache.accumulo.fate.zookeeper.RetryFactory; import org.apache.accumulo.server.conf.TableConfiguration; @@ -91,8 +90,10 @@ public class TabletServerLogger { private long createTime = 0; - private final RetryFactory retryFactory; - private Retry retry = null; + private final RetryFactory createRetryFactory; + private Retry createRetry = null; + + private final RetryFactory writeRetryFactory; static private abstract class TestCallWithWriteLock { abstract boolean test(); @@ -137,13 +138,15 @@ public class TabletServerLogger { } } - public TabletServerLogger(TabletServer tserver, long maxSize, AtomicLong syncCounter, AtomicLong flushCounter, RetryFactory retryFactory, long maxAge) { + public TabletServerLogger(TabletServer tserver, long maxSize, AtomicLong syncCounter, AtomicLong flushCounter, RetryFactory createRetryFactory, 
+ RetryFactory writeRetryFactory, long maxAge) { this.tserver = tserver; this.maxSize = maxSize; this.syncCounter = syncCounter; this.flushCounter = flushCounter; - this.retryFactory = retryFactory; - this.retry = null; + this.createRetryFactory = createRetryFactory; + this.createRetry = null; + this.writeRetryFactory = writeRetryFactory; this.maxAge = maxAge; } @@ -204,25 +207,25 @@ public class TabletServerLogger { logSetId.incrementAndGet(); // When we successfully create a WAL, make sure to reset the Retry. - if (null != retry) { - retry = null; + if (null != createRetry) { + createRetry = null; } this.createTime = System.currentTimeMillis(); return; } catch (Exception t) { - if (null == retry) { - retry = retryFactory.create(); + if (null == createRetry) { + createRetry = createRetryFactory.create(); } // We have more retries or we exceeded the maximum number of accepted failures - if (retry.canRetry()) { - // Use the retry and record the time in which we did so - retry.useRetry(); + if (createRetry.canRetry()) { + // Use the createRetry and record the time in which we did so + createRetry.useRetry(); try { // Backoff - retry.waitForNextAttempt(); + createRetry.waitForNextAttempt(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); @@ -264,15 +267,22 @@ public class TabletServerLogger { } private void write(CommitSession commitSession, boolean mincFinish, Writer writer) throws IOException { + write(commitSession, mincFinish, writer, writeRetryFactory.create()); + } + + private void write(CommitSession commitSession, boolean mincFinish, Writer writer, Retry writeRetry) throws IOException { List<CommitSession> sessions = Collections.singletonList(commitSession); - write(sessions, mincFinish, writer); + write(sessions, mincFinish, writer, writeRetry); } private void write(final Collection<CommitSession> sessions, boolean mincFinish, Writer writer) throws IOException { + write(sessions, mincFinish, writer, writeRetryFactory.create()); + } + + 
private void write(final Collection<CommitSession> sessions, boolean mincFinish, Writer writer, Retry writeRetry) throws IOException { // Work very hard not to lock this during calls to the outside world int currentLogSet = logSetId.get(); - int attempt = 1; boolean success = false; while (!success) { try { @@ -288,7 +298,7 @@ public class TabletServerLogger { if (commitSession.beginUpdatingLogsUsed(copy, mincFinish)) { try { // Scribble out a tablet definition and then write to the metadata table - defineTablet(commitSession); + defineTablet(commitSession, writeRetry); if (currentLogSet == logSetId.get()) tserver.addLoggersToMetadata(copy, commitSession.getExtent(), commitSession.getLogId()); } finally { @@ -328,14 +338,19 @@ public class TabletServerLogger { success = (currentLogSet == logSetId.get()); } } catch (DfsLogger.LogClosedException ex) { - log.debug("Logs closed while writing, retrying " + attempt); + log.debug("Logs closed while writing, retrying attempt " + writeRetry.retriesCompleted()); } catch (Exception t) { - if (attempt != 1) { - log.error("Unexpected error writing to log, retrying attempt " + attempt, t); + log.warn("Failed to write to WAL, retrying attempt " + writeRetry.retriesCompleted(), t); + + try { + // Backoff + writeRetry.waitForNextAttempt(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); } - UtilWaitThread.sleep(100); } finally { - attempt++; + writeRetry.useRetry(); } // Some sort of write failure occurred. Grab the write lock and reset the logs. // But since multiple threads will attempt it, only attempt the reset when @@ -377,7 +392,7 @@ public class TabletServerLogger { // TODO We can close the WAL here for replication purposes } - public void defineTablet(final CommitSession commitSession) throws IOException { + public void defineTablet(final CommitSession commitSession, final Retry writeRetry) throws IOException { // scribble this into the metadata tablet, too. 
write(commitSession, false, new Writer() { @Override @@ -385,7 +400,7 @@ public class TabletServerLogger { logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(), commitSession.getExtent()); return DfsLogger.NO_WAIT_LOGGER_OP; } - }); + }, writeRetry); } public void log(final CommitSession commitSession, final long tabletSeq, final Mutation m, final Durability durability) throws IOException { -- To stop receiving notification emails like this one, please contact ibella@apache.org.