accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [accumulo] branch 1.9 updated: fixes #823 unreference closed WALs that were never written to (#845)
Date Wed, 02 Jan 2019 19:01:34 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.9 by this push:
     new 31f16ac  fixes #823 unreference closed WALs that were never written to (#845)
31f16ac is described below

commit 31f16ac7b7eda0766dd2acc25da2c1d10fe8e965
Author: Keith Turner <kturner@apache.org>
AuthorDate: Wed Jan 2 14:01:30 2019 -0500

    fixes #823 unreference closed WALs that were never written to (#845)
---
 .../org/apache/accumulo/tserver/TabletServer.java    | 14 ++++++++++----
 .../org/apache/accumulo/tserver/log/DfsLogger.java   | 20 +++++++++++++++++---
 .../accumulo/tserver/log/TabletServerLogger.java     | 16 ----------------
 3 files changed, 27 insertions(+), 23 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index b1d4947..7107843 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -3454,11 +3454,17 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
 
   public void walogClosed(DfsLogger currentLog) throws WalMarkerException {
     metadataTableLogs.remove(currentLog);
-    synchronized (closedLogs) {
-      closedLogs.add(currentLog);
+    if (currentLog.getWrites() > 0) {
+      synchronized (closedLogs) {
+        closedLogs.add(currentLog);
+      }
+      log.info("Marking " + currentLog.getPath() + " as closed");
+      walMarker.closeWal(getTabletSession(), currentLog.getPath());
+    } else {
+      log.info(
+          "Marking " + currentLog.getPath() + " as unreferenced (skipping closed writes == 0)");
+      walMarker.walUnreferenced(getTabletSession(), currentLog.getPath());
     }
-    log.info("Marking " + currentLog.getPath() + " as closed");
-    walMarker.closeWal(getTabletSession(), currentLog.getPath());
   }
 
   public void updateBulkImportState(List<String> files, BulkImportState state) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 263f0c8..8f32aae 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -69,6 +69,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 
 /**
  * Wrap a connection to a logger.
@@ -320,6 +321,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
   private AtomicLong syncCounter;
   private AtomicLong flushCounter;
   private final long slowFlushMillis;
+  private long writes = 0;
 
   private DfsLogger(ServerResources conf) {
     this.conf = conf;
@@ -596,6 +598,11 @@ public class DfsLogger implements Comparable<DfsLogger> {
       }
   }
 
+  public synchronized long getWrites() {
+    Preconditions.checkState(writes >= 0);
+    return writes;
+  }
+
   public synchronized void defineTablet(long seq, int tid, KeyExtent tablet) throws IOException {
     // write this log to the METADATA table
     final LogFileKey key = new LogFileKey();
@@ -610,12 +617,18 @@ public class DfsLogger implements Comparable<DfsLogger> {
           + " incompatible with this version of Hadoop.");
       throw new RuntimeException(e);
     }
+
+    synchronized (closeLock) {
+      if (closed)
+        throw new LogClosedException();
+    }
   }
 
   private synchronized void write(LogFileKey key, LogFileValue value) throws IOException {
     key.write(encryptingLogFile);
     value.write(encryptingLogFile);
     encryptingLogFile.flush();
+    writes++;
   }
 
   public LoggerOperation log(long seq, int tid, Mutation mutation, Durability durability)
@@ -640,15 +653,16 @@ public class DfsLogger implements Comparable<DfsLogger> {
       }
     }
 
-    if (durability == Durability.LOG)
-      return NO_WAIT_LOGGER_OP;
-
     synchronized (closeLock) {
       // use a different lock for close check so that adding to work queue does not need
       // to wait on walog I/O operations
 
       if (closed)
         throw new LogClosedException();
+
+      if (durability == Durability.LOG)
+        return NO_WAIT_LOGGER_OP;
+
       workQueue.add(work);
     }
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 8e946c3..f46c12d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -319,15 +319,6 @@ public class TabletServerLogger {
     }));
   }
 
-  public void resetLoggers() throws IOException {
-    logIdLock.writeLock().lock();
-    try {
-      close();
-    } finally {
-      logIdLock.writeLock().unlock();
-    }
-  }
-
   synchronized private void close() throws IOException {
     if (!logIdLock.isWriteLockedByCurrentThread()) {
       throw new IllegalStateException("close should be called with write lock held!");
@@ -453,7 +444,6 @@ public class TabletServerLogger {
           @Override
           void withWriteLock() throws IOException {
             close();
-            closeForReplication(sessions);
           }
         });
       }
@@ -470,15 +460,10 @@ public class TabletServerLogger {
       @Override
       void withWriteLock() throws IOException {
         close();
-        closeForReplication(sessions);
       }
     });
   }
 
-  protected void closeForReplication(Collection<CommitSession> sessions) {
-    // TODO We can close the WAL here for replication purposes
-  }
-
   public void defineTablet(final CommitSession commitSession, final Retry writeRetry)
       throws IOException {
     // scribble this into the metadata tablet, too.
@@ -583,5 +568,4 @@ public class TabletServerLogger {
       throw new IOException(e);
     }
   }
-
 }


Mime
View raw message