accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ibe...@apache.org
Subject [accumulo] 01/01: Merge branch '1.7' into 1.8
Date Thu, 25 Jan 2018 22:30:42 GMT
This is an automated email from the ASF dual-hosted git repository.

ibella pushed a commit to branch 1.8
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit d46dd1658d2d89c3f495be19efd76a86eed13417
Merge: a13a6ad 79e127f
Author: Ivan Bella <ivan@bella.name>
AuthorDate: Thu Jan 25 16:29:35 2018 -0500

    Merge branch '1.7' into 1.8
    
    Conflicts:
    	server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java

 .../org/apache/accumulo/core/conf/Property.java    |   8 +-
 .../org/apache/accumulo/fate/zookeeper/Retry.java  |  22 ++++-
 .../accumulo/fate/zookeeper/RetryFactory.java      |  29 ++++++
 .../accumulo/fate/zookeeper/RetryFactoryTest.java  |  12 +++
 .../apache/accumulo/fate/zookeeper/RetryTest.java  |  23 +++++
 .../org/apache/accumulo/tserver/TabletServer.java  |  18 ++--
 .../accumulo/tserver/log/TabletServerLogger.java   | 109 +++++++++++----------
 7 files changed, 155 insertions(+), 66 deletions(-)

diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index c1fd6bb,1487199..da3ef0b
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@@ -78,25 -70,21 +76,23 @@@ public class TabletServerLogger 
  
    private final TabletServer tserver;
  
 -  // The current log set: always updated to a new set with every change of loggers
 -  private final List<DfsLogger> loggers = new ArrayList<>();
 +  // The current logger
 +  private DfsLogger currentLog = null;
 +  private final SynchronousQueue<Object> nextLog = new SynchronousQueue<>();
 +  private ThreadPoolExecutor nextLogMaker;
  
 -  // The current generation of logSet.
 -  // Because multiple threads can be using a log set at one time, a log
 +  // The current generation of logs.
 +  // Because multiple threads can be using a log at one time, a log
    // failure is likely to affect multiple threads, who will all attempt to
 -  // create a new logSet. This will cause many unnecessary updates to the
 +  // create a new log. This will cause many unnecessary updates to the
    // metadata table.
    // We'll use this generational counter to determine if another thread has
 -  // already fetched a new logSet.
 -  private AtomicInteger logSetId = new AtomicInteger();
 +  // already fetched a new log.
 +  private final AtomicInteger logId = new AtomicInteger();
  
    // Use a ReadWriteLock to allow multiple threads to use the log set, but obtain a write
lock to change them
 -  private final ReentrantReadWriteLock logSetLock = new ReentrantReadWriteLock();
 +  private final ReentrantReadWriteLock logIdLock = new ReentrantReadWriteLock();
  
-   private final AtomicInteger seqGen = new AtomicInteger();
- 
    private final AtomicLong syncCounter;
    private final AtomicLong flushCounter;
  
@@@ -213,29 -201,21 +213,29 @@@
      }
  
      try {
 -      DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter);
 -      alog.open(tserver.getClientAddressString());
 -      loggers.add(alog);
 -      logSetId.incrementAndGet();
 -
 -      // When we successfully create a WAL, make sure to reset the Retry.
 -      if (null != createRetry) {
 -        createRetry = null;
 +      startLogMaker();
 +      Object next = nextLog.take();
 +      if (next instanceof Exception) {
 +        throw (Exception) next;
        }
 +      if (next instanceof DfsLogger) {
 +        currentLog = (DfsLogger) next;
 +        logId.incrementAndGet();
 +        log.info("Using next log " + currentLog.getFileName());
 +
 +        // When we successfully create a WAL, make sure to reset the Retry.
-         if (null != retry) {
-           retry = null;
++        if (null != createRetry) {
++          createRetry = null;
 +        }
  
 -      this.createTime = System.currentTimeMillis();
 -      return;
 +        this.createTime = System.currentTimeMillis();
 +        return;
 +      } else {
 +        throw new RuntimeException("Error: unexpected type seen: " + next);
 +      }
      } catch (Exception t) {
-       if (null == retry) {
-         retry = retryFactory.create();
+       if (null == createRetry) {
+         createRetry = createRetryFactory.create();
        }
  
        // We have more retries or we exceeded the maximum number of accepted failures
@@@ -348,20 -263,26 +348,26 @@@
    }
  
    interface Writer {
-     LoggerOperation write(DfsLogger logger, int seq) throws Exception;
+     LoggerOperation write(DfsLogger logger) throws Exception;
    }
  
-   private int write(CommitSession commitSession, boolean mincFinish, Writer writer) throws
IOException {
+   private void write(CommitSession commitSession, boolean mincFinish, Writer writer) throws
IOException {
+     write(commitSession, mincFinish, writer, writeRetryFactory.create());
+   }
+ 
+   private void write(CommitSession commitSession, boolean mincFinish, Writer writer, Retry
writeRetry) throws IOException {
      List<CommitSession> sessions = Collections.singletonList(commitSession);
-     return write(sessions, mincFinish, writer);
+     write(sessions, mincFinish, writer, writeRetry);
    }
  
-   private int write(final Collection<CommitSession> sessions, boolean mincFinish,
Writer writer) throws IOException {
+   private void write(final Collection<CommitSession> sessions, boolean mincFinish,
Writer writer) throws IOException {
+     write(sessions, mincFinish, writer, writeRetryFactory.create());
+   }
+ 
+   private void write(final Collection<CommitSession> sessions, boolean mincFinish,
Writer writer, Retry writeRetry) throws IOException {
      // Work very hard not to lock this during calls to the outside world
 -    int currentLogSet = logSetId.get();
 +    int currentLogId = logId.get();
  
-     int seq = -1;
-     int attempt = 1;
      boolean success = false;
      while (!success) {
        try {
@@@ -379,7 -298,9 +385,7 @@@
              if (commitSession.beginUpdatingLogsUsed(copy, mincFinish)) {
                try {
                  // Scribble out a tablet definition and then write to the metadata table
-                 defineTablet(commitSession);
+                 defineTablet(commitSession, writeRetry);
 -                if (currentLogSet == logSetId.get())
 -                  tserver.addLoggersToMetadata(copy, commitSession.getExtent(), commitSession.getLogId());
                } finally {
                  commitSession.finishUpdatingLogsUsed();
                }
@@@ -397,27 -322,35 +403,29 @@@
          }
  
          // Make sure that the logs haven't changed out from underneath our copy
 -        if (currentLogSet == logSetId.get()) {
 +        if (currentLogId == logId.get()) {
  
            // write the mutation to the logs
-           seq = seqGen.incrementAndGet();
-           if (seq < 0)
-             throw new RuntimeException("Logger sequence generator wrapped!  Onos!!!11!eleven");
-           LoggerOperation lop = writer.write(copy, seq);
 -          ArrayList<LoggerOperation> queuedOperations = new ArrayList<>(copy.size());
 -          for (DfsLogger wal : copy) {
 -            queuedOperations.add(writer.write(wal));
 -          }
 -
 -          for (LoggerOperation lop : queuedOperations) {
 -            lop.await();
 -          }
++          LoggerOperation lop = writer.write(copy);
 +          lop.await();
  
            // double-check: did the log set change?
 -          success = (currentLogSet == logSetId.get());
 +          success = (currentLogId == logId.get());
          }
        } catch (DfsLogger.LogClosedException ex) {
-         log.debug("Logs closed while writing, retrying " + attempt);
+         log.debug("Logs closed while writing, retrying attempt " + writeRetry.retriesCompleted());
        } catch (Exception t) {
-         if (attempt != 1) {
-           log.error("Unexpected error writing to log, retrying attempt " + attempt, t);
+         log.warn("Failed to write to WAL, retrying attempt " + writeRetry.retriesCompleted(),
t);
+ 
+         try {
+           // Backoff
+           writeRetry.waitForNextAttempt();
+         } catch (InterruptedException e) {
+           Thread.currentThread().interrupt();
+           throw new RuntimeException(e);
          }
-         sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
        } finally {
-         attempt++;
+         writeRetry.useRetry();
        }
        // Some sort of write failure occurred. Grab the write lock and reset the logs.
        // But since multiple threads will attempt it, only attempt the reset when
@@@ -527,10 -457,11 +532,10 @@@
  
      long t1 = System.currentTimeMillis();
  
-     int seq = write(commitSession, true, new Writer() {
+     write(commitSession, true, new Writer() {
        @Override
-       public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
+       public LoggerOperation write(DfsLogger logger) throws Exception {
 -        logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName,
durability).await();
 -        return DfsLogger.NO_WAIT_LOGGER_OP;
 +        return logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName,
durability);
        }
      });
  
@@@ -543,8 -474,9 +548,8 @@@
        throws IOException {
      write(commitSession, false, new Writer() {
        @Override
-       public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
+       public LoggerOperation write(DfsLogger logger) throws Exception {
 -        logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName,
durability).await();
 -        return DfsLogger.NO_WAIT_LOGGER_OP;
 +        return logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName,
durability);
        }
      });
      return seq;

-- 
To stop receiving notification emails like this one, please contact
ibella@apache.org.

Mime
View raw message