accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [accumulo] branch 1.9 updated: Fixes #854 handle many tablets referencing many WALs (#860)
Date Wed, 02 Jan 2019 19:16:05 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.9 by this push:
     new f3c8644  Fixes #854 handle many tablets referencing many WALs (#860)
f3c8644 is described below

commit f3c86441a525686c4ceca926d593db0f0eb66af9
Author: Keith Turner <kturner@apache.org>
AuthorDate: Wed Jan 2 14:16:00 2019 -0500

    Fixes #854 handle many tablets referencing many WALs (#860)
---
 .../core/client/impl/TableOperationsImpl.java      |   4 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |  22 +--
 .../org/apache/accumulo/tserver/tablet/Tablet.java |  36 +++-
 .../test/functional/ManyWriteAheadLogsIT.java      | 181 +++++++++++++++++++++
 .../accumulo/test/functional/WALSunnyDayIT.java    |  64 +++++---
 5 files changed, 264 insertions(+), 43 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
index 40e889a..93afdee 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
@@ -1032,8 +1032,8 @@ public class TableOperationsImpl extends TableOperationsHelper {
             "Group " + entry.getKey() + " overlaps with another group");
       }
 
-      if(entry.getValue().isEmpty()) {
-        throw new IllegalArgumentException("Group "+entry.getKey()+" is empty");
+      if (entry.getValue().isEmpty()) {
+        throw new IllegalArgumentException("Group " + entry.getKey() + " is empty");
       }
 
       all.addAll(entry.getValue());
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 7107843..dc4d349 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -2034,6 +2034,12 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
             // modification
           }
 
+          List<DfsLogger> closedCopy;
+
+          synchronized (closedLogs) {
+            closedCopy = copyClosedLogs(closedLogs);
+          }
+
           int numMajorCompactionsInProgress = 0;
 
           Iterator<Entry<KeyExtent,Tablet>> iter = copyOnlineTablets.entrySet().iterator();
@@ -2052,14 +2058,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
               continue;
             }
 
-            int maxLogEntriesPerTablet = getTableConfiguration(tablet.getExtent())
-                .getCount(Property.TABLE_MINC_LOGS_MAX);
-
-            if (tablet.getLogCount() >= maxLogEntriesPerTablet) {
-              log.debug("Initiating minor compaction for " + tablet.getExtent() + " because
it has "
-                  + tablet.getLogCount() + " write ahead logs");
-              tablet.initiateMinorCompaction(MinorCompactionReason.SYSTEM);
-            }
+            tablet.checkIfMinorCompactionNeededForLogs(closedCopy);
 
             synchronized (tablet) {
               if (tablet.initiateMajorCompaction(MajorCompactionReason.NORMAL)
@@ -3408,7 +3407,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
       // very important this copy maintains same order ..
       closedCopy.add(dfsLogger);
     }
-    return closedCopy;
+    return Collections.unmodifiableList(closedCopy);
   }
 
   private void markUnusedWALs() {
@@ -3454,11 +3453,14 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
 
   public void walogClosed(DfsLogger currentLog) throws WalMarkerException {
     metadataTableLogs.remove(currentLog);
+
     if (currentLog.getWrites() > 0) {
+      int clSize;
       synchronized (closedLogs) {
         closedLogs.add(currentLog);
+        clSize = closedLogs.size();
       }
-      log.info("Marking " + currentLog.getPath() + " as closed");
+      log.info("Marking " + currentLog.getPath() + " as closed. Total closed logs " + clSize);
       walMarker.closeWal(getTabletSession(), currentLog.getPath());
     } else {
       log.info(
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 9554b93..98bf80a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -2497,6 +2497,38 @@ public class Tablet implements TabletCommitter {
     candidates.removeAll(referencedLogs);
   }
 
+  public void checkIfMinorCompactionNeededForLogs(List<DfsLogger> closedLogs) {
+
+    // read the config outside of the tablet lock taken by synchronized (this) below
+    int maxLogs = tableConfiguration.getCount(Property.TABLE_MINC_LOGS_MAX);
+
+    String reason = null;
+    synchronized (this) {
+      if (currentLogs.size() >= maxLogs) {
+        reason = "referenced " + currentLogs.size() + " write ahead logs";
+      } else if (maxLogs < closedLogs.size()) {
+        // If many tablets each reference a single WAL, but each tablet references a different
+        // WAL, then the tablet server as a whole could reference many WALs. For recovery that
+        // would mean each tablet had to process lots of WALs. This check looks for any use of an
+        // older closed WAL and compacts if one is found. It assumes the most recent WALs are at
+        // the end of the list and ignores the last maxLogs of them.
+        List<DfsLogger> oldClosed = closedLogs.subList(0, closedLogs.size() - maxLogs);
+        for (DfsLogger closedLog : oldClosed) {
+          if (currentLogs.contains(closedLog)) {
+            reason = "referenced at least one old write ahead log " + closedLog.getFileName();
+            break;
+          }
+        }
+      }
+    }
+
+    if (reason != null) {
+      // initiate and log outside of tablet lock
+      initiateMinorCompaction(MinorCompactionReason.SYSTEM);
+      log.debug("Initiating minor compaction for " + getExtent() + " because " + reason);
+    }
+  }
+
   Set<String> beginClearingUnusedLogs() {
     Set<String> unusedLogs = new HashSet<>();
 
@@ -2557,10 +2589,6 @@ public class Tablet implements TabletCommitter {
   // this lock is basically used to synchronize writing of log info to metadata
   private final ReentrantLock logLock = new ReentrantLock();
 
-  public synchronized int getLogCount() {
-    return currentLogs.size();
-  }
-
   // don't release the lock if this method returns true for success; instead, the caller
should
   // clean up by calling finishUpdatingLogsUsed()
   @Override
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ManyWriteAheadLogsIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ManyWriteAheadLogsIT.java
new file mode 100644
index 0000000..e5b9065
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ManyWriteAheadLogsIT.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.util.Base64;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.log.WalStateManager.WalState;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ManyWriteAheadLogsIT extends AccumuloClusterHarness {
+
+  private static final Logger log = LoggerFactory.getLogger(ManyWriteAheadLogsIT.class);
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    // configure a smaller walog size so the walogs will roll frequently in the test
+    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
+    cfg.setProperty(Property.GC_CYCLE_DELAY, "1");
+    cfg.setProperty(Property.GC_CYCLE_START, "1");
+    cfg.setProperty(Property.MASTER_RECOVERY_DELAY, "1s");
+    cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+    // Idle compactions may address the problem this test is creating; however, they will not
+    // prevent lots of closed WALs for all write patterns. The idle time is set high so this test
+    // exercises the code that directly handles many tablets referencing many different WALs.
+    cfg.setProperty(Property.TABLE_MINC_COMPACT_IDLETIME, "1h");
+    cfg.setNumTservers(1);
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  @Override
+  protected int defaultTimeoutSeconds() {
+    return 10 * 60;
+  }
+
+  /**
+   * Creates a situation where many tablets reference many different write ahead logs, but no
+   * single tablet references a lot of write ahead logs. Ensures the tablet server forces minor
+   * compactions in this situation so that the number of closed WALs eventually shrinks.
+   */
+  @Test
+  public void testMany() throws Exception {
+    SortedSet<Text> splits = new TreeSet<>();
+    for (int i = 1; i < 100; i++) {
+      splits.add(new Text(String.format("%05x", i * 100)));
+    }
+
+    Connector c = getConnector();
+    String[] tableNames = getUniqueNames(2);
+
+    String manyWALsTable = tableNames[0];
+    String rollWALsTable = tableNames[1];
+    c.tableOperations().create(manyWALsTable);
+    c.tableOperations().addSplits(manyWALsTable, splits);
+
+    c.tableOperations().create(rollWALsTable);
+
+    Random rand = new Random();
+
+    Set<String> allWalsSeen = new HashSet<>();
+
+    addOpenWals(c, allWalsSeen);
+
+    // This test creates the table manyWALsTable with a lot of tablets and writes a little bit to
+    // each tablet. In between writes to the tablets, a lot of data is written to another table
+    // called rollWALsTable. Writing a lot causes the write ahead logs to roll. This write
+    // pattern should cause the tablets in the manyWALsTable table to reference many closed
+    // WALs. If nothing is done about all of these closed WALs, then it could cause a large burden
+    // at recovery time.
+
+    try (BatchWriter manyWALsWriter = c.createBatchWriter(manyWALsTable, new BatchWriterConfig());
+        BatchWriter rollWALsWriter = c.createBatchWriter(rollWALsTable, new BatchWriterConfig())) {
+
+      byte[] val = new byte[768];
+
+      for (int i = 0; i < 100; i++) {
+        int startRow = i * 100;
+
+        // write a little data to one tablet; over all 100 iterations every tablet is written
+        for (int j = 0; j < 10; j++) {
+          int row = startRow + j;
+          Mutation m = new Mutation(String.format("%05x", row));
+          rand.nextBytes(val); // NOTE(review): val is not used below; this mutation writes "v"
+          m.put("f", "q", "v");
+
+          manyWALsWriter.addMutation(m);
+        }
+        manyWALsWriter.flush();
+
+        // write a lot of data to the second table to force the logs to roll
+        for (int j = 0; j < 1000; j++) {
+          Mutation m = new Mutation(String.format("%03d", j));
+          rand.nextBytes(val);
+
+          m.put("f", "q", Base64.encodeBase64String(val));
+
+          rollWALsWriter.addMutation(m);
+        }
+
+        rollWALsWriter.flush();
+
+        // keep track of the open WALs as the test runs. Should see a lot of open WALs over the
+        // lifetime of the test, but never a lot at any one time.
+        addOpenWals(c, allWalsSeen);
+      }
+    }
+
+    assertTrue("Number of WALs seen was less than expected " + allWalsSeen.size(),
+        allWalsSeen.size() >= 50);
+
+    // wait for closed WALs to get small; no explicit bound, presumably the test timeout applies
+    int closedLogs = countClosedWals(c);
+    while (closedLogs > 3) {
+      log.debug("Waiting for wals to shrink " + closedLogs);
+      Thread.sleep(250);
+      closedLogs = countClosedWals(c);
+    }
+  }
+
+  private void addOpenWals(Connector c, Set<String> allWalsSeen) throws Exception {
+    Map<String,WalState> wals = WALSunnyDayIT._getWals(c);
+    Set<Entry<String,WalState>> es = wals.entrySet();
+    int open = 0;
+    for (Entry<String,WalState> entry : es) {
+      if (entry.getValue() == WalState.OPEN) {
+        open++;
+        allWalsSeen.add(entry.getKey());
+      }
+    }
+
+    assertTrue("Open WALs not in expected range " + open, open > 0 && open < 4);
+  }
+
+  private int countClosedWals(Connector c) throws Exception {
+    int count = 0;
+    Map<String,WalState> wals = WALSunnyDayIT._getWals(c);
+    for (WalState ws : wals.values()) {
+      if (ws == WalState.CLOSED) {
+        count++;
+      }
+    }
+
+    return count;
+  }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java b/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
index b2ace36..702bce9 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
@@ -56,12 +56,14 @@ import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterControl;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
 import org.apache.accumulo.server.log.WalStateManager.WalState;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.junit.Test;
 
 import com.google.common.collect.Iterators;
@@ -81,10 +83,10 @@ public class WALSunnyDayIT extends ConfigurableMacBase {
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   }
 
-  int countTrue(Collection<Boolean> bools) {
+  int countInUse(Collection<WalState> bools) {
     int result = 0;
-    for (Boolean b : bools) {
-      if (b.booleanValue())
+    for (WalState b : bools) {
+      if (b != WalState.UNREFERENCED)
         result++;
     }
     return result;
@@ -101,17 +103,15 @@ public class WALSunnyDayIT extends ConfigurableMacBase {
     writeSomeData(c, tableName, 1, 1);
 
     // wal markers are added lazily
-    Map<String,Boolean> wals = getWALsAndAssertCount(c, 2);
-    for (Boolean b : wals.values()) {
-      assertTrue("logs should be in use", b.booleanValue());
-    }
+    Map<String,WalState> wals = getWALsAndAssertCount(c, 2);
+    assertEquals("all WALs should be in use", 2, countInUse(wals.values()));
 
     // roll log, get a new next
     writeSomeData(c, tableName, 1001, 50);
-    Map<String,Boolean> walsAfterRoll = getWALsAndAssertCount(c, 3);
+    Map<String,WalState> walsAfterRoll = getWALsAndAssertCount(c, 3);
     assertTrue("new WALs should be a superset of the old WALs",
         walsAfterRoll.keySet().containsAll(wals.keySet()));
-    assertEquals("all WALs should be in use", 3, countTrue(walsAfterRoll.values()));
+    assertEquals("all WALs should be in use", 3, countInUse(walsAfterRoll.values()));
 
     // flush the tables
     for (String table : new String[] {tableName, MetadataTable.NAME, RootTable.NAME}) {
@@ -119,8 +119,8 @@ public class WALSunnyDayIT extends ConfigurableMacBase {
     }
     sleepUninterruptibly(1, TimeUnit.SECONDS);
     // rolled WAL is no longer in use, but needs to be GC'd
-    Map<String,Boolean> walsAfterflush = getWALsAndAssertCount(c, 3);
-    assertEquals("inUse should be 2", 2, countTrue(walsAfterflush.values()));
+    Map<String,WalState> walsAfterflush = getWALsAndAssertCount(c, 3);
+    assertEquals("inUse should be 2", 2, countInUse(walsAfterflush.values()));
 
     // let the GC run for a little bit
     control.start(GARBAGE_COLLECTOR);
@@ -149,13 +149,13 @@ public class WALSunnyDayIT extends ConfigurableMacBase {
     verifySomeData(c, tableName, 1001 * 50 + 1);
     writeSomeData(c, tableName, 100, 100);
 
-    Map<String,Boolean> walsAfterRestart = getWALsAndAssertCount(c, 4);
+    Map<String,WalState> walsAfterRestart = getWALsAndAssertCount(c, 4);
     // log.debug("wals after " + walsAfterRestart);
-    assertEquals("used WALs after restart should be 4", 4, countTrue(walsAfterRestart.values()));
+    assertEquals("used WALs after restart should be 4", 4, countInUse(walsAfterRestart.values()));
     control.start(GARBAGE_COLLECTOR);
     sleepUninterruptibly(5, TimeUnit.SECONDS);
-    Map<String,Boolean> walsAfterRestartAndGC = getWALsAndAssertCount(c, 2);
-    assertEquals("logs in use should be 2", 2, countTrue(walsAfterRestartAndGC.values()));
+    Map<String,WalState> walsAfterRestartAndGC = getWALsAndAssertCount(c, 2);
+    assertEquals("logs in use should be 2", 2, countInUse(walsAfterRestartAndGC.values()));
   }
 
   private void verifySomeData(Connector c, String tableName, int expected) throws Exception
{
@@ -220,14 +220,14 @@ public class WALSunnyDayIT extends ConfigurableMacBase {
   private final int TIMES_TO_COUNT = 20;
   private final int PAUSE_BETWEEN_COUNTS = 100;
 
-  private Map<String,Boolean> getWALsAndAssertCount(Connector c, int expectedCount)
+  private Map<String,WalState> getWALsAndAssertCount(Connector c, int expectedCount)
       throws Exception {
     // see https://issues.apache.org/jira/browse/ACCUMULO-4110. Sometimes this test counts
the logs
     // before
     // the new standby log is actually ready. So let's try a few times before failing, returning
the
     // last
     // wals variable with the the correct count.
-    Map<String,Boolean> wals = _getWals(c);
+    Map<String,WalState> wals = _getWals(c);
     if (wals.size() == expectedCount) {
       return wals;
     }
@@ -258,17 +258,27 @@ public class WALSunnyDayIT extends ConfigurableMacBase {
     return waitLonger;
   }
 
-  private Map<String,Boolean> _getWals(Connector c) throws Exception {
-    Map<String,Boolean> result = new HashMap<>();
-    Instance i = c.getInstance();
-    ZooReaderWriter zk = new ZooReaderWriter(i.getZooKeepers(), i.getZooKeepersSessionTimeOut(),
-        "");
-    WalStateManager wals = new WalStateManager(c.getInstance(), zk);
-    for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
-      // WALs are in use if they are not unreferenced
-      result.put(entry.getKey().toString(), entry.getValue() != WalState.UNREFERENCED);
+  static Map<String,WalState> _getWals(Connector c) throws Exception {
+    while (true) { // retry while reads fail with a NoNodeException cause; no retry limit here
+      try {
+        Map<String,WalState> result = new HashMap<>();
+        Instance i = c.getInstance();
+        ZooReaderWriter zk = new ZooReaderWriter(i.getZooKeepers(), i.getZooKeepersSessionTimeOut(),
+            "");
+        WalStateManager wals = new WalStateManager(c.getInstance(), zk);
+        for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
+          // record the raw state; callers (e.g. countInUse) decide which states count as in use
+          result.put(entry.getKey().toString(), entry.getValue());
+        }
+        return result;
+      } catch (WalMarkerException wme) {
+        if (wme.getCause() instanceof NoNodeException) { // WAL state changed mid-read; retry
+          log.debug("WALs changed while reading, retrying", wme);
+        } else {
+          throw wme;
+        }
+      }
     }
-    return result;
   }
 
 }


Mime
View raw message