accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [accumulo] branch 1.9 updated: fixes #537 only remove WALs if older WALs can be removed (#540)
Date Fri, 29 Jun 2018 16:52:23 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.9 by this push:
     new 05e49e1  fixes #537 only remove WALs if older WALs can be removed (#540)
05e49e1 is described below

commit 05e49e1578ffbfd961186ed836fce9bac5966436
Author: Keith Turner <keith@deenlo.com>
AuthorDate: Fri Jun 29 12:52:22 2018 -0400

    fixes #537 only remove WALs if older WALs can be removed (#540)
---
 .../org/apache/accumulo/tserver/TabletServer.java  |  85 +++++++++++++--
 .../accumulo/tserver/WalRemovalOrderTest.java      | 115 +++++++++++++++++++++
 2 files changed, 189 insertions(+), 11 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index f87841d..bd41541 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -36,6 +36,7 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -265,6 +266,8 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class TabletServer extends AccumuloServerContext implements Runnable {
 
   private static final Logger log = LoggerFactory.getLogger(TabletServer.class);
@@ -3343,28 +3346,88 @@ public class TabletServer extends AccumuloServerContext implements
Runnable {
     }
   }
 
-  // remove any meta entries after a rolled log is no longer referenced
-  Set<DfsLogger> closedLogs = new HashSet<>();
+  // This is a set of WALs that are closed but may still be referenced by tablets. A LinkedHashSet
+  // is used because its very import to know the order in which WALs were closed when deciding
if a
+  // WAL is eligible for removal. Maintaining the order that logs were used in is currently
a simple
+  // task because there is only one active log at a time.
+  LinkedHashSet<DfsLogger> closedLogs = new LinkedHashSet<>();
+
+  @VisibleForTesting
+  interface ReferencedRemover {
+    void removeInUse(Set<DfsLogger> candidates);
+  }
+
+  /**
+   * For a closed WAL to be eligible for removal it must be unreferenced AND all closed WALs
older
+   * than it must be unreferenced. This method finds WALs that meet those conditions. See
Github
+   * issue #537.
+   */
+  @VisibleForTesting
+  static Set<DfsLogger> findOldestUnreferencedWals(List<DfsLogger> closedLogs,
+      ReferencedRemover referencedRemover) {
+    LinkedHashSet<DfsLogger> unreferenced = new LinkedHashSet<>(closedLogs);
+
+    referencedRemover.removeInUse(unreferenced);
+
+    Iterator<DfsLogger> closedIter = closedLogs.iterator();
+    Iterator<DfsLogger> unrefIter = unreferenced.iterator();
+
+    Set<DfsLogger> eligible = new HashSet<>();
+
+    while (closedIter.hasNext() && unrefIter.hasNext()) {
+      DfsLogger closed = closedIter.next();
+      DfsLogger unref = unrefIter.next();
+
+      if (closed.equals(unref)) {
+        eligible.add(unref);
+      } else {
+        break;
+      }
+    }
+
+    return eligible;
+  }
+
+  @VisibleForTesting
+  static List<DfsLogger> copyClosedLogs(LinkedHashSet<DfsLogger> closedLogs)
{
+    List<DfsLogger> closedCopy = new ArrayList<>(closedLogs.size());
+    for (DfsLogger dfsLogger : closedLogs) {
+      // very important this copy maintains same order ..
+      closedCopy.add(dfsLogger);
+    }
+    return closedCopy;
+  }
 
   private void markUnusedWALs() {
-    Set<DfsLogger> candidates;
+
+    List<DfsLogger> closedCopy;
+
     synchronized (closedLogs) {
-      candidates = new HashSet<>(closedLogs);
+      closedCopy = copyClosedLogs(closedLogs);
     }
-    for (Tablet tablet : getOnlineTablets()) {
-      tablet.removeInUseLogs(candidates);
-      if (candidates.isEmpty()) {
-        break;
+
+    ReferencedRemover refRemover = new ReferencedRemover() {
+      @Override
+      public void removeInUse(Set<DfsLogger> candidates) {
+        for (Tablet tablet : getOnlineTablets()) {
+          tablet.removeInUseLogs(candidates);
+          if (candidates.isEmpty()) {
+            break;
+          }
+        }
       }
-    }
+    };
+
+    Set<DfsLogger> eligible = findOldestUnreferencedWals(closedCopy, refRemover);
+
     try {
       TServerInstance session = this.getTabletSession();
-      for (DfsLogger candidate : candidates) {
+      for (DfsLogger candidate : eligible) {
         log.info("Marking " + candidate.getPath() + " as unreferenced");
         walMarker.walUnreferenced(session, candidate.getPath());
       }
       synchronized (closedLogs) {
-        closedLogs.removeAll(candidates);
+        closedLogs.removeAll(eligible);
       }
     } catch (WalMarkerException ex) {
       log.info(ex.toString(), ex);
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/WalRemovalOrderTest.java
b/server/tserver/src/test/java/org/apache/accumulo/tserver/WalRemovalOrderTest.java
new file mode 100644
index 0000000..e0ae6c1
--- /dev/null
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/WalRemovalOrderTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.tserver.TabletServer.ReferencedRemover;
+import org.apache.accumulo.tserver.log.DfsLogger;
+import org.apache.accumulo.tserver.log.DfsLogger.ServerResources;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class WalRemovalOrderTest {
+
+  private static DfsLogger mockLogger(String filename) {
+    ServerResources conf = new ServerResources() {
+      @Override
+      public AccumuloConfiguration getConfiguration() {
+        return DefaultConfiguration.getInstance();
+      }
+
+      @Override
+      public VolumeManager getFileSystem() {
+        throw new UnsupportedOperationException();
+      }
+    };
+
+    try {
+      return new DfsLogger(conf, filename, null);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  private static LinkedHashSet<DfsLogger> mockLoggers(String... logs) {
+    LinkedHashSet<DfsLogger> logSet = new LinkedHashSet<>();
+
+    for (String log : logs) {
+      logSet.add(mockLogger(log));
+    }
+
+    return logSet;
+  }
+
+  private static class TestRefRemover implements ReferencedRemover {
+    Set<DfsLogger> inUseLogs;
+
+    TestRefRemover(Set<DfsLogger> inUseLogs) {
+      this.inUseLogs = inUseLogs;
+    }
+
+    @Override
+    public void removeInUse(Set<DfsLogger> candidates) {
+      candidates.removeAll(inUseLogs);
+    }
+  }
+
+  private static void runTest(LinkedHashSet<DfsLogger> closedLogs, Set<DfsLogger>
inUseLogs,
+      Set<DfsLogger> expected) {
+    List<DfsLogger> copy = TabletServer.copyClosedLogs(closedLogs);
+    Set<DfsLogger> eligible = TabletServer.findOldestUnreferencedWals(copy,
+        new TestRefRemover(inUseLogs));
+    Assert.assertEquals(expected, eligible);
+  }
+
+  @Test
+  public void testWalRemoval() {
+    runTest(mockLoggers("W1", "W2"), mockLoggers(), mockLoggers("W1", "W2"));
+    runTest(mockLoggers("W1", "W2"), mockLoggers("W1"), mockLoggers());
+    runTest(mockLoggers("W1", "W2"), mockLoggers("W2"), mockLoggers("W1"));
+    runTest(mockLoggers("W1", "W2"), mockLoggers("W1", "W2"), mockLoggers());
+
+    // below W5 represents an open log not in the closed set
+    for (Set<DfsLogger> inUse : Sets.powerSet(mockLoggers("W1", "W2", "W3", "W4", "W5")))
{
+      Set<DfsLogger> expected;
+      if (inUse.contains(mockLogger("W1"))) {
+        expected = Collections.emptySet();
+      } else if (inUse.contains(mockLogger("W2"))) {
+        expected = mockLoggers("W1");
+      } else if (inUse.contains(mockLogger("W3"))) {
+        expected = mockLoggers("W1", "W2");
+      } else if (inUse.contains(mockLogger("W4"))) {
+        expected = mockLoggers("W1", "W2", "W3");
+      } else {
+        expected = mockLoggers("W1", "W2", "W3", "W4");
+      }
+
+      runTest(mockLoggers("W1", "W2", "W3", "W4"), inUse, expected);
+    }
+  }
+}


Mime
View raw message