lucene-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [lucene-solr] 01/36: Initial patch.
Date Wed, 18 Dec 2019 16:38:39 GMT
This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch jira/solr-13579
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit d589383e5e3a95ef1cd3b78a51277f38ad18afd1
Author: Andrzej Bialecki <ab@apache.org>
AuthorDate: Tue Jun 18 22:14:29 2019 +0200

    Initial patch.
---
 .../solr/managed/AbstractResourceManager.java      | 263 +++++++++++++++++++++
 .../src/java/org/apache/solr/managed/Limit.java    |  35 +++
 .../src/java/org/apache/solr/managed/Limits.java   |  75 ++++++
 .../solr/managed/ProportionalResourceManager.java  |  48 ++++
 .../org/apache/solr/managed/ResourceManaged.java   |  23 ++
 .../org/apache/solr/managed/ResourceManager.java   |  39 +++
 6 files changed, 483 insertions(+)

diff --git a/solr/core/src/java/org/apache/solr/managed/AbstractResourceManager.java b/solr/core/src/java/org/apache/solr/managed/AbstractResourceManager.java
new file mode 100644
index 0000000..41cb185
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/managed/AbstractResourceManager.java
@@ -0,0 +1,263 @@
+package org.apache.solr.managed;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.core.PluginInfo;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.SolrPluginUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public abstract class AbstractResourceManager implements ResourceManager {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public static final String SCHEDULE_DELAY_SECONDS_PARAM = "scheduleDelaySeconds";
+  public static final String MAX_NUM_POOLS_PARAM = "maxNumPools";
+
+  public static final int DEFAULT_MAX_POOLS = 20;
+
+  public static class Pool implements Runnable, Closeable {
+    private final AbstractResourceManager resourceManager;
+    private final Map<String, ResourceManaged> resources = new ConcurrentHashMap<>();
+    private Limits limits;
+    private final Map<String, Object> params;
+    private final Map<String, Float> totalCosts = new ConcurrentHashMap<>();
+    private Map<String, Map<String, Float>> currentValues = null;
+    private Map<String, Float> totalValues = null;
+    int scheduleDelaySeconds;
+    ScheduledFuture<?> scheduledFuture;
+
+    public Pool(AbstractResourceManager resourceManager, Limits limits, Map<String, Object>
params) {
+      this.resourceManager = resourceManager;
+      this.limits = limits.copy();
+      this.params = new HashMap<>(params);
+    }
+
+    public synchronized void addResource(ResourceManaged resourceManaged) {
+      if (resources.containsKey(resourceManaged.getName())) {
+        throw new IllegalArgumentException("Pool already has resource '" + resourceManaged.getName()
+ "'.");
+      }
+      resources.put(resourceManaged.getName(), resourceManaged);
+      Limits managedLimits = resourceManaged.getManagedLimits();
+      managedLimits.forEach(entry -> {
+        Float total = totalCosts.get(entry.getKey());
+        if (total == null) {
+          totalCosts.put(entry.getKey(), entry.getValue().cost);
+        } else {
+          totalCosts.put(entry.getKey(), entry.getValue().cost + total);
+        }
+      });
+    }
+
+    public Map<String, ResourceManaged> getResources() {
+      return Collections.unmodifiableMap(resources);
+    }
+
+    public Map<String, Map<String, Float>> getCurrentValues() {
+      // collect current values
+      currentValues = new HashMap<>();
+      for (ResourceManaged resource : resources.values()) {
+        currentValues.put(resource.getName(), resource.getManagedValues());
+      }
+      // calculate totals
+      totalValues = new HashMap<>();
+      currentValues.values().forEach(map -> map.forEach((k, v) -> {
+        Float total = totalValues.get(k);
+        if (total == null) {
+          totalValues.put(k, v);
+        } else {
+          totalValues.put(k, total + v);
+        }
+      }));
+      return Collections.unmodifiableMap(currentValues);
+    }
+
+    /**
+     * This returns cumulative values of all resources. NOTE:
+     * you must call {@link #getCurrentValues()} first!
+     * @return
+     */
+    public Map<String, Float> getTotalValues() {
+      return Collections.unmodifiableMap(totalValues);
+    }
+
+    public Map<String, Float> getTotalCosts() {
+      return Collections.unmodifiableMap(totalCosts);
+    }
+
+    public Limits getLimits() {
+      return limits;
+    }
+
+    public void setLimits(Limits limits) {
+      this.limits = limits.copy();
+    }
+
+    @Override
+    public void run() {
+      resourceManager.managePool(this);
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (scheduledFuture != null) {
+        scheduledFuture.cancel(true);
+        scheduledFuture = null;
+      }
+    }
+  }
+
+
+  private Map<String, Pool> resourcePools = new ConcurrentHashMap<>();
+  private PluginInfo pluginInfo;
+  private int maxNumPools = DEFAULT_MAX_POOLS;
+  private TimeSource timeSource;
+
+  /**
+   * Thread pool for scheduling the pool runs.
+   */
+  private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
+
+  protected boolean isClosed = false;
+  protected boolean enabled = true;
+
+  public AbstractResourceManager(TimeSource timeSource) {
+    this.timeSource = timeSource;
+  }
+
+  @Override
+  public void init(PluginInfo info) {
+    if (info != null) {
+      this.pluginInfo = info.copy();
+      if (pluginInfo.initArgs != null) {
+        SolrPluginUtils.invokeSetters(this, this.pluginInfo.initArgs);
+      }
+    }
+    if (!enabled) {
+      log.debug("Resource manager " + getClass().getSimpleName() + " disabled.");
+      return;
+    }
+    scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(maxNumPools,
+        new DefaultSolrThreadFactory(getClass().getSimpleName()));
+    scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
+    scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+  }
+
+  public void setMaxNumPools(Integer maxNumPools) {
+    if (maxNumPools != null) {
+      this.maxNumPools = maxNumPools;
+    } else {
+      this.maxNumPools = DEFAULT_MAX_POOLS;
+    }
+  }
+
+  @Override
+  public void setEnabled(Boolean enabled) {
+    if (enabled != null) {
+      this.enabled = enabled;
+    }
+  }
+
+  @Override
+  public PluginInfo getPluginInfo() {
+    return pluginInfo;
+  }
+
+  protected abstract void managePool(Pool pool);
+
+  @Override
+  public void createPool(String name, Limits limits, Map<String, Object> params) throws
Exception {
+    ensureNotClosed();
+    if (resourcePools.containsKey(name)) {
+      throw new IllegalArgumentException("Pool '" + name + "' already exists.");
+    }
+    if (resourcePools.size() >= maxNumPools) {
+      throw new IllegalArgumentException("Maximum number of pools (" + maxNumPools + ") reached.");
+    }
+    Pool newPool = new Pool(this, limits, params);
+    newPool.scheduleDelaySeconds = Integer.parseInt(String.valueOf(params.getOrDefault(SCHEDULE_DELAY_SECONDS_PARAM,
10)));
+    resourcePools.putIfAbsent(name, newPool);
+    newPool.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(newPool,
0,
+        timeSource.convertDelay(TimeUnit.SECONDS, newPool.scheduleDelaySeconds, TimeUnit.MILLISECONDS),
+        TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public void modifyPoolLimits(String name, Limits limits) throws Exception {
+    ensureNotClosed();
+    Pool pool = resourcePools.get(name);
+    if (pool == null) {
+      throw new IllegalArgumentException("Pool '" + name + "' doesn't exist.");
+    }
+    pool.setLimits(limits);
+  }
+
+  @Override
+  public void removePool(String name) throws Exception {
+    ensureNotClosed();
+    Pool pool = resourcePools.remove(name);
+    if (pool == null) {
+      throw new IllegalArgumentException("Pool '" + name + "' doesn't exist.");
+    }
+    if (pool.scheduledFuture != null) {
+      pool.scheduledFuture.cancel(true);
+    }
+  }
+
+  @Override
+  public void addResources(String name, Collection<ResourceManaged> resourceManaged)
{
+    ensureNotClosed();
+    for (ResourceManaged resource : resourceManaged) {
+      addResource(name, resource);
+    }
+  }
+
+  @Override
+  public void addResource(String name, ResourceManaged resourceManaged) {
+    ensureNotClosed();
+    Pool pool = resourcePools.get(name);
+    if (pool == null) {
+      throw new IllegalArgumentException("Pool '" + name + "' doesn't exist.");
+    }
+    pool.addResource(resourceManaged);
+  }
+
+  @Override
+  public void close() throws IOException {
+    synchronized (this) {
+      isClosed = true;
+      log.debug("Closing all pools.");
+      for (Pool pool : resourcePools.values()) {
+        IOUtils.closeQuietly(pool);
+      }
+      resourcePools.clear();
+    }
+    log.debug("Shutting down scheduled thread pool executor now");
+    scheduledThreadPoolExecutor.shutdownNow();
+    log.debug("Awaiting termination of scheduled thread pool executor");
+    ExecutorUtil.awaitTermination(scheduledThreadPoolExecutor);
+    log.debug("Closed.");
+  }
+
+  @Override
+  public boolean isClosed() {
+    return isClosed;
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/managed/Limit.java b/solr/core/src/java/org/apache/solr/managed/Limit.java
new file mode 100644
index 0000000..0eee67d
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/managed/Limit.java
@@ -0,0 +1,35 @@
+package org.apache.solr.managed;
+
+/**
+ *
+ */
+public class Limit {
+  public final float min, max, cost;
+
+  public Limit() {
+    this(Float.MIN_VALUE, Float.MAX_VALUE, 1.0f);
+  }
+
+  public Limit(float min, float max) {
+    this(min, max, 1.0f);
+  }
+
+  public Limit(float min, float max, float cost) {
+    if (cost <= 0.0f) {
+      throw new IllegalArgumentException("cost must be > 0.0f");
+    }
+    this.min = min;
+    this.max = max;
+    this.cost = cost;
+  }
+
+  public float deltaOutsideLimit(float currentValue) {
+    if (currentValue < min) {
+      return currentValue - min;
+    } else if (currentValue > max) {
+      return currentValue - max;
+    } else {
+      return 0;
+    }
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/managed/Limits.java b/solr/core/src/java/org/apache/solr/managed/Limits.java
new file mode 100644
index 0000000..d817be9
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/managed/Limits.java
@@ -0,0 +1,75 @@
+package org.apache.solr.managed;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ *
+ */
+public class Limits implements Iterable<Map.Entry<String, Limit>> {
+
+  public static final Limit UNLIMITED = new Limit(Float.MIN_VALUE, Float.MAX_VALUE);
+
+  private Map<String, Limit> values = new HashMap<>();
+
+  public void setLimit(String key, Limit value) {
+    if (value != null) {
+      values.put(key, value);
+    } else {
+      values.remove(key);
+    }
+  }
+
+  public void setLimitMax(String key, float max) {
+    Limit limit = values.computeIfAbsent(key, k -> new Limit(Float.MIN_VALUE, max));
+    if (limit.max == max) {
+      return;
+    } else {
+      values.put(key, new Limit(limit.min, max));
+    }
+  }
+
+  public void setLimitMin(String key, float min) {
+    Limit limit = values.computeIfAbsent(key, k -> new Limit(min, Float.MAX_VALUE));
+    if (limit.min == min) {
+      return;
+    } else {
+      values.put(key, new Limit(min, limit.max));
+    }
+  }
+
+  public Limit getLimit(String key) {
+    return getLimit(key, UNLIMITED);
+  }
+
+  public Limit getLimit(String key, Limit defValue) {
+    Limit value = values.get(key);
+    if (value != null) {
+      return value;
+    } else {
+      return defValue;
+    }
+  }
+
+  public Set<String> getKeys() {
+    return Collections.unmodifiableSet(values.keySet());
+  }
+
+  public void removeLimit(String key) {
+    values.remove(key);
+  }
+
+  public Limits copy() {
+    Limits cloned = new Limits();
+    cloned.values.putAll(values);
+    return cloned;
+  }
+
+  @Override
+  public Iterator<Map.Entry<String, Limit>> iterator() {
+    return Collections.unmodifiableMap(values).entrySet().iterator();
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/managed/ProportionalResourceManager.java b/solr/core/src/java/org/apache/solr/managed/ProportionalResourceManager.java
new file mode 100644
index 0000000..f2edf65
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/managed/ProportionalResourceManager.java
@@ -0,0 +1,48 @@
+package org.apache.solr.managed;
+
+import java.util.Map;
+
+import org.apache.solr.common.util.TimeSource;
+
+/**
+ *
+ */
+public class ProportionalResourceManager extends AbstractResourceManager {
+  public ProportionalResourceManager(TimeSource timeSource) {
+    super(timeSource);
+  }
+
+  @Override
+  protected void managePool(Pool pool) {
+    Map<String, Map<String, Float>> currentValues = pool.getCurrentValues();
+    Map<String, Float> totalValues = pool.getTotalValues();
+    Map<String, Float> totalCosts = pool.getTotalCosts();
+    pool.getLimits().forEach(entry -> {
+      Limit poolLimit = entry.getValue();
+      Float totalValue = totalValues.get(entry.getKey());
+      if (totalValue == null) {
+        return;
+      }
+      float delta = poolLimit.deltaOutsideLimit(totalValue);
+      Float totalCost = totalCosts.get(entry.getKey());
+      if (totalCost == null || totalCost == 0) {
+        return;
+      }
+      // re-adjust the limits based on relative costs
+      pool.getResources().forEach((name, resource) -> {
+        Map<String, Float> current = currentValues.get(name);
+        if (current == null) {
+          return;
+        }
+        Limits limits = resource.getManagedLimits();
+        Limit limit = limits.getLimit(entry.getKey());
+        if (limit == null) {
+          return;
+        }
+        float newMax = limit.max - delta * limit.cost / totalCost;
+        resource.setManagedLimitMax(entry.getKey(), newMax);
+      });
+    });
+  }
+
+}
diff --git a/solr/core/src/java/org/apache/solr/managed/ResourceManaged.java b/solr/core/src/java/org/apache/solr/managed/ResourceManaged.java
new file mode 100644
index 0000000..889bc85
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/managed/ResourceManaged.java
@@ -0,0 +1,23 @@
+package org.apache.solr.managed;
+
+import java.util.Map;
+
+/**
+ *
+ */
+public interface ResourceManaged {
+
+  String getName();
+
+  void setManagedLimits(Limits limits);
+
+  void setManagedLimit(String key, Limit limit);
+
+  void setManagedLimitMax(String key, float max);
+
+  void setManagedLimitMin(String key, float min);
+
+  Limits getManagedLimits();
+
+  Map<String, Float> getManagedValues();
+}
diff --git a/solr/core/src/java/org/apache/solr/managed/ResourceManager.java b/solr/core/src/java/org/apache/solr/managed/ResourceManager.java
new file mode 100644
index 0000000..cd9e87d
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/managed/ResourceManager.java
@@ -0,0 +1,39 @@
+package org.apache.solr.managed;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.solr.common.SolrCloseable;
+import org.apache.solr.core.PluginInfo;
+import org.apache.solr.util.plugin.PluginInfoInitialized;
+
+/**
+ *
+ */
+public interface ResourceManager extends SolrCloseable, PluginInfoInitialized {
+
+  void setEnabled(Boolean enabled);
+
+  PluginInfo getPluginInfo();
+
+  void createPool(String name, Limits limits, Map<String, Object> params) throws Exception;
+
+  void modifyPoolLimits(String name, Limits limits) throws Exception;
+
+  void removePool(String name) throws Exception;
+
+  default void addResources(String pool, Collection<ResourceManaged> resourceManaged)
{
+    ensureNotClosed();
+    for (ResourceManaged resource : resourceManaged) {
+      addResource(pool, resource);
+    }
+  }
+
+  void addResource(String pool, ResourceManaged resourceManaged);
+
+  default void ensureNotClosed() {
+    if (isClosed()) {
+      throw new IllegalStateException("Already closed.");
+    }
+  }
+}


Mime
View raw message