giraph-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (GIRAPH-1125) Add memory estimation mechanism to out-of-core
Date Mon, 19 Dec 2016 23:32:59 GMT

    [ https://issues.apache.org/jira/browse/GIRAPH-1125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15762637#comment-15762637 ]

ASF GitHub Bot commented on GIRAPH-1125:
----------------------------------------

Github user heslami commented on a diff in the pull request:

    https://github.com/apache/giraph/pull/12#discussion_r93143346
  
    --- Diff: giraph-core/src/main/java/org/apache/giraph/ooc/policy/MemoryEstimatorOracle.java ---
    @@ -0,0 +1,851 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.giraph.ooc.policy;
    +
    +import com.sun.management.GarbageCollectionNotificationInfo;
    +import org.apache.commons.math.stat.regression.OLSMultipleLinearRegression;
    +import org.apache.giraph.comm.NetworkMetrics;
    +import org.apache.giraph.conf.FloatConfOption;
    +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
    +import org.apache.giraph.conf.LongConfOption;
    +import org.apache.giraph.edge.AbstractEdgeStore;
    +import org.apache.giraph.ooc.OutOfCoreEngine;
    +import org.apache.giraph.ooc.command.IOCommand;
    +import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
    +import org.apache.giraph.ooc.command.WaitIOCommand;
    +import org.apache.giraph.worker.EdgeInputSplitsCallable;
    +import org.apache.giraph.worker.VertexInputSplitsCallable;
    +import org.apache.giraph.worker.WorkerProgress;
    +import org.apache.log4j.Logger;
    +
    +import java.lang.management.ManagementFactory;
    +import java.lang.management.MemoryPoolMXBean;
    +import java.lang.management.MemoryUsage;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Vector;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.locks.Lock;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +import static com.google.common.base.Preconditions.checkState;
    +
    +/**
    + * Implementation of {@link OutOfCoreOracle} that uses a linear regression model
    + * to estimate actual memory usage based on the current state of computation.
    + * The model takes into consideration 5 parameters:
    + *
    + * y = c0 + c1*x1 + c2*x2 + c3*x3 + c4*x4 + c5*x5
    + *
    + * y: memory usage
    + * x1: edges loaded
    + * x2: vertices loaded
    + * x3: vertices processed
    + * x4: bytes received due to messages
    + * x5: bytes loaded/stored from/to disk due to OOC.
    + *
    + */
    +public class MemoryEstimatorOracle implements OutOfCoreOracle {
    +  /** Memory check interval in msec */
    +  public static final LongConfOption CHECK_MEMORY_INTERVAL =
    +    new LongConfOption("giraph.garbageEstimator.checkMemoryInterval", 1000,
    +        "The interval at which the memory checker thread wakes up and " +
    +            "monitors memory footprint (in milliseconds)");
    +  /**
    +   * If mem-usage is above this threshold and no Full GC has been called,
    +   * we call it manually
    +   */
    +  public static final FloatConfOption MANUAL_GC_MEMORY_PRESSURE =
    +    new FloatConfOption("giraph.garbageEstimator.manualGCPressure", 0.95f,
    +        "The threshold above which GC is called manually if Full GC has not " +
    +            "happened in a while");
    +  /** Used to detect a high memory pressure situation */
    +  public static final FloatConfOption GC_MINIMUM_RECLAIM_FRACTION =
    +    new FloatConfOption("giraph.garbageEstimator.gcReclaimFraction", 0.05f,
    +        "Minimum percentage of memory we expect to be reclaimed after a Full " +
    +            "GC. If less than this amount is reclaimed, it is safe to say " +
    +            "we are in a high memory situation and the estimation mechanism " +
    +            "has not recognized it yet!");
    +  /** If mem-usage is above this threshold, active threads are set to 0 */
    +  public static final FloatConfOption AM_HIGH_THRESHOLD =
    +    new FloatConfOption("giraph.amHighThreshold", 0.95f,
    +        "If mem-usage is above this threshold, all active threads " +
    +            "(compute/input) are paused.");
    +  /** If mem-usage is below this threshold, active threads are set to max */
    +  public static final FloatConfOption AM_LOW_THRESHOLD =
    +    new FloatConfOption("giraph.amLowThreshold", 0.90f,
    +        "If mem-usage is below this threshold, all active threads " +
    +            "(compute/input) are running.");
    +  /** If mem-usage is above this threshold, credit is set to 0 */
    +  public static final FloatConfOption CREDIT_HIGH_THRESHOLD =
    +    new FloatConfOption("giraph.creditHighThreshold", 0.95f,
    +        "If mem-usage is above this threshold, credit is set to 0");
    +  /** If mem-usage is below this threshold, credit is set to max */
    +  public static final FloatConfOption CREDIT_LOW_THRESHOLD =
    +    new FloatConfOption("giraph.creditLowThreshold", 0.90f,
    +        "If mem-usage is below this threshold, credit is set to max");
    +  /** OOC starts if mem-usage is above this threshold */
    +  public static final FloatConfOption OOC_THRESHOLD =
    +    new FloatConfOption("giraph.oocThreshold", 0.90f,
    +        "If mem-usage is above this threshold, out-of-core threads start " +
    +            "writing data to disk");
    +
    +  /** Logger */
    +  private static final Logger LOG =
    +    Logger.getLogger(MemoryEstimatorOracle.class);
    +
    +  /** Cached value for {@link #MANUAL_GC_MEMORY_PRESSURE} */
    +  private final float manualGCMemoryPressure;
    +  /** Cached value for {@link #GC_MINIMUM_RECLAIM_FRACTION} */
    +  private final float gcReclaimFraction;
    +  /** Cached value for {@link #AM_HIGH_THRESHOLD} */
    +  private final float amHighThreshold;
    +  /** Cached value for {@link #AM_LOW_THRESHOLD} */
    +  private final float amLowThreshold;
    +  /** Cached value for {@link #CREDIT_HIGH_THRESHOLD} */
    +  private final float creditHighThreshold;
    +  /** Cached value for {@link #CREDIT_LOW_THRESHOLD} */
    +  private final float creditLowThreshold;
    +  /** Cached value for {@link #OOC_THRESHOLD} */
    +  private final float oocThreshold;
    +
    +  /** Reference to running OOC engine */
    +  private final OutOfCoreEngine oocEngine;
    +  /** Memory estimator instance */
    +  private final MemoryEstimator memoryEstimator;
    +  /** Keeps track of the number of bytes stored/loaded by OOC */
    +  private final AtomicLong oocBytesInjected = new AtomicLong(0);
    +  /** How many bytes to offload */
    +  private final AtomicLong numBytesToOffload = new AtomicLong(0);
    +  /** Current state of the OOC */
    +  private volatile State state = State.STABLE;
    +  /** Timestamp of the last major GC */
    +  private volatile long lastMajorGCTime = 0;
    +
    +  /**
    +   * Different states the OOC can be in.
    +   */
    +  private enum State {
    +    /** No offloading */
    +    STABLE,
    +    /** Currently offloading */
    +    OFFLOADING,
    +  }
    +
    +  /**
    +   * Constructor.
    +   * @param conf Configuration
    +   * @param oocEngine OOC engine.
    +   *
    +   */
    +  public MemoryEstimatorOracle(ImmutableClassesGiraphConfiguration conf,
    +                               final OutOfCoreEngine oocEngine) {
    +    this.oocEngine = oocEngine;
    +    this.memoryEstimator = new MemoryEstimator(this.oocBytesInjected,
    +      oocEngine.getNetworkMetrics());
    +
    +    this.manualGCMemoryPressure = MANUAL_GC_MEMORY_PRESSURE.get(conf);
    +    this.gcReclaimFraction = GC_MINIMUM_RECLAIM_FRACTION.get(conf);
    +    this.amHighThreshold = AM_HIGH_THRESHOLD.get(conf);
    +    this.amLowThreshold = AM_LOW_THRESHOLD.get(conf);
    +    this.creditHighThreshold = CREDIT_HIGH_THRESHOLD.get(conf);
    +    this.creditLowThreshold = CREDIT_LOW_THRESHOLD.get(conf);
    +    this.oocThreshold = OOC_THRESHOLD.get(conf);
    +
    +    final long checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf);
    +
    +    Thread thread = new Thread(new Runnable() {
    +      @Override
    +      public void run() {
    +        while (true) {
    +          long oldGenUsageEstimate = memoryEstimator.getUsageEstimate();
    +          MemoryUsage usage = getOldGenUsed();
    +          if (oldGenUsageEstimate > 0) {
    +            updateRates(oldGenUsageEstimate, usage.getMax());
    +          } else {
    +            long time = System.currentTimeMillis();
    +            if (time - lastMajorGCTime >= 10000) {
    +              double used = (double) usage.getUsed() / usage.getMax();
    +              if (used > manualGCMemoryPressure) {
    +                if (LOG.isInfoEnabled()) {
    +                  LOG.info(
    +                    "High memory pressure with no full GC from the JVM. " +
    +                      "Calling GC manually. Used fraction of old-gen is " +
    +                      String.format("%.2f", used) + ".");
    +                }
    +                System.gc();
    +                time = System.currentTimeMillis() - time;
    +                usage = getOldGenUsed();
    +                used = (double) usage.getUsed() / usage.getMax();
    +                if (LOG.isInfoEnabled()) {
    +                  LOG.info("Manual GC done. It took " +
    +                    String.format("%.2f", time / 1000.0) +
    +                    " seconds. Used fraction of old-gen is " +
    +                    String.format("%.2f", used) + ".");
    +                }
    +              }
    +            }
    +          }
    +          try {
    +            Thread.sleep(checkMemoryInterval);
    +          } catch (InterruptedException e) {
    +            LOG.warn("run: exception occurred!", e);
    +            return;
    +          }
    +        }
    +      }
    +    });
    +    thread.setUncaughtExceptionHandler(oocEngine.getServiceWorker()
    +      .getGraphTaskManager().createUncaughtExceptionHandler());
    +    thread.setName("ooc-memory-checker");
    +    thread.setDaemon(true);
    +    thread.start();
    +  }
    +
    +  /**
    +   * Resets all the counters used in the memory estimation. This is called at
    +   * the beginning of a new superstep.
    +   * <p>
    +   * The number of vertices to compute in the next superstep gets reset in
    +   * {@link org.apache.giraph.graph.GraphTaskManager#processGraphPartitions}
    +   * right before
    +   * {@link org.apache.giraph.partition.PartitionStore#startIteration()} gets
    +   * called.
    +   */
    +  @Override
    +  public void startIteration() {
    +    oocBytesInjected.set(0);
    +    memoryEstimator.clear();
    +    memoryEstimator.setCurrentSuperstep(oocEngine.getSuperstep());
    +    oocEngine.updateRequestsCreditFraction(1);
    +    oocEngine.updateActiveThreadsFraction(1);
    +  }
    +
    +
    +  @Override
    +  public IOAction[] getNextIOActions() {
    +    if (state == State.OFFLOADING) {
    +      return new IOAction[]{
    +        IOAction.STORE_MESSAGES_AND_BUFFERS, IOAction.STORE_PARTITION};
    +    }
    +    long oldGenUsage = memoryEstimator.getUsageEstimate();
    +    MemoryUsage usage = getOldGenUsed();
    +    if (oldGenUsage > 0) {
    +      double usageEstimate = (double) oldGenUsage / usage.getMax();
    +      if (usageEstimate > oocThreshold) {
    +        return new IOAction[]{
    +          IOAction.STORE_MESSAGES_AND_BUFFERS,
    +          IOAction.STORE_PARTITION};
    +      } else {
    +        return new IOAction[]{IOAction.LOAD_PARTITION};
    +      }
    +    } else {
    +      return new IOAction[]{IOAction.LOAD_PARTITION};
    +    }
    +  }
    +
    +  @Override
    +  public boolean approve(IOCommand command) {
    +    return true;
    +  }
    +
    +  @Override
    +  public void commandCompleted(IOCommand command) {
    +    if (command instanceof LoadPartitionIOCommand) {
    +      oocBytesInjected.getAndAdd(command.bytesTransferred());
    +      if (state == State.OFFLOADING) {
    +        numBytesToOffload.getAndAdd(command.bytesTransferred());
    +      }
    +    } else if (!(command instanceof WaitIOCommand)) {
    +      oocBytesInjected.getAndAdd(0 - command.bytesTransferred());
    +      if (state == State.OFFLOADING) {
    +        numBytesToOffload.getAndAdd(0 - command.bytesTransferred());
    +      }
    +    }
    +
    +    if (state == State.OFFLOADING && numBytesToOffload.get() <= 0) {
    +      numBytesToOffload.set(0);
    +      state = State.STABLE;
    +      updateRates(-1, 1);
    +    }
    +  }
    +
    +  /**
    +   * When a new GC has completed, we can get an accurate measurement of the
    +   * memory usage. We use this to update the linear regression model.
    +   *
    +   * @param gcInfo GC information
    +   */
    +  @Override
    +  public synchronized void gcCompleted(
    +    GarbageCollectionNotificationInfo gcInfo) {
    +    String action = gcInfo.getGcAction().toLowerCase();
    +    String cause = gcInfo.getGcCause().toLowerCase();
    +    if (action.contains("major") &&
    +      (cause.contains("ergo") || cause.contains("system"))) {
    +      lastMajorGCTime = System.currentTimeMillis();
    +      MemoryUsage before = null;
    +      MemoryUsage after = null;
    +
    +      for (Map.Entry<String, MemoryUsage> entry :
    +        gcInfo.getGcInfo().getMemoryUsageBeforeGc().entrySet()) {
    +        String poolName = entry.getKey();
    +        if (poolName.toLowerCase().contains("old")) {
    +          before = entry.getValue();
    +          after = gcInfo.getGcInfo().getMemoryUsageAfterGc().get(poolName);
    +          break;
    +        }
    +      }
    +      if (after == null) {
    +        throw new IllegalStateException("Missing Memory Usage After GC info");
    +      }
    +      if (before == null) {
    +        throw new IllegalStateException("Missing Memory Usage Before GC info");
    +      }
    +
    +      // Compare the estimation with the actual value
    +      long usedMemoryEstimate = memoryEstimator.getUsageEstimate();
    +      long usedMemoryReal = after.getUsed();
    +      if (usedMemoryEstimate >= 0) {
    +        if (LOG.isInfoEnabled()) {
    +          LOG.info("gcCompleted: estimate=" + usedMemoryEstimate + " real=" +
    +            usedMemoryReal + " error=" +
    +            ((double) Math.abs(usedMemoryEstimate - usedMemoryReal) /
    +              usedMemoryReal * 100));
    +        }
    +      }
    +
    +      // Number of edges loaded so far (if in input superstep)
    +      long edgesLoaded = oocEngine.getSuperstep() >= 0 ? 0 :
    +        EdgeInputSplitsCallable.getTotalEdgesLoadedMeter().count();
    +      // Number of vertices loaded so far (if in input superstep)
    +      long verticesLoaded = oocEngine.getSuperstep() >= 0 ? 0 :
    +        VertexInputSplitsCallable.getTotalVerticesLoadedMeter().count();
    +      // Number of vertices computed (if either in compute or store phase)
    +      long verticesComputed = WorkerProgress.get().getVerticesComputed() +
    +        WorkerProgress.get().getVerticesStored() +
    +        AbstractEdgeStore.PROGRESS_COUNTER.getProgress();
    +      // Number of bytes received
    +      long receivedBytes =
    +        oocEngine.getNetworkMetrics().getBytesReceivedPerSuperstep();
    +      // Number of OOC bytes
    +      long oocBytes = oocBytesInjected.get();
    +
    +      memoryEstimator.addRecord(getOldGenUsed().getUsed(), edgesLoaded,
    +        verticesLoaded, verticesComputed, receivedBytes, oocBytes);
    +
    +      long garbage = before.getUsed() - after.getUsed();
    +      long maxMem = after.getMax();
    +      long memUsed = after.getUsed();
    +      boolean isTight = (maxMem - memUsed) < 2 * gcReclaimFraction * maxMem &&
    +        garbage < gcReclaimFraction * maxMem;
    +      boolean predictionExist = memoryEstimator.getUsageEstimate() > 0;
    +      if (isTight && !predictionExist) {
    +        if (LOG.isInfoEnabled()) {
    +          LOG.info("gcCompleted: garbage=" + garbage + " memUsed=" +
    +            memUsed + " maxMem=" + maxMem);
    +        }
    +        numBytesToOffload.set((long) (2 * gcReclaimFraction * maxMem) -
    +          (maxMem - memUsed));
    +        if (LOG.isInfoEnabled()) {
    +          LOG.info("gcCompleted: tight memory usage. Starting to offload " +
    +            "until " + numBytesToOffload.get() + " bytes are offloaded");
    +        }
    +        state = State.OFFLOADING;
    +        updateRates(1, 1);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Given an estimate for the current memory usage and the maximum available
    +   * memory, it updates the active threads and flow control credit in the
    +   * OOC engine.
    +   *
    +   * @param usageEstimateMem Estimate of memory usage.
    +   * @param maxMemory Maximum memory.
    +   */
    +  private void updateRates(long usageEstimateMem, long maxMemory) {
    +    double usageEstimate = (double) usageEstimateMem / maxMemory;
    +    if (usageEstimate > 0) {
    +      if (usageEstimate >= amHighThreshold) {
    +        oocEngine.updateActiveThreadsFraction(0);
    +      } else if (usageEstimate < amLowThreshold) {
    +        oocEngine.updateActiveThreadsFraction(1);
    +      } else {
    +        oocEngine.updateActiveThreadsFraction(1 -
    +          (usageEstimate - amLowThreshold) /
    +            (amHighThreshold - amLowThreshold));
    +      }
    +
    +      if (usageEstimate >= creditHighThreshold) {
    +        oocEngine.updateRequestsCreditFraction(0);
    +      } else if (usageEstimate < creditLowThreshold) {
    +        oocEngine.updateRequestsCreditFraction(1);
    +      } else {
    +        oocEngine.updateRequestsCreditFraction(1 -
    +          (usageEstimate - creditLowThreshold) /
    +            (creditHighThreshold - creditLowThreshold));
    +      }
    +    } else {
    +      oocEngine.updateActiveThreadsFraction(1);
    +      oocEngine.updateRequestsCreditFraction(1);
    +    }
    +  }
    +
    +  /**
    +   * Returns statistics about the old gen pool.
    +   * @return {@link MemoryUsage}.
    +   */
    +  private MemoryUsage getOldGenUsed() {
    +    List<MemoryPoolMXBean> memoryPoolList =
    +      ManagementFactory.getMemoryPoolMXBeans();
    +    for (MemoryPoolMXBean pool : memoryPoolList) {
    +      String normalName = pool.getName().toLowerCase();
    +      if (normalName.contains("old") || normalName.contains("tenured")) {
    +        return pool.getUsage();
    +      }
    +    }
    +    throw new IllegalStateException("Bad Memory Pool");
    +  }
    +
    +  /**
    +   * Maintains statistics about the current state and progress of the
    +   * computation and produces estimates of memory usage using a technique
    +   * based on linear regression.
    +   *
    +   * Upon a GC event, it gets updated with the most recent statistics through
    +   * the {@link #addRecord} method.
    +   */
    +  private static class MemoryEstimator {
    +    /** Stores the (x1,x2,...,x5) arrays of data samples, one for each sample */
    +    private Vector<double[]> dataSamples = new Vector<>();
    +    /** Stores the y (memory usage) values, one for each sample */
    +    private Vector<Double> memorySamples = new Vector<>();
    --- End diff --
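The class Javadoc in the diff models old-gen usage as y = c0 + c1*x1 + ... + c5*x5, and the oracle imports commons-math's `OLSMultipleLinearRegression` to fit it. To make the fitting step concrete, here is a minimal plain-Java sketch that solves the ordinary-least-squares normal equations (X^T X) c = X^T y directly with Gaussian elimination. It is a swapped-in illustration, not the code under review; `OlsSketch`, `fit`, and `predict` are hypothetical names.

```java
/**
 * Hypothetical plain-Java OLS fit of y = c0 + c1*x1 + ... + ck*xk.
 * Illustration only: solves the normal equations (X^T X) c = X^T y.
 */
public final class OlsSketch {

  /** Returns {c0, c1, ..., ck} minimizing the sum of squared residuals. */
  public static double[] fit(double[][] x, double[] y) {
    if (x.length == 0 || x.length != y.length) {
      throw new IllegalArgumentException("Need one y value per sample, at least one sample");
    }
    int p = x[0].length + 1; // +1 for the intercept c0
    // Augmented matrix [X^T X | X^T y], accumulated one sample at a time.
    double[][] a = new double[p][p + 1];
    for (int s = 0; s < x.length; s++) {
      double[] row = new double[p];
      row[0] = 1.0; // intercept column
      System.arraycopy(x[s], 0, row, 1, p - 1);
      for (int i = 0; i < p; i++) {
        for (int j = 0; j < p; j++) {
          a[i][j] += row[i] * row[j];
        }
        a[i][p] += row[i] * y[s];
      }
    }
    // Forward elimination with partial pivoting.
    for (int col = 0; col < p; col++) {
      int pivot = col;
      for (int r = col + 1; r < p; r++) {
        if (Math.abs(a[r][col]) > Math.abs(a[pivot][col])) {
          pivot = r;
        }
      }
      if (Math.abs(a[pivot][col]) < 1e-12) {
        throw new IllegalArgumentException("Singular system: too few or collinear samples");
      }
      double[] tmp = a[col];
      a[col] = a[pivot];
      a[pivot] = tmp;
      for (int r = col + 1; r < p; r++) {
        double f = a[r][col] / a[col][col];
        for (int c = col; c <= p; c++) {
          a[r][c] -= f * a[col][c];
        }
      }
    }
    // Back substitution on the upper-triangular system.
    double[] coef = new double[p];
    for (int i = p - 1; i >= 0; i--) {
      double sum = a[i][p];
      for (int j = i + 1; j < p; j++) {
        sum -= a[i][j] * coef[j];
      }
      coef[i] = sum / a[i][i];
    }
    return coef;
  }

  /** Evaluates c0 + c1*x1 + ... + ck*xk for one feature vector. */
  public static double predict(double[] coef, double[] features) {
    double y = coef[0];
    for (int i = 0; i < features.length; i++) {
      y += coef[i + 1] * features[i];
    }
    return y;
  }
}
```

For example, samples generated from y = 2 + 3*x1 + 5*x2 are fitted back to coefficients close to {2, 3, 5}. Solving the normal equations squares the condition number of X, so with raw byte and vertex counts of very different magnitudes a QR-based solver (which is what `OLSMultipleLinearRegression` uses) or feature scaling is the safer choice. The sketch also rejects singular systems, such as a feature column that is all zeros, which is what `edgesLoaded` looks like outside the input superstep.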
    
    `DoubleArrayList` from fastutil is useful when the list holds a lot of entries. The number
    of entries in our case is very small (the number of full GCs in one superstep, which is in
    the range of 1-100 and gets closer to 1000 only in very extreme cases). So either `Vector`
    or `ArrayList` is the more natural choice. I agree that we are not using the synchronization
    guarantees of `Vector`, so `ArrayList` is an even more natural choice. I'll change these to
    `ArrayList`. Conversion from `ArrayList` (or even `Vector`) to an array can be done through
    the `toArray` method of either collection. I'll fix that too.
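The reply above proposes swapping the `Vector` sample buffers for `ArrayList` and converting them with `toArray`. One nuance: for a `List<double[]>`, `toArray(new double[0][])` yields a `double[][]` directly, because `double[]` is itself an object; for a `List<Double>`, `toArray` only yields a boxed `Double[]`, so a primitive `double[]` (the form commons-math's `newSampleData(double[] y, double[][] x)` takes) needs an explicit unboxing loop. A minimal sketch, where `SampleBuffers` and its methods are hypothetical names rather than the patch's code:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sample buffers for the regression, kept in unsynchronized
 * ArrayLists instead of Vectors, with conversions to primitive arrays.
 */
public final class SampleBuffers {
  /** One (x1, ..., xk) feature array per sample. */
  private final List<double[]> dataSamples = new ArrayList<>();
  /** One memory-usage value y per sample (boxed as Double). */
  private final List<Double> memorySamples = new ArrayList<>();

  /** Records one sample; copies the features so callers cannot mutate them later. */
  public void add(double memoryUsed, double... features) {
    dataSamples.add(features.clone());
    memorySamples.add(memoryUsed);
  }

  /** double[] is itself an object, so toArray produces a double[][] directly. */
  public double[][] featureMatrix() {
    return dataSamples.toArray(new double[0][]);
  }

  /** toArray on a List<Double> would yield Double[]; unbox element by element instead. */
  public double[] memoryVector() {
    double[] y = new double[memorySamples.size()];
    for (int i = 0; i < y.length; i++) {
      y[i] = memorySamples.get(i);
    }
    return y;
  }

  /** Drops all samples, e.g. at the start of a new superstep. */
  public void clear() {
    dataSamples.clear();
    memorySamples.clear();
  }
}
```

The `clear()` method mirrors the `memoryEstimator.clear()` call that `startIteration()` makes at the beginning of each superstep; since the oracle only ever holds a few hundred samples, the extra copy in `memoryVector()` is negligible.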


> Add memory estimation mechanism to out-of-core
> ----------------------------------------------
>
>                 Key: GIRAPH-1125
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-1125
>             Project: Giraph
>          Issue Type: Improvement
>            Reporter: Hassan Eslami
>            Assignee: Hassan Eslami
>
> The new out-of-core mechanism is designed with adaptivity in mind, meaning that we want the
> out-of-core mechanism to kick in only when it is necessary. In other words, when all the data
> (graph, messages, and mutations) fits in memory, we want to take advantage of the entire
> memory, and when memory is short in some stage, only the minimal necessary amount of data
> goes out of core (to disk). This ensures good performance for the out-of-core mechanism.
> To satisfy the adaptivity goal, we need to know how much memory is used at each point in
> time. The default out-of-core mechanism (ThresholdBasedOracle) gets memory information from
> the JVM's internal methods (Runtime's freeMemory()). This method is inaccurate (and
> pessimistic): it counts garbage that has not yet been purged by GC as used memory. Relying on
> the JVM's default methods, OOC behaves pessimistically and moves data out of core even when
> it is not necessary. For instance, consider the case where there is a lot of garbage on the
> heap but GC has not happened for a while. In this case, the default OOC pushes data to disk
> and, immediately after a major GC, brings the data back into memory. This makes the default
> out-of-core mechanism inefficient: if out-of-core is enabled but the data fits entirely in
> memory, the job still goes out of core even though it is not necessary.
> To address this issue, we need a mechanism that knows more accurately how much of the heap
> is filled with non-garbage data. Consequently, we need to change the Oracle (OOC policy) to
> take advantage of this more accurate memory usage estimation.
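The issue contrasts Runtime-based measurement, which counts uncollected garbage as used, with a more accurate view of live data. The minimal sketch below (not part of the patch) reads two views the JVM exposes: `Runtime.totalMemory() - Runtime.freeMemory()`, and the old-gen pool's `MemoryPoolMXBean.getCollectionUsage()`, which reports the pool's usage right after its most recent collection. It assumes the old-gen pool can be found by the same `old`/`tenured` name match that `getOldGenUsed()` uses in the diff, which may not match every collector's pool names; `HeapUsageSketch` and its methods are hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;

/** Hypothetical helper contrasting two JVM views of heap occupancy. */
public final class HeapUsageSketch {

  /**
   * Runtime-based view: everything allocated and not yet freed counts as used,
   * including garbage that no GC has reclaimed yet (hence pessimistic).
   */
  public static double runtimeUsedFraction() {
    Runtime rt = Runtime.getRuntime();
    long used = rt.totalMemory() - rt.freeMemory();
    return (double) used / rt.maxMemory();
  }

  /**
   * Old-gen usage as measured right after the most recent collection of that
   * pool, or -1 if no matching pool exists or the JVM does not report it.
   * Assumes the same "old"/"tenured" name match as getOldGenUsed() in the diff.
   */
  public static double oldGenAfterGcFraction() {
    for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
      String name = pool.getName().toLowerCase();
      if (name.contains("old") || name.contains("tenured")) {
        MemoryUsage afterGc = pool.getCollectionUsage();
        if (afterGc == null || afterGc.getMax() <= 0) {
          return -1; // collection usage unsupported or max undefined
        }
        return (double) afterGc.getUsed() / afterGc.getMax();
      }
    }
    return -1; // no pool name matched
  }
}
```

The post-GC reading is only as fresh as the last collection of that pool, which is why the oracle in the diff uses each completed GC as a ground-truth sample and relies on the regression model to estimate usage between collections.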



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
