accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ibe...@apache.org
Subject [accumulo] 02/02: ACCUMULO-4777: Added a backoff mechanism for writing to the WALs as we do for creating WALs but with unlimited retries.
Date Thu, 25 Jan 2018 22:30:25 GMT
This is an automated email from the ASF dual-hosted git repository.

ibella pushed a commit to branch 1.7
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 79e127f2b1dd7352bdc7b30ceef74391d4930a9f
Author: Ivan Bella <ivan@bella.name>
AuthorDate: Fri Jan 12 09:45:42 2018 -0500

    ACCUMULO-4777: Added a backoff mechanism for writing to the WALs as we do for creating WALs but with unlimited retries.
---
 .../org/apache/accumulo/core/conf/Property.java    |  8 +--
 .../org/apache/accumulo/fate/zookeeper/Retry.java  | 22 +++++++-
 .../accumulo/fate/zookeeper/RetryFactory.java      | 29 ++++++++++
 .../accumulo/fate/zookeeper/RetryFactoryTest.java  | 12 +++++
 .../apache/accumulo/fate/zookeeper/RetryTest.java  | 23 ++++++++
 .../org/apache/accumulo/tserver/TabletServer.java  | 18 ++++---
 .../accumulo/tserver/log/TabletServerLogger.java   | 63 +++++++++++++---------
 7 files changed, 137 insertions(+), 38 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 800db90..5a32a83 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -258,13 +258,13 @@ public enum Property {
       "The maximum size for each write-ahead log. See comment for property tserver.memory.maps.max"),
   TSERV_WALOG_MAX_AGE("tserver.walog.max.age", "24h", PropertyType.TIMEDURATION, "The maximum
age for each write-ahead log."),
   TSERV_WALOG_TOLERATED_CREATION_FAILURES("tserver.walog.tolerated.creation.failures", "50",
PropertyType.COUNT,
-      "The maximum number of failures tolerated when creating a new WAL file within the period
specified by tserver.walog.failures.period."
-          + " Exceeding this number of failures in the period causes the TabletServer to
exit."),
+      "The maximum number of failures tolerated when creating a new WAL file.  Values <
0 will allow unlimited creation failures."
+          + " Exceeding this number of failures consecutively trying to create a new WAL
causes the TabletServer to exit."),
   TSERV_WALOG_TOLERATED_WAIT_INCREMENT("tserver.walog.tolerated.wait.increment", "1000ms",
PropertyType.TIMEDURATION,
-      "The amount of time to wait between failures to create a WALog."),
+      "The amount of time to wait between failures to create or write a WALog."),
   // Never wait longer than 5 mins for a retry
   TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION("tserver.walog.maximum.wait.duration", "5m",
PropertyType.TIMEDURATION,
-      "The maximum amount of time to wait after a failure to create a WAL file."),
+      "The maximum amount of time to wait after a failure to create or write a WAL file."),
   TSERV_MAJC_DELAY("tserver.compaction.major.delay", "30s", PropertyType.TIMEDURATION,
       "Time a tablet server will sleep between checking which tablets need compaction."),
   TSERV_MAJC_THREAD_MAXOPEN("tserver.compaction.major.thread.files.open.max", "10", PropertyType.COUNT,
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java
index e84b1af..1f55d72 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/Retry.java
@@ -25,12 +25,14 @@ import org.slf4j.LoggerFactory;
 public class Retry {
   private static final Logger log = LoggerFactory.getLogger(Retry.class);
 
+  public static final long MAX_RETRY_DISABLED = -1;
+
   private long maxRetries, maxWait, waitIncrement;
   private long retriesDone, currentWait;
 
   /**
    * @param maxRetries
-   *          Maximum times to retry
+   *          Maximum times to retry or MAX_RETRY_DISABLED if no maximum
    * @param startWait
    *          The amount of time (ms) to wait for the initial retry
    * @param maxWait
@@ -46,6 +48,18 @@ public class Retry {
     this.currentWait = startWait;
   }
 
+  /**
+   * @param startWait
+   *          The amount of time (ms) to wait for the initial retry
+   * @param maxWait
+   *          The maximum wait (ms)
+   * @param waitIncrement
+   *          The amount of time (ms) to increment next wait time by
+   */
+  public Retry(long startWait, long waitIncrement, long maxWait) {
+    this(MAX_RETRY_DISABLED, startWait, waitIncrement, maxWait);
+  }
+
   // Visible for testing
   long getMaxRetries() {
     return maxRetries;
@@ -86,8 +100,12 @@ public class Retry {
     this.maxWait = maxWait;
   }
 
+  public boolean isMaxRetryDisabled() {
+    return maxRetries < 0;
+  }
+
   public boolean canRetry() {
-    return retriesDone < maxRetries;
+    return isMaxRetryDisabled() || (retriesDone < maxRetries);
   }
 
   public void useRetry() {
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java
index 63a1241..aa6da20 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/RetryFactory.java
@@ -25,6 +25,18 @@ public class RetryFactory {
 
   private final long maxRetries, startWait, maxWait, waitIncrement;
 
+  /**
+   * Create a retry factory for retries with a limit
+   *
+   * @param maxRetries
+   *          The maximum number of retries
+   * @param startWait
+   *          The wait ms for the first retry
+   * @param waitIncrement
+   *          The amount of ms to increment the wait on subsequent retries
+   * @param maxWait
+   *          The max amount of wait time between retries
+   */
   public RetryFactory(long maxRetries, long startWait, long waitIncrement, long maxWait)
{
     this.maxRetries = maxRetries;
     this.startWait = startWait;
@@ -32,6 +44,23 @@ public class RetryFactory {
     this.waitIncrement = waitIncrement;
   }
 
+  /**
+   * Create a retry factory for retries that have no limit
+   *
+   * @param startWait
+   *          The wait ms for the first retry
+   * @param waitIncrement
+   *          The amount of ms to increment the wait on subsequent retries
+   * @param maxWait
+   *          The max amount of wait time between retries
+   */
+  public RetryFactory(long startWait, long waitIncrement, long maxWait) {
+    this.maxRetries = Retry.MAX_RETRY_DISABLED;
+    this.startWait = startWait;
+    this.maxWait = maxWait;
+    this.waitIncrement = waitIncrement;
+  }
+
   public Retry create() {
     return new Retry(maxRetries, startWait, waitIncrement, maxWait);
   }
diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryFactoryTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryFactoryTest.java
index cb3d608..9ba19a4 100644
--- a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryFactoryTest.java
+++ b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryFactoryTest.java
@@ -36,4 +36,16 @@ public class RetryFactoryTest {
     Assert.assertEquals(waitIncrement, retry.getWaitIncrement());
   }
 
+  @Test
+  public void properArgumentsInUnlimitedRetry() {
+    long startWait = 50l, maxWait = 5000l, waitIncrement = 500l;
+    RetryFactory factory = new RetryFactory(startWait, waitIncrement, maxWait);
+    Retry retry = factory.create();
+
+    Assert.assertEquals(Retry.MAX_RETRY_DISABLED, retry.getMaxRetries());
+    Assert.assertEquals(startWait, retry.getCurrentWait());
+    Assert.assertEquals(maxWait, retry.getMaxWait());
+    Assert.assertEquals(waitIncrement, retry.getWaitIncrement());
+  }
+
 }
diff --git a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryTest.java b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryTest.java
index e37af01..6bbd1ff 100644
--- a/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryTest.java
+++ b/fate/src/test/java/org/apache/accumulo/fate/zookeeper/RetryTest.java
@@ -28,10 +28,14 @@ public class RetryTest {
 
   private Retry retry;
   long initialWait = 1000l, waitIncrement = 1000l, maxRetries = 5;
+  private Retry unlimitedRetry1;
+  private Retry unlimitedRetry2;
 
   @Before
   public void setup() {
     retry = new Retry(maxRetries, initialWait, waitIncrement, maxRetries * 1000l);
+    unlimitedRetry1 = new Retry(initialWait, waitIncrement, maxRetries * 1000l);
+    unlimitedRetry2 = new Retry(-10, initialWait, waitIncrement, maxRetries * 1000l);
   }
 
   @Test
@@ -124,4 +128,23 @@ public class RetryTest {
 
     EasyMock.verify(retry);
   }
+
+  @Test
+  public void testIsMaxRetryDisabled() {
+    Assert.assertFalse(retry.isMaxRetryDisabled());
+    Assert.assertTrue(unlimitedRetry1.isMaxRetryDisabled());
+    Assert.assertTrue(unlimitedRetry2.isMaxRetryDisabled());
+    Assert.assertEquals(Retry.MAX_RETRY_DISABLED, unlimitedRetry1.getMaxRetries());
+    Assert.assertEquals(-10, unlimitedRetry2.getMaxRetries());
+  }
+
+  @Test
+  public void testUnlimitedRetry() {
+    for (int i = 0; i < Integer.MAX_VALUE; i++) {
+      Assert.assertTrue(unlimitedRetry1.canRetry());
+      unlimitedRetry1.useRetry();
+      Assert.assertTrue(unlimitedRetry2.canRetry());
+      unlimitedRetry2.useRetry();
+    }
+  }
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 852c611..a277714 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -350,14 +350,16 @@ public class TabletServer extends AccumuloServerContext implements Runnable
{
           + minBlockSize + ". Either increase the " + Property.TSERV_WALOG_MAX_SIZE + " or
decrease dfs.namenode.fs-limits.min-block-size in hdfs-site.xml.");
 
     final long toleratedWalCreationFailures = aconf.getCount(Property.TSERV_WALOG_TOLERATED_CREATION_FAILURES);
-    final long walCreationFailureRetryIncrement = aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_WAIT_INCREMENT);
-    final long walCreationFailureRetryMax = aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION);
-    // Tolerate `toleratedWalCreationFailures` failures, waiting `walCreationFailureRetryIncrement`
milliseconds after the first failure,
-    // incrementing the next wait period by the same value, for a maximum of `walCreationFailureRetryMax`
retries.
-    final RetryFactory walCreationRetryFactory = new RetryFactory(toleratedWalCreationFailures,
walCreationFailureRetryIncrement,
-        walCreationFailureRetryIncrement, walCreationFailureRetryMax);
-
-    logger = new TabletServerLogger(this, walogMaxSize, syncCounter, flushCounter, walCreationRetryFactory,
walogMaxAge);
+    final long walFailureRetryIncrement = aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_WAIT_INCREMENT);
+    final long walFailureRetryMax = aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION);
+    // Tolerate `toleratedWalCreationFailures` failures, waiting `walFailureRetryIncrement` milliseconds after the first failure,
+    // incrementing the next wait period by the same value, up to a maximum wait of `walFailureRetryMax` milliseconds.
+    final RetryFactory walCreationRetryFactory = new RetryFactory(toleratedWalCreationFailures,
walFailureRetryIncrement, walFailureRetryIncrement,
+        walFailureRetryMax);
+    // Tolerate unlimited failures when writing, but back off the same way as for creation failures.
+    final RetryFactory walWritingRetryFactory = new RetryFactory(walFailureRetryIncrement,
walFailureRetryIncrement, walFailureRetryMax);
+
+    logger = new TabletServerLogger(this, walogMaxSize, syncCounter, flushCounter, walCreationRetryFactory,
walWritingRetryFactory, walogMaxAge);
     this.resourceManager = new TabletServerResourceManager(this, fs);
     this.security = AuditedSecurityOperation.getInstance(this);
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index ef7784a..1487199 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -36,7 +36,6 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationConfigurationUtil;
-import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.fate.zookeeper.Retry;
 import org.apache.accumulo.fate.zookeeper.RetryFactory;
 import org.apache.accumulo.server.conf.TableConfiguration;
@@ -91,8 +90,10 @@ public class TabletServerLogger {
 
   private long createTime = 0;
 
-  private final RetryFactory retryFactory;
-  private Retry retry = null;
+  private final RetryFactory createRetryFactory;
+  private Retry createRetry = null;
+
+  private final RetryFactory writeRetryFactory;
 
   static private abstract class TestCallWithWriteLock {
     abstract boolean test();
@@ -137,13 +138,15 @@ public class TabletServerLogger {
     }
   }
 
-  public TabletServerLogger(TabletServer tserver, long maxSize, AtomicLong syncCounter, AtomicLong
flushCounter, RetryFactory retryFactory, long maxAge) {
+  public TabletServerLogger(TabletServer tserver, long maxSize, AtomicLong syncCounter, AtomicLong
flushCounter, RetryFactory createRetryFactory,
+      RetryFactory writeRetryFactory, long maxAge) {
     this.tserver = tserver;
     this.maxSize = maxSize;
     this.syncCounter = syncCounter;
     this.flushCounter = flushCounter;
-    this.retryFactory = retryFactory;
-    this.retry = null;
+    this.createRetryFactory = createRetryFactory;
+    this.createRetry = null;
+    this.writeRetryFactory = writeRetryFactory;
     this.maxAge = maxAge;
   }
 
@@ -204,25 +207,25 @@ public class TabletServerLogger {
       logSetId.incrementAndGet();
 
       // When we successfully create a WAL, make sure to reset the Retry.
-      if (null != retry) {
-        retry = null;
+      if (null != createRetry) {
+        createRetry = null;
       }
 
       this.createTime = System.currentTimeMillis();
       return;
     } catch (Exception t) {
-      if (null == retry) {
-        retry = retryFactory.create();
+      if (null == createRetry) {
+        createRetry = createRetryFactory.create();
       }
 
       // We have more retries or we exceeded the maximum number of accepted failures
-      if (retry.canRetry()) {
-        // Use the retry and record the time in which we did so
-        retry.useRetry();
+      if (createRetry.canRetry()) {
+        // Use the createRetry and record the time in which we did so
+        createRetry.useRetry();
 
         try {
           // Backoff
-          retry.waitForNextAttempt();
+          createRetry.waitForNextAttempt();
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           throw new RuntimeException(e);
@@ -264,15 +267,22 @@ public class TabletServerLogger {
   }
 
   private void write(CommitSession commitSession, boolean mincFinish, Writer writer) throws
IOException {
+    write(commitSession, mincFinish, writer, writeRetryFactory.create());
+  }
+
+  private void write(CommitSession commitSession, boolean mincFinish, Writer writer, Retry
writeRetry) throws IOException {
     List<CommitSession> sessions = Collections.singletonList(commitSession);
-    write(sessions, mincFinish, writer);
+    write(sessions, mincFinish, writer, writeRetry);
   }
 
   private void write(final Collection<CommitSession> sessions, boolean mincFinish,
Writer writer) throws IOException {
+    write(sessions, mincFinish, writer, writeRetryFactory.create());
+  }
+
+  private void write(final Collection<CommitSession> sessions, boolean mincFinish,
Writer writer, Retry writeRetry) throws IOException {
     // Work very hard not to lock this during calls to the outside world
     int currentLogSet = logSetId.get();
 
-    int attempt = 1;
     boolean success = false;
     while (!success) {
       try {
@@ -288,7 +298,7 @@ public class TabletServerLogger {
             if (commitSession.beginUpdatingLogsUsed(copy, mincFinish)) {
               try {
                 // Scribble out a tablet definition and then write to the metadata table
-                defineTablet(commitSession);
+                defineTablet(commitSession, writeRetry);
                 if (currentLogSet == logSetId.get())
                   tserver.addLoggersToMetadata(copy, commitSession.getExtent(), commitSession.getLogId());
               } finally {
@@ -328,14 +338,19 @@ public class TabletServerLogger {
           success = (currentLogSet == logSetId.get());
         }
       } catch (DfsLogger.LogClosedException ex) {
-        log.debug("Logs closed while writing, retrying " + attempt);
+        log.debug("Logs closed while writing, retrying attempt " + writeRetry.retriesCompleted());
       } catch (Exception t) {
-        if (attempt != 1) {
-          log.error("Unexpected error writing to log, retrying attempt " + attempt, t);
+        log.warn("Failed to write to WAL, retrying attempt " + writeRetry.retriesCompleted(),
t);
+
+        try {
+          // Backoff
+          writeRetry.waitForNextAttempt();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
         }
-        UtilWaitThread.sleep(100);
       } finally {
-        attempt++;
+        writeRetry.useRetry();
       }
       // Some sort of write failure occurred. Grab the write lock and reset the logs.
       // But since multiple threads will attempt it, only attempt the reset when
@@ -377,7 +392,7 @@ public class TabletServerLogger {
     // TODO We can close the WAL here for replication purposes
   }
 
-  public void defineTablet(final CommitSession commitSession) throws IOException {
+  public void defineTablet(final CommitSession commitSession, final Retry writeRetry) throws
IOException {
     // scribble this into the metadata tablet, too.
     write(commitSession, false, new Writer() {
       @Override
@@ -385,7 +400,7 @@ public class TabletServerLogger {
         logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(), commitSession.getExtent());
         return DfsLogger.NO_WAIT_LOGGER_OP;
       }
-    });
+    }, writeRetry);
   }
 
   public void log(final CommitSession commitSession, final long tabletSeq, final Mutation
m, final Durability durability) throws IOException {

-- 
To stop receiving notification emails like this one, please contact
ibella@apache.org.

Mime
View raw message