accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmil...@apache.org
Subject [accumulo] branch master updated: Cleanup TabletServerLogger code (#793)
Date Wed, 12 Dec 2018 22:57:54 GMT
This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new eff92f1  Cleanup TabletServerLogger code (#793)
eff92f1 is described below

commit eff92f178b01786c4debcc9f927c8cd9a84103a7
Author: Mike Miller <mmiller@apache.org>
AuthorDate: Wed Dec 12 17:57:50 2018 -0500

    Cleanup TabletServerLogger code (#793)
    
    * Remove unnecessary object manipulation across method calls
    * Remove unused methods
    * Replace code with lambdas
    * Removed Mutations class that is no longer needed
    * Removed extra loops for checking durability by creating maxDurability method
    * Make Tserver not log mutation when durability is none
    * Simplify sendables object that calls commit on mutations
---
 .../org/apache/accumulo/tserver/Mutations.java     |  40 -------
 .../org/apache/accumulo/tserver/TabletServer.java  |  82 +++++++-------
 .../org/apache/accumulo/tserver/log/DfsLogger.java |  24 ++--
 .../accumulo/tserver/log/TabletServerLogger.java   | 122 +++++----------------
 .../apache/accumulo/tserver/log/DfsLoggerTest.java |  25 +++--
 5 files changed, 102 insertions(+), 191 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Mutations.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Mutations.java
deleted file mode 100644
index 76061e6..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Mutations.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.tserver;
-
-import java.util.List;
-
-import org.apache.accumulo.core.client.Durability;
-import org.apache.accumulo.core.data.Mutation;
-
-public class Mutations {
-  private final Durability durability;
-  private final List<Mutation> mutations;
-
-  Mutations(Durability durability, List<Mutation> mutations) {
-    this.durability = durability;
-    this.mutations = mutations;
-  }
-
-  public Durability getDurability() {
-    return durability;
-  }
-
-  public List<Mutation> getMutations() {
-    return mutations;
-  }
-}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 51962bd..97cdc9b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -1013,7 +1013,8 @@ public class TabletServer implements Runnable {
     private void flush(UpdateSession us) {
 
       int mutationCount = 0;
-      Map<CommitSession,Mutations> sendables = new HashMap<>();
+      Map<CommitSession,List<Mutation>> sendables = new HashMap<>();
+      Map<CommitSession,TabletMutations> loggables = new HashMap<>();
       Throwable error = null;
 
       long pt1 = System.currentTimeMillis();
@@ -1031,7 +1032,8 @@ public class TabletServer implements Runnable {
         for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet())
{
 
           Tablet tablet = entry.getKey();
-          Durability tabletDurability = tablet.getDurability();
+          Durability durability = DurabilityImpl.resolveDurabilty(us.durability,
+              tablet.getDurability());
           List<Mutation> mutations = entry.getValue();
           if (mutations.size() > 0) {
             try {
@@ -1045,8 +1047,11 @@ public class TabletServer implements Runnable {
                 }
                 us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
               } else {
-                sendables.put(commitSession, new Mutations(
-                    DurabilityImpl.resolveDurabilty(us.durability, tabletDurability), mutations));
+                if (durability != Durability.NONE) {
+                  loggables.put(commitSession, new TabletMutations(commitSession.getLogId(),
+                      commitSession.getWALogSeq(), mutations, durability));
+                }
+                sendables.put(commitSession, mutations);
                 mutationCount += mutations.size();
               }
 
@@ -1059,9 +1064,12 @@ public class TabletServer implements Runnable {
                 // only log and commit mutations if there were some
                 // that did not violate constraints... this is what
                 // prepareMutationsForCommit() expects
-                sendables.put(e.getCommitSession(),
-                    new Mutations(DurabilityImpl.resolveDurabilty(us.durability, tabletDurability),
-                        e.getNonViolators()));
+                CommitSession cs = e.getCommitSession();
+                if (durability != Durability.NONE) {
+                  loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(),
+                      e.getNonViolators(), durability));
+                }
+                sendables.put(cs, e.getNonViolators());
               }
 
               mutationCount += mutations.size();
@@ -1082,9 +1090,7 @@ public class TabletServer implements Runnable {
       updateAvgPrepTime(pt2 - pt1, us.queuedMutations.size());
 
       if (error != null) {
-        for (Entry<CommitSession,Mutations> e : sendables.entrySet()) {
-          e.getKey().abortCommit(e.getValue().getMutations());
-        }
+        sendables.forEach(CommitSession::abortCommit);
         throw new RuntimeException(error);
       }
       try {
@@ -1094,7 +1100,7 @@ public class TabletServer implements Runnable {
             try {
               long t1 = System.currentTimeMillis();
 
-              logger.logManyTablets(sendables);
+              logger.logManyTablets(loggables);
 
               long t2 = System.currentTimeMillis();
               us.walogTimes.addStat(t2 - t1);
@@ -1115,12 +1121,8 @@ public class TabletServer implements Runnable {
         Span commit = Trace.start("commit");
         try {
           long t1 = System.currentTimeMillis();
-          for (Entry<CommitSession,Mutations> entry : sendables.entrySet()) {
-            CommitSession commitSession = entry.getKey();
-            List<Mutation> mutations = entry.getValue().getMutations();
-
+          sendables.forEach((commitSession, mutations) -> {
             commitSession.commit(mutations);
-
             KeyExtent extent = commitSession.getExtent();
 
             if (us.currentTablet != null && extent == us.currentTablet.getExtent())
{
@@ -1131,7 +1133,7 @@ public class TabletServer implements Runnable {
               us.successfulCommits.increment(us.currentTablet,
                   us.queuedMutations.get(us.currentTablet).size());
             }
-          }
+          });
           long t2 = System.currentTimeMillis();
 
           us.flushTime += (t2 - pt1);
@@ -1266,12 +1268,14 @@ public class TabletServer implements Runnable {
           throw new NotServingTabletException(tkeyExtent);
         }
 
-        while (true) {
+        Durability durability = DurabilityImpl
+            .resolveDurabilty(DurabilityImpl.fromThrift(tdurability), tabletDurability);
+        // instead of always looping on true, skip completely when durability is NONE
+        while (durability != Durability.NONE) {
           try {
             final Span wal = Trace.start("wal");
             try {
-              logger.log(cs, cs.getWALogSeq(), mutation, DurabilityImpl
-                  .resolveDurabilty(DurabilityImpl.fromThrift(tdurability), tabletDurability));
+              logger.log(cs, cs.getWALogSeq(), mutation, durability);
             } finally {
               wal.stop();
             }
@@ -1356,7 +1360,8 @@ public class TabletServer implements Runnable {
         ArrayList<TCMResult> results, ConditionalSession sess) {
       Set<Entry<KeyExtent,List<ServerConditionalMutation>>> es = updates.entrySet();
 
-      Map<CommitSession,Mutations> sendables = new HashMap<>();
+      Map<CommitSession,List<Mutation>> sendables = new HashMap<>();
+      Map<CommitSession,TabletMutations> loggables = new HashMap<>();
 
       boolean sessionCanceled = sess.interruptFlag.get();
 
@@ -1369,7 +1374,8 @@ public class TabletServer implements Runnable {
             for (ServerConditionalMutation scm : entry.getValue())
               results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
           } else {
-            final Durability tabletDurability = tablet.getDurability();
+            final Durability durability = DurabilityImpl.resolveDurabilty(sess.durability,
+                tablet.getDurability());
             try {
 
               @SuppressWarnings("unchecked")
@@ -1386,18 +1392,21 @@ public class TabletServer implements Runnable {
                 } else {
                   for (ServerConditionalMutation scm : entry.getValue())
                     results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED));
-                  sendables.put(cs,
-                      new Mutations(
-                          DurabilityImpl.resolveDurabilty(sess.durability, tabletDurability),
-                          mutations));
+                  if (durability != Durability.NONE) {
+                    loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(),
+                        mutations, durability));
+                  }
+                  sendables.put(cs, mutations);
                 }
               }
             } catch (TConstraintViolationException e) {
+              CommitSession cs = e.getCommitSession();
               if (e.getNonViolators().size() > 0) {
-                sendables.put(e.getCommitSession(),
-                    new Mutations(
-                        DurabilityImpl.resolveDurabilty(sess.durability, tabletDurability),
-                        e.getNonViolators()));
+                if (durability != Durability.NONE) {
+                  loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(),
+                      e.getNonViolators(), durability));
+                }
+                sendables.put(cs, e.getNonViolators());
                 for (Mutation m : e.getNonViolators())
                   results.add(
                       new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.ACCEPTED));
@@ -1418,10 +1427,10 @@ public class TabletServer implements Runnable {
 
       Span walSpan = Trace.start("wal");
       try {
-        while (sendables.size() > 0) {
+        while (loggables.size() > 0) {
           try {
             long t1 = System.currentTimeMillis();
-            logger.logManyTablets(sendables);
+            logger.logManyTablets(loggables);
             long t2 = System.currentTimeMillis();
             updateWalogWriteTime(t2 - t1);
             break;
@@ -1440,12 +1449,7 @@ public class TabletServer implements Runnable {
       Span commitSpan = Trace.start("commit");
       try {
         long t1 = System.currentTimeMillis();
-        for (Entry<CommitSession,Mutations> entry : sendables.entrySet()) {
-          CommitSession commitSession = entry.getKey();
-          List<Mutation> mutations = entry.getValue().getMutations();
-
-          commitSession.commit(mutations);
-        }
+        sendables.forEach(CommitSession::commit);
         long t2 = System.currentTimeMillis();
         updateAvgCommitTime(t2 - t1, sendables.size());
       } finally {
@@ -3389,7 +3393,7 @@ public class TabletServer implements Runnable {
             "Unable to find recovery files for extent " + extent + " logEntry: " + entry);
       recoveryLogs.add(recovery);
     }
-    logger.recover(fs, extent, tconf, recoveryLogs, tabletFiles, mutationReceiver);
+    logger.recover(fs, extent, recoveryLogs, tabletFiles, mutationReceiver);
   }
 
   public int createLogId() {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index cdb787f..a0bebab 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -31,6 +31,7 @@ import java.lang.reflect.Method;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
@@ -597,7 +598,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
     return new LoggerOperation(work);
   }
 
-  public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IOException
{
+  public LoggerOperation logManyTablets(Collection<TabletMutations> mutations) throws
IOException {
     Durability durability = Durability.NONE;
     List<Pair<LogFileKey,LogFileValue>> data = new ArrayList<>();
     for (TabletMutations tabletMutations : mutations) {
@@ -608,21 +609,20 @@ public class DfsLogger implements Comparable<DfsLogger> {
       LogFileValue value = new LogFileValue();
       value.mutations = tabletMutations.getMutations();
       data.add(new Pair<>(key, value));
-      if (tabletMutations.getDurability().ordinal() > durability.ordinal()) {
-        durability = tabletMutations.getDurability();
-      }
+      durability = maxDurability(tabletMutations.getDurability(), durability);
     }
-    return logFileData(data, chooseDurabilityForGroupCommit(mutations));
+    return logFileData(data, durability);
   }
 
-  static Durability chooseDurabilityForGroupCommit(List<TabletMutations> mutations)
{
-    Durability result = Durability.NONE;
-    for (TabletMutations tabletMutations : mutations) {
-      if (tabletMutations.getDurability().ordinal() > result.ordinal()) {
-        result = tabletMutations.getDurability();
-      }
+  /**
+   * Return the Durability with the highest precedence
+   */
+  static Durability maxDurability(Durability dur1, Durability dur2) {
+    if (dur1.ordinal() > dur2.ordinal()) {
+      return dur1;
+    } else {
+      return dur2;
     }
-    return result;
   }
 
   public LoggerOperation minorCompactionFinished(long seq, int tid, String fqfn,
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 4f70880..1a1bfa2 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -16,15 +16,13 @@
  */
 package org.apache.accumulo.tserver.log;
 
+import static java.util.Collections.singletonList;
+
 import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -44,13 +42,11 @@ import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.fate.util.LoggingRunnable;
 import org.apache.accumulo.fate.util.Retry;
 import org.apache.accumulo.fate.util.Retry.RetryFactory;
-import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.replication.StatusUtil;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.util.Halt;
 import org.apache.accumulo.server.util.ReplicationTableUtil;
-import org.apache.accumulo.tserver.Mutations;
 import org.apache.accumulo.tserver.TabletMutations;
 import org.apache.accumulo.tserver.TabletServer;
 import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation;
@@ -320,15 +316,6 @@ public class TabletServerLogger {
     }));
   }
 
-  public void resetLoggers() throws IOException {
-    logIdLock.writeLock().lock();
-    try {
-      close();
-    } finally {
-      logIdLock.writeLock().unlock();
-    }
-  }
-
   private synchronized void close() throws IOException {
     if (!logIdLock.isWriteLockedByCurrentThread()) {
       throw new IllegalStateException("close should be called with write lock held!");
@@ -356,22 +343,6 @@ public class TabletServerLogger {
     LoggerOperation write(DfsLogger logger) throws Exception;
   }
 
-  private void write(CommitSession commitSession, boolean mincFinish, Writer writer)
-      throws IOException {
-    write(commitSession, mincFinish, writer, writeRetryFactory.createRetry());
-  }
-
-  private void write(CommitSession commitSession, boolean mincFinish, Writer writer,
-      Retry writeRetry) throws IOException {
-    List<CommitSession> sessions = Collections.singletonList(commitSession);
-    write(sessions, mincFinish, writer, writeRetry);
-  }
-
-  private void write(final Collection<CommitSession> sessions, boolean mincFinish,
Writer writer)
-      throws IOException {
-    write(sessions, mincFinish, writer, writeRetryFactory.createRetry());
-  }
-
   private void write(final Collection<CommitSession> sessions, boolean mincFinish,
Writer writer,
       Retry writeRetry) throws IOException {
     // Work very hard not to lock this during calls to the outside world
@@ -454,7 +425,6 @@ public class TabletServerLogger {
           @Override
           void withWriteLock() throws IOException {
             close();
-            closeForReplication(sessions);
           }
         });
       }
@@ -471,70 +441,44 @@ public class TabletServerLogger {
       @Override
       void withWriteLock() throws IOException {
         close();
-        closeForReplication(sessions);
       }
     });
   }
 
-  protected void closeForReplication(Collection<CommitSession> sessions) {
-    // TODO We can close the WAL here for replication purposes
-  }
-
   public void defineTablet(final CommitSession commitSession, final Retry writeRetry)
       throws IOException {
     // scribble this into the metadata tablet, too.
-    write(commitSession, false, new Writer() {
-      @Override
-      public LoggerOperation write(DfsLogger logger) throws Exception {
-        logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(),
-            commitSession.getExtent());
-        return DfsLogger.NO_WAIT_LOGGER_OP;
-      }
+    write(singletonList(commitSession), false, logger -> {
+      logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(),
+          commitSession.getExtent());
+      return DfsLogger.NO_WAIT_LOGGER_OP;
     }, writeRetry);
   }
 
+  /**
+   * Log a single mutation. This method expects mutations that have a durability other than
NONE.
+   */
   public void log(final CommitSession commitSession, final long tabletSeq, final Mutation
m,
       final Durability durability) throws IOException {
-    if (durability == Durability.NONE) {
-      return;
-    }
-    if (durability == Durability.DEFAULT) {
+    if (durability == Durability.DEFAULT || durability == Durability.NONE) {
       throw new IllegalArgumentException("Unexpected durability " + durability);
     }
-    write(commitSession, false, new Writer() {
-      @Override
-      public LoggerOperation write(DfsLogger logger) throws Exception {
-        return logger.log(tabletSeq, commitSession.getLogId(), m, durability);
-      }
-    });
+    write(singletonList(commitSession), false,
+        logger -> logger.log(tabletSeq, commitSession.getLogId(), m, durability),
+        writeRetryFactory.createRetry());
     logSizeEstimate.addAndGet(m.numBytes());
   }
 
-  public void logManyTablets(Map<CommitSession,Mutations> mutations) throws IOException
{
-
-    final Map<CommitSession,Mutations> loggables = new HashMap<>(mutations);
-    for (Entry<CommitSession,Mutations> entry : mutations.entrySet()) {
-      if (entry.getValue().getDurability() == Durability.NONE) {
-        loggables.remove(entry.getKey());
-      }
-    }
+  /**
+   * Log mutations. This method expects mutations that have a durability other than NONE.
+   */
+  public void logManyTablets(Map<CommitSession,TabletMutations> loggables) throws IOException
{
     if (loggables.size() == 0)
       return;
 
-    write(loggables.keySet(), false, new Writer() {
-      @Override
-      public LoggerOperation write(DfsLogger logger) throws Exception {
-        List<TabletMutations> copy = new ArrayList<>(loggables.size());
-        for (Entry<CommitSession,Mutations> entry : loggables.entrySet()) {
-          CommitSession cs = entry.getKey();
-          Durability durability = entry.getValue().getDurability();
-          copy.add(new TabletMutations(cs.getLogId(), cs.getWALogSeq(),
-              entry.getValue().getMutations(), durability));
-        }
-        return logger.logManyTablets(copy);
-      }
-    });
-    for (Mutations entry : loggables.values()) {
+    write(loggables.keySet(), false, logger -> logger.logManyTablets(loggables.values()),
+        writeRetryFactory.createRetry());
+    for (TabletMutations entry : loggables.values()) {
       if (entry.getMutations().size() < 1) {
         throw new IllegalArgumentException("logManyTablets: logging empty mutation list");
       }
@@ -550,13 +494,10 @@ public class TabletServerLogger {
 
     long t1 = System.currentTimeMillis();
 
-    write(commitSession, true, new Writer() {
-      @Override
-      public LoggerOperation write(DfsLogger logger) throws Exception {
-        return logger.minorCompactionFinished(walogSeq, commitSession.getLogId(),
-            fullyQualifiedFileName, durability);
-      }
-    });
+    write(
+        singletonList(commitSession), true, logger -> logger.minorCompactionFinished(walogSeq,
+            commitSession.getLogId(), fullyQualifiedFileName, durability),
+        writeRetryFactory.createRetry());
 
     long t2 = System.currentTimeMillis();
 
@@ -565,18 +506,15 @@ public class TabletServerLogger {
 
   public long minorCompactionStarted(final CommitSession commitSession, final long seq,
       final String fullyQualifiedFileName, final Durability durability) throws IOException
{
-    write(commitSession, false, new Writer() {
-      @Override
-      public LoggerOperation write(DfsLogger logger) throws Exception {
-        return logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName,
-            durability);
-      }
-    });
+    write(
+        singletonList(commitSession), false, logger -> logger.minorCompactionStarted(seq,
+            commitSession.getLogId(), fullyQualifiedFileName, durability),
+        writeRetryFactory.createRetry());
     return seq;
   }
 
-  public void recover(VolumeManager fs, KeyExtent extent, TableConfiguration tconf, List<Path>
logs,
-      Set<String> tabletFiles, MutationReceiver mr) throws IOException {
+  public void recover(VolumeManager fs, KeyExtent extent, List<Path> logs, Set<String>
tabletFiles,
+      MutationReceiver mr) throws IOException {
     try {
       SortedLogRecovery recovery = new SortedLogRecovery(fs);
       recovery.recover(extent, logs, tabletFiles, mr);
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java
b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java
index 0291747..80a6ff5 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.tserver.log;
 import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
@@ -31,28 +32,36 @@ public class DfsLoggerTest {
   @Test
   public void testDurabilityForGroupCommit() {
     List<TabletMutations> lst = new ArrayList<>();
-    assertEquals(Durability.NONE, DfsLogger.chooseDurabilityForGroupCommit(lst));
+    assertEquals(Durability.NONE, chooseDurabilityForGroupCommit(lst));
     TabletMutations m1 = new TabletMutations(0, 1, Collections.emptyList(), Durability.NONE);
     lst.add(m1);
-    assertEquals(Durability.NONE, DfsLogger.chooseDurabilityForGroupCommit(lst));
+    assertEquals(Durability.NONE, chooseDurabilityForGroupCommit(lst));
     TabletMutations m2 = new TabletMutations(0, 1, Collections.emptyList(), Durability.LOG);
     lst.add(m2);
-    assertEquals(Durability.LOG, DfsLogger.chooseDurabilityForGroupCommit(lst));
+    assertEquals(Durability.LOG, chooseDurabilityForGroupCommit(lst));
     TabletMutations m3 = new TabletMutations(0, 1, Collections.emptyList(), Durability.NONE);
     lst.add(m3);
-    assertEquals(Durability.LOG, DfsLogger.chooseDurabilityForGroupCommit(lst));
+    assertEquals(Durability.LOG, chooseDurabilityForGroupCommit(lst));
     TabletMutations m4 = new TabletMutations(0, 1, Collections.emptyList(), Durability.FLUSH);
     lst.add(m4);
-    assertEquals(Durability.FLUSH, DfsLogger.chooseDurabilityForGroupCommit(lst));
+    assertEquals(Durability.FLUSH, chooseDurabilityForGroupCommit(lst));
     TabletMutations m5 = new TabletMutations(0, 1, Collections.emptyList(), Durability.LOG);
     lst.add(m5);
-    assertEquals(Durability.FLUSH, DfsLogger.chooseDurabilityForGroupCommit(lst));
+    assertEquals(Durability.FLUSH, chooseDurabilityForGroupCommit(lst));
     TabletMutations m6 = new TabletMutations(0, 1, Collections.emptyList(), Durability.SYNC);
     lst.add(m6);
-    assertEquals(Durability.SYNC, DfsLogger.chooseDurabilityForGroupCommit(lst));
+    assertEquals(Durability.SYNC, chooseDurabilityForGroupCommit(lst));
     TabletMutations m7 = new TabletMutations(0, 1, Collections.emptyList(), Durability.FLUSH);
     lst.add(m7);
-    assertEquals(Durability.SYNC, DfsLogger.chooseDurabilityForGroupCommit(lst));
+    assertEquals(Durability.SYNC, chooseDurabilityForGroupCommit(lst));
+  }
+
+  static Durability chooseDurabilityForGroupCommit(Collection<TabletMutations> mutations)
{
+    Durability result = Durability.NONE;
+    for (TabletMutations tabletMutations : mutations) {
+      result = DfsLogger.maxDurability(tabletMutations.getDurability(), result);
+    }
+    return result;
   }
 
 }


Mime
View raw message