Repository: giraph
Updated Branches:
refs/heads/trunk cdf37f23a -> f5b685efa
GIRAPH-1125
Closes #12
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f5b685ef
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f5b685ef
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f5b685ef
Branch: refs/heads/trunk
Commit: f5b685efa09b539b1f95925405723f7ac7b1dcea
Parents: cdf37f2
Author: Hassan Eslami <heslami@apache.org>
Authored: Fri Dec 23 12:03:37 2016 -0600
Committer: Hassan Eslami <hassan@wirelessprv-10-193-225-240.near.illinois.edu>
Committed: Fri Dec 23 12:03:37 2016 -0600
----------------------------------------------------------------------
giraph-core/pom.xml | 4 +
.../java/org/apache/giraph/comm/ServerData.java | 4 +-
.../org/apache/giraph/comm/WorkerServer.java | 2 +-
.../flow_control/CreditBasedFlowControl.java | 122 +++++---------
.../giraph/comm/flow_control/FlowControl.java | 5 -
.../comm/flow_control/NoOpFlowControl.java | 3 -
.../comm/flow_control/StaticFlowControl.java | 3 -
.../giraph/comm/netty/ByteCounterDelegate.java | 20 ++-
.../giraph/comm/netty/InboundByteCounter.java | 15 ++
.../apache/giraph/comm/netty/NettyClient.java | 3 +-
.../apache/giraph/comm/netty/NettyServer.java | 10 +-
.../giraph/comm/netty/NettyWorkerServer.java | 17 +-
.../org/apache/giraph/conf/GiraphConstants.java | 4 +-
.../apache/giraph/edge/AbstractEdgeStore.java | 11 +-
.../org/apache/giraph/ooc/OutOfCoreEngine.java | 64 +++++--
.../giraph/ooc/data/MetaPartitionManager.java | 13 +-
.../ooc/persistence/LocalDiskDataAccessor.java | 13 +-
.../ooc/policy/FixedPartitionsOracle.java | 3 +-
.../giraph/ooc/policy/OutOfCoreOracle.java | 5 +-
.../ooc/policy/SimpleGCMonitoringOracle.java | 7 +-
.../giraph/ooc/policy/ThresholdBasedOracle.java | 165 ++++++-------------
.../apache/giraph/worker/BspServiceWorker.java | 1 +
.../giraph/partition/TestPartitionStores.java | 10 +-
.../java/org/apache/giraph/utils/MockUtils.java | 4 +-
pom.xml | 5 +
25 files changed, 263 insertions(+), 250 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-core/pom.xml b/giraph-core/pom.xml
index 2a943f6..9a5d890 100644
--- a/giraph-core/pom.xml
+++ b/giraph-core/pom.xml
@@ -503,6 +503,10 @@ under the License.
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math</artifactId>
+ </dependency>
+ <dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index e926b6c..6f20e39 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -131,11 +131,13 @@ public class ServerData<I extends WritableComparable,
* Constructor.
*
* @param service Service worker
+ * @param workerServer Worker server
* @param conf Configuration
* @param context Mapper context
*/
public ServerData(
CentralizedServiceWorker<I, V, E> service,
+ WorkerServer workerServer,
ImmutableClassesGiraphConfiguration<I, V, E> conf,
Mapper<?, ?, ?, ?>.Context context) {
this.serviceWorker = service;
@@ -147,7 +149,7 @@ public class ServerData<I extends WritableComparable,
PartitionStore<I, V, E> inMemoryPartitionStore =
new SimplePartitionStore<I, V, E>(conf, context);
if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf)) {
- oocEngine = new OutOfCoreEngine(conf, service);
+ oocEngine = new OutOfCoreEngine(conf, service, workerServer);
partitionStore =
new DiskBackedPartitionStore<I, V, E>(inMemoryPartitionStore,
conf, context, oocEngine);
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
index efd9421..fdb4b08 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
@@ -35,7 +35,7 @@ import java.net.InetSocketAddress;
@SuppressWarnings("rawtypes")
public interface WorkerServer<I extends WritableComparable,
V extends Writable, E extends Writable>
- extends Closeable {
+ extends NetworkMetrics, Closeable {
/**
* Get server address
*
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
index 0e20eee..18cf017 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
@@ -20,6 +20,19 @@ package org.apache.giraph.comm.flow_control;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.giraph.comm.netty.NettyClient;
+import org.apache.giraph.comm.netty.handler.AckSignalFlag;
+import org.apache.giraph.comm.requests.SendResumeRequest;
+import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.utils.AdjustableSemaphore;
+import org.apache.log4j.Logger;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -28,34 +41,11 @@ import java.util.Comparator;
import java.util.Deque;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.lang3.tuple.MutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.giraph.comm.netty.NettyClient;
-import org.apache.giraph.comm.netty.handler.AckSignalFlag;
-import org.apache.giraph.comm.requests.SendResumeRequest;
-import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.IntConfOption;
-import org.apache.giraph.utils.AdjustableSemaphore;
-import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.utils.LogStacktraceCallable;
-import org.apache.giraph.utils.ThreadUtils;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
/**
* Representation of credit-based flow control policy. With this policy there
* can be limited number of open requests from any worker x to any other worker
@@ -182,20 +172,17 @@ public class CreditBasedFlowControl implements FlowControl {
private final Semaphore unsentRequestPermit;
/** Netty client used for sending requests */
private final NettyClient nettyClient;
- /**
- * Result of execution for the thread responsible for sending resume signals
- */
- private final Future<Void> resumeThreadResult;
- /** Whether we are shutting down the execution */
- private volatile boolean shouldTerminate;
/**
* Constructor
* @param conf configuration
* @param nettyClient netty client
+ * @param exceptionHandler Exception handler
*/
public CreditBasedFlowControl(ImmutableClassesGiraphConfiguration conf,
- NettyClient nettyClient) {
+ NettyClient nettyClient,
+ Thread.UncaughtExceptionHandler
+ exceptionHandler) {
this.nettyClient = nettyClient;
maxOpenRequestsPerWorker =
(short) MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
@@ -205,45 +192,33 @@ public class CreditBasedFlowControl implements FlowControl {
unsentRequestPermit = new Semaphore(MAX_NUM_OF_UNSENT_REQUESTS.get(conf));
unsentWaitMsecs = UNSENT_CACHE_WAIT_INTERVAL.get(conf);
waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
- shouldTerminate = false;
- CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+ Thread thread = new Thread(new Runnable() {
@Override
- public Callable<Void> newCallable(int callableId) {
- return new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- while (true) {
- synchronized (workersToResume) {
- if (shouldTerminate) {
- break;
- }
- for (Integer workerId : workersToResume) {
- if (maxOpenRequestsPerWorker != 0) {
- sendResumeSignal(workerId);
- } else {
- break;
- }
- }
- try {
- workersToResume.wait();
- } catch (InterruptedException e) {
- throw new IllegalStateException("call: caught exception " +
- "while waiting for resume-sender thread to be notified!",
- e);
- }
+ public void run() {
+ while (true) {
+ synchronized (workersToResume) {
+ for (Integer workerId : workersToResume) {
+ if (maxOpenRequestsPerWorker != 0) {
+ sendResumeSignal(workerId);
+ } else {
+ break;
}
}
- return null;
+ try {
+ workersToResume.wait();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("run: caught exception " +
+ "while waiting for resume-sender thread to be notified!",
+ e);
+ }
}
- };
+ }
}
- };
-
- ExecutorService executor = Executors.newSingleThreadExecutor(
- ThreadUtils.createThreadFactory("resume-sender"));
- resumeThreadResult = executor.submit(new LogStacktraceCallable<>(
- callableFactory.newCallable(0)));
- executor.shutdown();
+ });
+ thread.setUncaughtExceptionHandler(exceptionHandler);
+ thread.setName("resume-sender");
+ thread.setDaemon(true);
+ thread.start();
}
/**
@@ -252,6 +227,11 @@ public class CreditBasedFlowControl implements FlowControl {
* @param workerId id of the worker to send the resume signal to
*/
private void sendResumeSignal(int workerId) {
+ if (maxOpenRequestsPerWorker == 0) {
+ LOG.warn("sendResumeSignal: method called while the max open requests " +
+ "for worker " + workerId + " is still 0");
+ return;
+ }
WritableRequest request = new SendResumeRequest(maxOpenRequestsPerWorker);
Long resumeId = nettyClient.doSend(workerId, request);
checkState(resumeId != null);
@@ -407,20 +387,6 @@ public class CreditBasedFlowControl implements FlowControl {
}
@Override
- public void shutdown() {
- synchronized (workersToResume) {
- shouldTerminate = true;
- workersToResume.notifyAll();
- }
- try {
- resumeThreadResult.get();
- } catch (InterruptedException | ExecutionException e) {
- throw new IllegalStateException("shutdown: caught exception while" +
- "getting result of resume-sender thread");
- }
- }
-
- @Override
public void logInfo() {
if (LOG.isInfoEnabled()) {
// Count how many unsent requests each task has
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java
index 4072af7..b723a01 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java
@@ -83,11 +83,6 @@ public interface FlowControl {
int calculateResponse(AckSignalFlag flag, int taskId);
/**
- * Shutdown the flow control policy
- */
- void shutdown();
-
- /**
* Log the status of the flow control
*/
void logInfo();
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java
index c97c967..fdd6035 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java
@@ -66,8 +66,5 @@ public class NoOpFlowControl implements FlowControl {
}
@Override
- public void shutdown() { }
-
- @Override
public void logInfo() { }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java
index 6d67afd..332639c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java
@@ -155,9 +155,6 @@ public class StaticFlowControl implements
}
@Override
- public void shutdown() { }
-
- @Override
public void logInfo() {
if (LOG.isInfoEnabled()) {
LOG.info("logInfo: " + numWaitingThreads.get() + " threads waiting " +
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/comm/netty/ByteCounterDelegate.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/ByteCounterDelegate.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/ByteCounterDelegate.java
index 710ec9d..32f414d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/ByteCounterDelegate.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/ByteCounterDelegate.java
@@ -44,8 +44,10 @@ public class ByteCounterDelegate implements ByteCounter {
new DecimalFormat("#######.####");
/** Class timer */
private static final Time TIME = SystemTime.get();
- /** All bytes ever processed */
+ /** Bytes processed during the most recent time interval */
private final AtomicLong bytesProcessed = new AtomicLong();
+ /** Aggregate bytes per superstep */
+ private final AtomicLong bytesProcessedPerSuperstep = new AtomicLong();
/** Total processed requests */
private final AtomicLong processedRequests = new AtomicLong();
/** Start time (for bandwidth calculation) */
@@ -99,6 +101,7 @@ public class ByteCounterDelegate implements ByteCounter {
public int byteBookkeeper(ByteBuf buf) {
int processedBytes = buf.readableBytes();
bytesProcessed.addAndGet(processedBytes);
+ bytesProcessedPerSuperstep.addAndGet(processedBytes);
processedBytesHist.update(processedBytes);
processedRequests.incrementAndGet();
processedRequestsMeter.mark();
@@ -126,6 +129,21 @@ public class ByteCounterDelegate implements ByteCounter {
resetStartMsecs();
}
+ /**
+ * Returns bytes processed per superstep.
+ * @return Number of bytes.
+ */
+ public long getBytesProcessedPerSuperstep() {
+ return bytesProcessedPerSuperstep.get();
+ }
+
+ /**
+ * Set bytes processed per superstep to 0.
+ */
+ public void resetBytesProcessedPerSuperstep() {
+ bytesProcessedPerSuperstep.set(0);
+ }
+
public long getBytesProcessed() {
return bytesProcessed.get();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/comm/netty/InboundByteCounter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/InboundByteCounter.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/InboundByteCounter.java
index 44b9c5d..7f616fb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/InboundByteCounter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/InboundByteCounter.java
@@ -53,6 +53,21 @@ public class InboundByteCounter extends ChannelInboundHandlerAdapter implements
}
/**
+ * Returns bytes received per superstep.
+ * @return Number of bytes.
+ */
+ public long getBytesReceivedPerSuperstep() {
+ return delegate.getBytesProcessedPerSuperstep();
+ }
+
+ /**
+ * Set bytes received per superstep to 0.
+ */
+ public void resetBytesReceivedPerSuperstep() {
+ delegate.resetBytesProcessedPerSuperstep();
+ }
+
+ /**
* @return Mbytes received / sec in the current interval
*/
public double getMbytesPerSecReceived() {
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 541ce93..7b751ed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -237,7 +237,7 @@ public class NettyClient {
if (limitNumberOfOpenRequests) {
flowControl = new StaticFlowControl(conf, this);
} else if (limitOpenRequestsPerWorker) {
- flowControl = new CreditBasedFlowControl(conf, this);
+ flowControl = new CreditBasedFlowControl(conf, this, exceptionHandler);
} else {
flowControl = new NoOpFlowControl(this);
}
@@ -644,7 +644,6 @@ public class NettyClient {
if (LOG.isInfoEnabled()) {
LOG.info("stop: Halting netty client");
}
- flowControl.shutdown();
// Close connections asynchronously, in a Netty-approved
// way, without cleaning up thread pools until all channels
// in addressChannelMap are closed (success or failure)
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
index a461bdd..dabd175 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
@@ -71,7 +71,6 @@ public class NettyServer {
/** Default maximum thread pool size */
public static final int MAXIMUM_THREAD_POOL_SIZE_DEFAULT = 32;
-
/*if_not[HADOOP_NON_SECURE]*/
/** Used to authenticate with netty clients */
public static final AttributeKey<SaslNettyServer>
@@ -128,6 +127,7 @@ public class NettyServer {
/** Handles all uncaught exceptions in netty threads */
private final Thread.UncaughtExceptionHandler exceptionHandler;
+
/**
* Constructor for creating the server
*
@@ -217,6 +217,14 @@ public class NettyServer {
/*end[HADOOP_NON_SECURE]*/
/**
+ * Returns a handle on the in-bound byte counter.
+ * @return The {@link InboundByteCounter} object for this server.
+ */
+ public InboundByteCounter getInByteCounter() {
+ return inByteCounter;
+ }
+
+ /**
* Start the server with the appropriate port
*/
public void start() {
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index befce5f..57b6873 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -74,7 +74,7 @@ public class NettyWorkerServer<I extends WritableComparable,
this.context = context;
serverData =
- new ServerData<I, V, E>(service, conf, context);
+ new ServerData<I, V, E>(service, this, conf, context);
nettyServer = new NettyServer(conf,
new WorkerRequestServerHandler.Factory<I, V, E>(serverData),
@@ -111,4 +111,19 @@ public class NettyWorkerServer<I extends WritableComparable,
public void setFlowControl(FlowControl flowControl) {
nettyServer.setFlowControl(flowControl);
}
+
+ @Override
+ public long getBytesReceivedPerSuperstep() {
+ return nettyServer.getInByteCounter().getBytesReceivedPerSuperstep();
+ }
+
+ @Override
+ public void resetBytesReceivedPerSuperstep() {
+ nettyServer.getInByteCounter().resetBytesReceivedPerSuperstep();
+ }
+
+ @Override
+ public long getBytesReceived() {
+ return nettyServer.getInByteCounter().getBytesReceived();
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 14b8ddd..c7b04d8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -75,8 +75,8 @@ import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.master.MasterObserver;
import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
import org.apache.giraph.ooc.persistence.LocalDiskDataAccessor;
+import org.apache.giraph.ooc.policy.MemoryEstimatorOracle;
import org.apache.giraph.ooc.policy.OutOfCoreOracle;
-import org.apache.giraph.ooc.policy.ThresholdBasedOracle;
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.partition.HashPartitionerFactory;
import org.apache.giraph.partition.Partition;
@@ -986,7 +986,7 @@ public interface GiraphConstants {
*/
ClassConfOption<OutOfCoreOracle> OUT_OF_CORE_ORACLE =
ClassConfOption.create("giraph.outOfCoreOracle",
- ThresholdBasedOracle.class, OutOfCoreOracle.class,
+ MemoryEstimatorOracle.class, OutOfCoreOracle.class,
"Out-of-core oracle that is to be used for adaptive out-of-core " +
"engine");
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
index d2e7e8d..3b211fb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
@@ -27,7 +27,9 @@ import org.apache.giraph.graph.Vertex;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.utils.CallableFactory;
+import org.apache.giraph.utils.ProgressCounter;
import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.ThreadLocalProgressCounter;
import org.apache.giraph.utils.Trimmable;
import org.apache.giraph.utils.VertexIdEdgeIterator;
import org.apache.giraph.utils.VertexIdEdges;
@@ -60,6 +62,9 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
V extends Writable, E extends Writable, K, Et>
extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
implements EdgeStore<I, V, E> {
+ /** Used to keep track of progress during the move-edges process */
+ public static final ThreadLocalProgressCounter PROGRESS_COUNTER =
+ new ThreadLocalProgressCounter();
/** Class logger */
private static final Logger LOG = Logger.getLogger(AbstractEdgeStore.class);
/** Service worker. */
@@ -81,10 +86,11 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
*/
protected boolean useInputOutEdges;
/** Whether we spilled edges on disk */
- private boolean hasEdgesOnDisk = false;
+ private volatile boolean hasEdgesOnDisk = false;
/** Create source vertices */
private CreateSourceVertexCallback<I> createSourceVertexCallback;
+
/**
* Constructor.
*
@@ -274,12 +280,12 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
return new Callable<Void>() {
@Override
public Void call() throws Exception {
- Integer partitionId;
I representativeVertexId = configuration.createVertexId();
OutOfCoreEngine oocEngine = service.getServerData().getOocEngine();
if (oocEngine != null) {
oocEngine.processingThreadStart();
}
+ ProgressCounter numVerticesProcessed = PROGRESS_COUNTER.get();
while (true) {
Partition<I, V, E> partition =
service.getPartitionStore().getNextPartition();
@@ -338,6 +344,7 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
// require us to put back the vertex after modifying it.
partition.saveVertex(vertex);
}
+ numVerticesProcessed.inc();
iterator.remove();
}
// Some PartitionStore implementations
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
index 65399b2..82a55f1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
@@ -22,7 +22,9 @@ import com.sun.management.GarbageCollectionNotificationInfo;
import com.yammer.metrics.core.Gauge;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.NetworkMetrics;
import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
import org.apache.giraph.comm.flow_control.FlowControl;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -101,6 +103,11 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
/** Semaphore used for controlling number of active threads at each moment */
private final AdjustableSemaphore activeThreadsPermit;
/**
+ * Cached value of CreditBasedFlowControl.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER
+ * (the max credit used by the credit-based flow-control mechanism)
+ */
+ private final short maxRequestsCredit;
+ /**
* Generally, the logic in Giraph for change of the superstep happens in the
* following order:
* (1) Compute threads are done processing all partitions
@@ -141,14 +148,23 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
private boolean resetDone;
/**
+ * Provides statistics about network traffic (e.g. bytes received per
+ * superstep).
+ */
+ private final NetworkMetrics networkMetrics;
+
+ /**
* Constructor
*
* @param conf Configuration
* @param service Service worker
+ * @param networkMetrics Interface for network stats
*/
public OutOfCoreEngine(ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
- CentralizedServiceWorker<?, ?, ?> service) {
+ CentralizedServiceWorker<?, ?, ?> service,
+ NetworkMetrics networkMetrics) {
this.service = service;
+ this.networkMetrics = networkMetrics;
Class<? extends OutOfCoreDataAccessor> accessorClass =
GiraphConstants.OUT_OF_CORE_DATA_ACCESSOR.get(conf);
try {
@@ -178,22 +194,24 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
"out-of-core policy. Setting the oracle to be FixedPartitionsOracle");
oracleClass = FixedPartitionsOracle.class;
}
- try {
- Constructor<?> constructor = oracleClass.getConstructor(
- ImmutableClassesGiraphConfiguration.class, OutOfCoreEngine.class);
- this.oracle = (OutOfCoreOracle) constructor.newInstance(conf, this);
- } catch (NoSuchMethodException | IllegalAccessException |
- InstantiationException | InvocationTargetException e) {
- throw new IllegalStateException("OutOfCoreEngine: caught exception " +
- "while creating the oracle!", e);
- }
this.numComputeThreads = conf.getNumComputeThreads();
// At the beginning of the execution, only input threads are processing data
this.numProcessingThreads = conf.getNumInputSplitsThreads();
this.activeThreadsPermit = new AdjustableSemaphore(numProcessingThreads);
+ this.maxRequestsCredit = (short)
+ CreditBasedFlowControl.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
this.superstep = BspService.INPUT_SUPERSTEP;
this.resetDone = false;
GiraphMetrics.get().addSuperstepResetObserver(this);
+ try {
+ Constructor<?> constructor = oracleClass.getConstructor(
+ ImmutableClassesGiraphConfiguration.class, OutOfCoreEngine.class);
+ this.oracle = (OutOfCoreOracle) constructor.newInstance(conf, this);
+ } catch (NoSuchMethodException | IllegalAccessException |
+ InstantiationException | InvocationTargetException e) {
+ throw new IllegalStateException("OutOfCoreEngine: caught exception " +
+ "while creating the oracle!", e);
+ }
}
/**
@@ -332,6 +350,7 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
"UL_UNRELEASED_LOCK_EXCEPTION_PATH")
public void startIteration() {
+ oracle.startIteration();
if (!resetDone) {
superstepLock.writeLock().lock();
metaPartitionManager.resetPartitions();
@@ -460,6 +479,27 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
}
/**
+ * Update the credit announced for this worker in Netty. The lower the credit
+ * is, the lower the rate at which incoming messages arrive at this worker.
+ * Thus, credit is an indirect way of controlling the amount of memory that
+ * incoming messages would take.
+ *
+ * @param fraction the fraction of max credits others can use to send requests
+ * to this worker
+ */
+ public void updateRequestsCreditFraction(double fraction) {
+ checkState(fraction >= 0 && fraction <= 1);
+ short newCredit = (short) (maxRequestsCredit * fraction);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("updateRequestsCreditFraction: updating the credit to " +
+ newCredit);
+ }
+ if (flowControl != null) {
+ ((CreditBasedFlowControl) flowControl).updateCredit(newCredit);
+ }
+ }
+
+ /**
* Reset partitions and messages meta data. Also, reset the cached value of
* superstep counter.
*/
@@ -507,4 +547,8 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
public OutOfCoreDataAccessor getDataAccessor() {
return dataAccessor;
}
+
+ public NetworkMetrics getNetworkMetrics() {
+ return networkMetrics;
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
index 173b451..32a5a6e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
@@ -526,8 +526,8 @@ public class MetaPartitionManager {
meta = perThreadPartitionDictionary.get(threadId).lookup(
ProcessingState.PROCESSED,
StorageState.IN_MEM,
- StorageState.ON_DISK,
- null);
+ null,
+ StorageState.ON_DISK);
if (meta != null) {
return meta.getPartitionId();
}
@@ -535,18 +535,17 @@ public class MetaPartitionManager {
meta = perThreadPartitionDictionary.get(threadId).lookup(
ProcessingState.PROCESSED,
StorageState.ON_DISK,
- StorageState.IN_MEM,
- null);
+ null,
+ StorageState.IN_MEM);
if (meta != null) {
return meta.getPartitionId();
}
- // Forth, look for a processed partition entirely on disk
meta = perThreadPartitionDictionary.get(threadId).lookup(
ProcessingState.PROCESSED,
StorageState.ON_DISK,
- StorageState.ON_DISK,
- null);
+ null,
+ StorageState.ON_DISK);
if (meta != null) {
return meta.getPartitionId();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java
index 8efa9de..f189def 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java
@@ -94,12 +94,11 @@ public class LocalDiskDataAccessor implements OutOfCoreDataAccessor {
int ptr = 0;
String jobId = conf.getJobId();
for (String path : userPaths) {
- File file = new File(path);
- if (!file.exists()) {
- checkState(file.mkdirs(), "LocalDiskDataAccessor: cannot create " +
- "directory " + file.getAbsolutePath());
- }
- basePaths[ptr] = path + "/" + jobId;
+ String jobDirectory = path + "/" + jobId;
+ File file = new File(jobDirectory);
+ checkState(file.mkdirs(), "LocalDiskDataAccessor: cannot create " +
+ "directory " + file.getAbsolutePath());
+ basePaths[ptr] = jobDirectory + "/";
ptr++;
}
final int diskBufferSize = OOC_DISK_BUFFER_SIZE.get(conf);
@@ -112,7 +111,7 @@ public class LocalDiskDataAccessor implements OutOfCoreDataAccessor {
@Override
public void shutdown() {
for (String path : basePaths) {
- File file = new File(path).getParentFile();
+ File file = new File(path);
for (String subFileName : file.list()) {
File subFile = new File(file.getPath(), subFileName);
checkState(subFile.delete(), "shutdown: cannot delete file %s",
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java
index da21973..dbce6a6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java
@@ -145,5 +145,6 @@ public class FixedPartitionsOracle implements OutOfCoreOracle {
public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) { }
@Override
- public void shutdown() { }
+ public void startIteration() {
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/ooc/policy/OutOfCoreOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/OutOfCoreOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/OutOfCoreOracle.java
index 45b9914..ae54a91 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/OutOfCoreOracle.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/OutOfCoreOracle.java
@@ -128,8 +128,7 @@ public interface OutOfCoreOracle {
void gcCompleted(GarbageCollectionNotificationInfo gcInfo);
/**
- * Shut down the out-of-core oracle. Necessary specifically for cases where
- * out-of-core oracle is using additional monitoring threads.
+ * Called at the beginning of a superstep.
*/
- void shutdown();
+ void startIteration();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/ooc/policy/SimpleGCMonitoringOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/SimpleGCMonitoringOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/SimpleGCMonitoringOracle.java
index 477b3ec..2bea697 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/SimpleGCMonitoringOracle.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/SimpleGCMonitoringOracle.java
@@ -141,6 +141,10 @@ public class SimpleGCMonitoringOracle implements OutOfCoreOracle {
lastGCObservation = observation;
}
+ @Override
+ public void startIteration() {
+ }
+
/**
* Get the current data injection rate to memory based on the commands ran
* in the history (retrieved from statistics collector), and outstanding
@@ -274,9 +278,6 @@ public class SimpleGCMonitoringOracle implements OutOfCoreOracle {
commandOccurrences.get(command.getType()).getAndDecrement();
}
- @Override
- public void shutdown() { }
-
/** Helper class to record memory status after GC calls */
private class GCObservation {
/** The time at which the GC happened (in milliseconds) */
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java
index 00a8011..2dd2c10 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java
@@ -19,28 +19,15 @@
package org.apache.giraph.ooc.policy;
import com.sun.management.GarbageCollectionNotificationInfo;
-import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
-import org.apache.giraph.comm.flow_control.FlowControl;
import org.apache.giraph.comm.netty.NettyClient;
import org.apache.giraph.conf.FloatConfOption;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.LongConfOption;
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.ooc.command.IOCommand;
-import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.utils.LogStacktraceCallable;
import org.apache.giraph.utils.MemoryUtils;
-import org.apache.giraph.utils.ThreadUtils;
import org.apache.log4j.Logger;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
import static com.google.common.base.Preconditions.checkState;
/**
@@ -89,7 +76,7 @@ import static com.google.common.base.Preconditions.checkState;
public class ThresholdBasedOracle implements OutOfCoreOracle {
/** The memory pressure at/above which the job would fail */
public static final FloatConfOption FAIL_MEMORY_PRESSURE =
- new FloatConfOption("giraph.memory.failPressure", 0.975f,
+ new FloatConfOption("giraph.threshold.failPressure", 0.975f,
"The memory pressure (fraction of used memory) at/above which the " +
"job would fail.");
/**
@@ -98,13 +85,13 @@ public class ThresholdBasedOracle implements OutOfCoreOracle {
* job processing rate.
*/
public static final FloatConfOption EMERGENCY_MEMORY_PRESSURE =
- new FloatConfOption("giraph.memory.emergencyPressure", 0.925f,
+ new FloatConfOption("giraph.threshold.emergencyPressure", 0.925f,
"The memory pressure (fraction of used memory) at which the job " +
"is close to fail, hence we should reduce its processing rate " +
"as much as possible.");
/** The memory pressure at which the job is suffering from GC overhead. */
public static final FloatConfOption HIGH_MEMORY_PRESSURE =
- new FloatConfOption("giraph.memory.highPressure", 0.875f,
+ new FloatConfOption("giraph.threshold.highPressure", 0.875f,
"The memory pressure (fraction of used memory) at which the job " +
"is suffering from GC overhead.");
/**
@@ -112,7 +99,7 @@ public class ThresholdBasedOracle implements OutOfCoreOracle {
* memory intensive job.
*/
public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE =
- new FloatConfOption("giraph.memory.optimalPressure", 0.8f,
+ new FloatConfOption("giraph.threshold.optimalPressure", 0.8f,
"The memory pressure (fraction of used memory) at which a " +
"memory-intensive job shows the optimal GC behavior.");
/**
@@ -120,12 +107,12 @@ public class ThresholdBasedOracle implements OutOfCoreOracle {
* suffering from GC overhead.
*/
public static final FloatConfOption LOW_MEMORY_PRESSURE =
- new FloatConfOption("giraph.memory.lowPressure", 0.7f,
+ new FloatConfOption("giraph.threshold.lowPressure", 0.7f,
"The memory pressure (fraction of used memory) at/below which the " +
"job can use more memory without suffering the performance.");
/** The interval at which memory observer thread wakes up. */
public static final LongConfOption CHECK_MEMORY_INTERVAL =
- new LongConfOption("giraph.checkMemoryInterval", 2500,
+ new LongConfOption("giraph.threshold.checkMemoryInterval", 2500,
"The interval/period where memory observer thread wakes up and " +
"monitors memory footprint (in milliseconds)");
/**
@@ -134,7 +121,7 @@ public class ThresholdBasedOracle implements OutOfCoreOracle {
* past is specified in this parameter
*/
public static final LongConfOption LAST_GC_CALL_INTERVAL =
- new LongConfOption("giraph.lastGcCallInterval", 10 * 1000,
+ new LongConfOption("giraph.threshold.lastGcCallInterval", 10 * 1000,
"How long after last major/full GC should we call manual GC?");
/** Class logger */
@@ -154,18 +141,6 @@ public class ThresholdBasedOracle implements OutOfCoreOracle {
private final long checkMemoryInterval;
/** Cached value for LAST_GC_CALL_INTERVAL */
private final long lastGCCallInterval;
- /**
- * Cached value for NettyClient.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER (max
- * credit used for credit-based flow-control mechanism)
- */
- private final short maxRequestsCredit;
- /**
- * Whether the job is shutting down. Used for terminating the memory
- * observer thread.
- */
- private final CountDownLatch shouldTerminate;
- /** Result of memory observer thread */
- private final Future<Void> checkMemoryThreadResult;
/** Out-of-core engine */
private final OutOfCoreEngine oocEngine;
/** Last time a major/full GC has been called (in milliseconds) */
@@ -188,70 +163,59 @@ public class ThresholdBasedOracle implements OutOfCoreOracle {
this.lowMemoryPressure = LOW_MEMORY_PRESSURE.get(conf);
this.checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf);
this.lastGCCallInterval = LAST_GC_CALL_INTERVAL.get(conf);
- this.maxRequestsCredit = (short)
- CreditBasedFlowControl.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.setIfUnset(conf, true);
boolean useCredit = NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf);
checkState(useCredit, "ThresholdBasedOracle: credit-based flow control " +
"must be enabled. Use giraph.waitForPerWorkerRequests=true");
- this.shouldTerminate = new CountDownLatch(1);
this.oocEngine = oocEngine;
this.lastMajorGCTime = 0;
- CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+ final Thread thread = new Thread(new Runnable() {
@Override
- public Callable<Void> newCallable(int callableId) {
- return new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- while (true) {
- boolean done = shouldTerminate.await(checkMemoryInterval,
- TimeUnit.MILLISECONDS);
- if (done) {
- break;
- }
- double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
- long time = System.currentTimeMillis();
- if ((usedMemoryFraction > highMemoryPressure &&
- time - lastMajorGCTime >= lastGCCallInterval) ||
- (usedMemoryFraction > optimalMemoryPressure &&
+ public void run() {
+ while (true) {
+ double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
+ long time = System.currentTimeMillis();
+ if ((usedMemoryFraction > highMemoryPressure &&
+ time - lastMajorGCTime >= lastGCCallInterval) ||
+ (usedMemoryFraction > optimalMemoryPressure &&
time - lastMajorGCTime >= lastGCCallInterval &&
time - lastMinorGCTime >= lastGCCallInterval)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("call: last GC happened a while ago and the " +
- "amount of used memory is high (used memory " +
- "fraction is " +
- String.format("%.2f", usedMemoryFraction) + "). " +
- "Calling GC manually");
- }
- System.gc();
- time = System.currentTimeMillis() - time;
- usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
- if (LOG.isInfoEnabled()) {
- LOG.info("call: manual GC is done. It took " +
- String.format("%.2f", (double) time / 1000) +
- " seconds. Used memory fraction is " +
- String.format("%.2f", usedMemoryFraction));
- }
- }
- updateRates(usedMemoryFraction);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("call: last GC happened a while ago and the " +
+ "amount of used memory is high (used memory " +
+ "fraction is " +
+ String.format("%.2f", usedMemoryFraction) + "). " +
+ "Calling GC manually");
}
- return null;
+ System.gc();
+ time = System.currentTimeMillis() - time;
+ usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("call: manual GC is done. It took " +
+ String.format("%.2f", (double) time / 1000) +
+ " seconds. Used memory fraction is " +
+ String.format("%.2f", usedMemoryFraction));
+ }
+ }
+ updateRates(usedMemoryFraction);
+ try {
+ Thread.sleep(checkMemoryInterval);
+ } catch (InterruptedException e) {
+ LOG.warn("run: exception occurred!", e);
+ return;
}
- };
+ }
}
- };
- ExecutorService executor = Executors.newSingleThreadExecutor(
- ThreadUtils.createThreadFactory("check-memory"));
- this.checkMemoryThreadResult = executor.submit(new LogStacktraceCallable<>(
- callableFactory.newCallable(0)));
- executor.shutdown();
+ });
+ thread.setUncaughtExceptionHandler(oocEngine.getServiceWorker()
+ .getGraphTaskManager().createUncaughtExceptionHandler());
+ thread.setName("memory-checker");
+ thread.setDaemon(true);
+ thread.start();
}
/**
- * upon major/full GC calls.
- */
- /**
* Update statistics and rate regarding communication credits and number of
* active threads.
*
@@ -272,13 +236,13 @@ public class ThresholdBasedOracle implements OutOfCoreOracle {
// Update the fraction of credit that should be used in credit-based flow-
// control
if (usedMemoryFraction >= emergencyMemoryPressure) {
- updateRequestsCredit((short) 0);
+ oocEngine.updateRequestsCreditFraction(0);
} else if (usedMemoryFraction < optimalMemoryPressure) {
- updateRequestsCredit(maxRequestsCredit);
+ oocEngine.updateRequestsCreditFraction(1);
} else {
- updateRequestsCredit((short) (maxRequestsCredit *
- (1 - (usedMemoryFraction - optimalMemoryPressure) /
- (emergencyMemoryPressure - optimalMemoryPressure))));
+ oocEngine.updateRequestsCreditFraction(1 -
+ (usedMemoryFraction - optimalMemoryPressure) /
+ (emergencyMemoryPressure - optimalMemoryPressure));
}
}
@@ -331,35 +295,6 @@ public class ThresholdBasedOracle implements OutOfCoreOracle {
}
@Override
- public void shutdown() {
- shouldTerminate.countDown();
- try {
- checkMemoryThreadResult.get();
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("shutdown: caught exception while waiting on check-memory " +
- "thread to terminate!");
- throw new IllegalStateException(e);
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("shutdown: ThresholdBasedOracle shutdown complete!");
- }
- }
-
- /**
- * Update the credit announced for this worker in Netty. The lower the credit
- * is, the lower rate incoming messages arrive at this worker. Thus, credit
- * is an indirect way of controlling amount of memory incoming messages would
- * take.
- *
- * @param newCredit the new credit to announce to other workers
- */
- private void updateRequestsCredit(short newCredit) {
- if (LOG.isInfoEnabled()) {
- LOG.info("updateRequestsCredit: updating the credit to " + newCredit);
- }
- FlowControl flowControl = oocEngine.getFlowControl();
- if (flowControl != null) {
- ((CreditBasedFlowControl) flowControl).updateCredit(newCredit);
- }
+ public void startIteration() {
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index c51521d..5b00eb7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -714,6 +714,7 @@ else[HADOOP_NON_SECURE]*/
workerInfoList.clear();
workerInfoList = addressesAndPartitions.getWorkerInfos();
masterInfo = addressesAndPartitions.getMasterInfo();
+ workerServer.resetBytesReceivedPerSuperstep();
if (LOG.isInfoEnabled()) {
LOG.info("startSuperstep: " + masterInfo);
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
index 1e4593b..8f16c35 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
@@ -24,6 +24,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.WorkerServer;
import org.apache.giraph.comm.netty.NettyClient;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
@@ -162,13 +163,14 @@ public class TestPartitionStores {
CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
+ WorkerServer workerServer = Mockito.mock(WorkerServer.class);
Mockito.when(serviceWorker.getSuperstep()).thenReturn(
BspService.INPUT_SUPERSTEP);
GraphTaskManager<IntWritable, IntWritable, NullWritable>
graphTaskManager = Mockito.mock(GraphTaskManager.class);
Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
ServerData<IntWritable, IntWritable, NullWritable>
- serverData = new ServerData<>(serviceWorker, conf, context);
+ serverData = new ServerData<>(serviceWorker, workerServer, conf, context);
Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
@@ -191,13 +193,14 @@ public class TestPartitionStores {
CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
+ WorkerServer workerServer = Mockito.mock(WorkerServer.class);
Mockito.when(serviceWorker.getSuperstep()).thenReturn(
BspService.INPUT_SUPERSTEP);
GraphTaskManager<IntWritable, IntWritable, NullWritable>
graphTaskManager = Mockito.mock(GraphTaskManager.class);
Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
ServerData<IntWritable, IntWritable, NullWritable>
- serverData = new ServerData<>(serviceWorker, conf, context);
+ serverData = new ServerData<>(serviceWorker, workerServer, conf, context);
Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
@@ -308,6 +311,7 @@ public class TestPartitionStores {
CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
+ WorkerServer workerServer = Mockito.mock(WorkerServer.class);
Mockito.when(serviceWorker.getSuperstep()).thenReturn(
BspService.INPUT_SUPERSTEP);
@@ -315,7 +319,7 @@ public class TestPartitionStores {
graphTaskManager = Mockito.mock(GraphTaskManager.class);
Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
ServerData<IntWritable, IntWritable, NullWritable>
- serverData = new ServerData<>(serviceWorker, conf, context);
+ serverData = new ServerData<>(serviceWorker, workerServer, conf, context);
Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
index b56998f..c380c92 100644
--- a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
@@ -21,6 +21,7 @@ package org.apache.giraph.utils;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
+import org.apache.giraph.comm.WorkerServer;
import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
@@ -206,9 +207,10 @@ public class MockUtils {
ByteArrayMessagesPerVertexStore.newFactory(serviceWorker, conf)
.getClass());
+ WorkerServer workerServer = Mockito.mock(WorkerServer.class);
ServerData<IntWritable, IntWritable, IntWritable> serverData =
new ServerData<IntWritable, IntWritable, IntWritable>(
- serviceWorker, conf, context);
+ serviceWorker, workerServer, conf, context);
// Here we add a partition to simulate the case that there is one partition.
serverData.getPartitionStore().addPartition(new SimplePartition());
return serverData;
http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 26dd26a..6148501 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1695,6 +1695,11 @@ under the License.
<version>${dep.commons-lang3.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math</artifactId>
+ <version>2.1</version>
+ </dependency>
+ <dependency>
<groupId>com.facebook.thirdparty.yourkit-api</groupId>
<artifactId>yjp-controller-api-redist</artifactId>
<version>${dep.yourkit-api.version}</version>
|