accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmil...@apache.org
Subject [accumulo] branch master updated: Improve internal MapCounter class (#997)
Date Thu, 28 Feb 2019 20:13:33 GMT
This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new bee584b  Improve internal MapCounter class (#997)
bee584b is described below

commit bee584bcb689a6b9394a3cea30486c6c3f43c49e
Author: Mike Miller <mmiller@apache.org>
AuthorDate: Thu Feb 28 15:13:28 2019 -0500

    Improve internal MapCounter class (#997)
    
    * Create getInt() and max() methods, replacing code where appropriate
    * Change increment() to use computeIfAbsent
    * Change values() to use LongStream
---
 .../core/clientImpl/TableOperationsImpl.java       |  2 +-
 .../org/apache/accumulo/core/util/MapCounter.java  | 28 +++++++++++-----------
 .../server/master/balancer/GroupBalancer.java      |  4 ++--
 .../master/tableOps/bulkVer2/LoadFiles.java        |  5 ++--
 .../master/tableOps/compact/CompactionDriver.java  |  4 +---
 .../org/apache/accumulo/tserver/TabletServer.java  | 21 ++++++++--------
 6 files changed, 30 insertions(+), 34 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index b607dbb..9068fd6 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@ -1288,7 +1288,7 @@ public class TableOperationsImpl extends TableOperationsHelper {
         long waitTime;
         long maxPerServer = 0;
         if (serverCounts.size() > 0) {
-          maxPerServer = Collections.max(serverCounts.values());
+          maxPerServer = serverCounts.max();
           waitTime = maxPerServer * 10;
         } else
           waitTime = waitFor * 10;
diff --git a/core/src/main/java/org/apache/accumulo/core/util/MapCounter.java b/core/src/main/java/org/apache/accumulo/core/util/MapCounter.java
index a2d4656..48478eb 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/MapCounter.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/MapCounter.java
@@ -16,11 +16,13 @@
  */
 package org.apache.accumulo.core.util;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Set;
+import java.util.stream.LongStream;
 
+/**
+ * A Map counter for counting with longs or integers. Not thread safe.
+ */
 public class MapCounter<KT> {
 
   static class MutableLong {
@@ -34,11 +36,7 @@ public class MapCounter<KT> {
   }
 
   public long increment(KT key, long l) {
-    MutableLong ml = map.get(key);
-    if (ml == null) {
-      ml = new MutableLong();
-      map.put(key, ml);
-    }
+    MutableLong ml = map.computeIfAbsent(key, KT -> new MutableLong());
 
     ml.l += l;
 
@@ -62,18 +60,20 @@ public class MapCounter<KT> {
     return ml.l;
   }
 
+  public int getInt(KT key) {
+    return Math.toIntExact(get(key));
+  }
+
   public Set<KT> keySet() {
     return map.keySet();
   }
 
-  public Collection<Long> values() {
-    Collection<MutableLong> vals = map.values();
-    ArrayList<Long> ret = new ArrayList<>(vals.size());
-    for (MutableLong ml : vals) {
-      ret.add(ml.l);
-    }
+  public LongStream values() {
+    return map.values().stream().mapToLong(mutLong -> mutLong.l);
+  }
 
-    return ret;
+  public long max() {
+    return values().max().orElse(0);
   }
 
   public int size() {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/GroupBalancer.java
b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/GroupBalancer.java
index f0d1d85..ad80156 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/GroupBalancer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/GroupBalancer.java
@@ -254,9 +254,9 @@ public abstract class GroupBalancer extends TabletBalancer {
 
     int totalExtra = 0;
     for (String group : groupCounts.keySet()) {
-      long groupCount = groupCounts.get(group);
+      int groupCount = groupCounts.getInt(group);
       totalExtra += groupCount % current.size();
-      expectedCounts.put(group, (int) (groupCount / current.size()));
+      expectedCounts.put(group, (groupCount / current.size()));
     }
 
     // The number of extra tablets from all groups that each tserver must have.
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java
b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java
index fc28a94..c69fb3e 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java
@@ -19,7 +19,6 @@ package org.apache.accumulo.master.tableOps.bulkVer2;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -234,7 +233,7 @@ class LoadFiles extends MasterRepo {
       if (loadMsgs.size() > 0) {
         // find which tablet server had the most load messages sent to it and sleep 13ms
for each
         // load message
-        sleepTime = Collections.max(loadMsgs.values()) * 13;
+        sleepTime = loadMsgs.max() * 13;
       }
 
       if (locationLess > 0) {
@@ -292,7 +291,7 @@ class LoadFiles extends MasterRepo {
       long sleepTime = 0;
       if (unloadingTablets.size() > 0) {
         // find which tablet server had the most tablets to unload and sleep 13ms for each
tablet
-        sleepTime = Collections.max(unloadingTablets.values()) * 13;
+        sleepTime = unloadingTablets.max() * 13;
       }
 
       return sleepTime;
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/compact/CompactionDriver.java
b/server/master/src/main/java/org/apache/accumulo/master/tableOps/compact/CompactionDriver.java
index 2a7528d..1571e2e 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/compact/CompactionDriver.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/compact/CompactionDriver.java
@@ -16,8 +16,6 @@
  */
 package org.apache.accumulo.master.tableOps.compact;
 
-import java.util.Collections;
-
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
 import org.apache.accumulo.core.clientImpl.Tables;
@@ -128,7 +126,7 @@ class CompactionDriver extends MasterRepo {
 
     // make wait time depend on the server with the most to compact
     if (serversToFlush.size() > 0)
-      sleepTime = Collections.max(serversToFlush.values()) * sleepTime;
+      sleepTime = serversToFlush.max() * sleepTime;
 
     sleepTime = Math.max(2 * scanTime, sleepTime);
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 5ebe172..f4508ff 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -3163,10 +3163,10 @@ public class TabletServer implements Runnable {
     synchronized (this.onlineTablets) {
       onlineTabletsCopy = new HashMap<>(this.onlineTablets);
     }
-    Map<String,TableInfo> tables = new HashMap<>();
+    final Map<String,TableInfo> tables = new HashMap<>();
 
-    for (Entry<KeyExtent,Tablet> entry : onlineTabletsCopy.entrySet()) {
-      String tableId = entry.getKey().getTableId().canonical();
+    onlineTabletsCopy.forEach((ke, tablet) -> {
+      String tableId = ke.getTableId().canonical();
       TableInfo table = tables.get(tableId);
       if (table == null) {
         table = new TableInfo();
@@ -3174,7 +3174,6 @@ public class TabletServer implements Runnable {
         table.majors = new Compacting();
         tables.put(tableId, table);
       }
-      Tablet tablet = entry.getValue();
       long recs = tablet.getNumEntries();
       table.tablets++;
       table.onlineTablets++;
@@ -3194,21 +3193,21 @@ public class TabletServer implements Runnable {
         table.majors.running++;
       if (tablet.isMajorCompactionQueued())
         table.majors.queued++;
-    }
+    });
 
-    for (Entry<TableId,MapCounter<ScanRunState>> entry : scanCounts.entrySet())
{
-      TableInfo table = tables.get(entry.getKey().canonical());
+    scanCounts.forEach((tableId, mapCounter) -> {
+      TableInfo table = tables.get(tableId.canonical());
       if (table == null) {
         table = new TableInfo();
-        tables.put(entry.getKey().canonical(), table);
+        tables.put(tableId.canonical(), table);
       }
 
       if (table.scans == null)
         table.scans = new Compacting();
 
-      table.scans.queued += entry.getValue().get(ScanRunState.QUEUED);
-      table.scans.running += entry.getValue().get(ScanRunState.RUNNING);
-    }
+      table.scans.queued += mapCounter.getInt(ScanRunState.QUEUED);
+      table.scans.running += mapCounter.getInt(ScanRunState.RUNNING);
+    });
 
     ArrayList<KeyExtent> offlineTabletsCopy = new ArrayList<>();
     synchronized (this.unopenedTablets) {


Mime
View raw message