giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hesl...@apache.org
Subject git commit: updated refs/heads/trunk to f5b685e
Date Fri, 23 Dec 2016 18:04:45 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk cdf37f23a -> f5b685efa


GIRAPH-1125

Closes #12


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f5b685ef
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f5b685ef
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f5b685ef

Branch: refs/heads/trunk
Commit: f5b685efa09b539b1f95925405723f7ac7b1dcea
Parents: cdf37f2
Author: Hassan Eslami <heslami@apache.org>
Authored: Fri Dec 23 12:03:37 2016 -0600
Committer: Hassan Eslami <hassan@wirelessprv-10-193-225-240.near.illinois.edu>
Committed: Fri Dec 23 12:03:37 2016 -0600

----------------------------------------------------------------------
 giraph-core/pom.xml                             |   4 +
 .../java/org/apache/giraph/comm/ServerData.java |   4 +-
 .../org/apache/giraph/comm/WorkerServer.java    |   2 +-
 .../flow_control/CreditBasedFlowControl.java    | 122 +++++---------
 .../giraph/comm/flow_control/FlowControl.java   |   5 -
 .../comm/flow_control/NoOpFlowControl.java      |   3 -
 .../comm/flow_control/StaticFlowControl.java    |   3 -
 .../giraph/comm/netty/ByteCounterDelegate.java  |  20 ++-
 .../giraph/comm/netty/InboundByteCounter.java   |  15 ++
 .../apache/giraph/comm/netty/NettyClient.java   |   3 +-
 .../apache/giraph/comm/netty/NettyServer.java   |  10 +-
 .../giraph/comm/netty/NettyWorkerServer.java    |  17 +-
 .../org/apache/giraph/conf/GiraphConstants.java |   4 +-
 .../apache/giraph/edge/AbstractEdgeStore.java   |  11 +-
 .../org/apache/giraph/ooc/OutOfCoreEngine.java  |  64 +++++--
 .../giraph/ooc/data/MetaPartitionManager.java   |  13 +-
 .../ooc/persistence/LocalDiskDataAccessor.java  |  13 +-
 .../ooc/policy/FixedPartitionsOracle.java       |   3 +-
 .../giraph/ooc/policy/OutOfCoreOracle.java      |   5 +-
 .../ooc/policy/SimpleGCMonitoringOracle.java    |   7 +-
 .../giraph/ooc/policy/ThresholdBasedOracle.java | 165 ++++++-------------
 .../apache/giraph/worker/BspServiceWorker.java  |   1 +
 .../giraph/partition/TestPartitionStores.java   |  10 +-
 .../java/org/apache/giraph/utils/MockUtils.java |   4 +-
 pom.xml                                         |   5 +
 25 files changed, 263 insertions(+), 250 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-core/pom.xml b/giraph-core/pom.xml
index 2a943f6..9a5d890 100644
--- a/giraph-core/pom.xml
+++ b/giraph-core/pom.xml
@@ -503,6 +503,10 @@ under the License.
       <artifactId>commons-lang3</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-math</artifactId>
+    </dependency>
+    <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index e926b6c..6f20e39 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -131,11 +131,13 @@ public class ServerData<I extends WritableComparable,
    * Constructor.
    *
    * @param service Service worker
+   * @param workerServer Worker server
    * @param conf Configuration
    * @param context Mapper context
    */
   public ServerData(
       CentralizedServiceWorker<I, V, E> service,
+      WorkerServer workerServer,
       ImmutableClassesGiraphConfiguration<I, V, E> conf,
       Mapper<?, ?, ?, ?>.Context context) {
     this.serviceWorker = service;
@@ -147,7 +149,7 @@ public class ServerData<I extends WritableComparable,
     PartitionStore<I, V, E> inMemoryPartitionStore =
         new SimplePartitionStore<I, V, E>(conf, context);
     if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf)) {
-      oocEngine = new OutOfCoreEngine(conf, service);
+      oocEngine = new OutOfCoreEngine(conf, service, workerServer);
       partitionStore =
           new DiskBackedPartitionStore<I, V, E>(inMemoryPartitionStore,
               conf, context, oocEngine);

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
index efd9421..fdb4b08 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/WorkerServer.java
@@ -35,7 +35,7 @@ import java.net.InetSocketAddress;
 @SuppressWarnings("rawtypes")
 public interface WorkerServer<I extends WritableComparable,
     V extends Writable, E extends Writable>
-    extends Closeable {
+    extends NetworkMetrics, Closeable {
   /**
    * Get server address
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
index 0e20eee..18cf017 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java
@@ -20,6 +20,19 @@ package org.apache.giraph.comm.flow_control;
 
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.giraph.comm.netty.NettyClient;
+import org.apache.giraph.comm.netty.handler.AckSignalFlag;
+import org.apache.giraph.comm.requests.SendResumeRequest;
+import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.utils.AdjustableSemaphore;
+import org.apache.log4j.Logger;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -28,34 +41,11 @@ import java.util.Comparator;
 import java.util.Deque;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.lang3.tuple.MutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.giraph.comm.netty.NettyClient;
-import org.apache.giraph.comm.netty.handler.AckSignalFlag;
-import org.apache.giraph.comm.requests.SendResumeRequest;
-import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.IntConfOption;
-import org.apache.giraph.utils.AdjustableSemaphore;
-import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.utils.LogStacktraceCallable;
-import org.apache.giraph.utils.ThreadUtils;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
 /**
  * Representation of credit-based flow control policy. With this policy there
  * can be limited number of open requests from any worker x to any other worker
@@ -182,20 +172,17 @@ public class CreditBasedFlowControl implements FlowControl {
   private final Semaphore unsentRequestPermit;
   /** Netty client used for sending requests */
   private final NettyClient nettyClient;
-  /**
-   * Result of execution for the thread responsible for sending resume signals
-   */
-  private final Future<Void> resumeThreadResult;
-  /** Whether we are shutting down the execution */
-  private volatile boolean shouldTerminate;
 
   /**
    * Constructor
    * @param conf configuration
    * @param nettyClient netty client
+   * @param exceptionHandler Exception handler
    */
   public CreditBasedFlowControl(ImmutableClassesGiraphConfiguration conf,
-                                NettyClient nettyClient) {
+                                NettyClient nettyClient,
+                                Thread.UncaughtExceptionHandler
+                                    exceptionHandler) {
     this.nettyClient = nettyClient;
     maxOpenRequestsPerWorker =
         (short) MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
@@ -205,45 +192,33 @@ public class CreditBasedFlowControl implements FlowControl {
     unsentRequestPermit = new Semaphore(MAX_NUM_OF_UNSENT_REQUESTS.get(conf));
     unsentWaitMsecs = UNSENT_CACHE_WAIT_INTERVAL.get(conf);
     waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
-    shouldTerminate = false;
-    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+    Thread thread = new Thread(new Runnable() {
       @Override
-      public Callable<Void> newCallable(int callableId) {
-        return new Callable<Void>() {
-          @Override
-          public Void call() throws Exception {
-            while (true) {
-              synchronized (workersToResume) {
-                if (shouldTerminate) {
-                  break;
-                }
-                for (Integer workerId : workersToResume) {
-                  if (maxOpenRequestsPerWorker != 0) {
-                    sendResumeSignal(workerId);
-                  } else {
-                    break;
-                  }
-                }
-                try {
-                  workersToResume.wait();
-                } catch (InterruptedException e) {
-                  throw new IllegalStateException("call: caught exception " +
-                      "while waiting for resume-sender thread to be notified!",
-                      e);
-                }
+      public void run() {
+        while (true) {
+          synchronized (workersToResume) {
+            for (Integer workerId : workersToResume) {
+              if (maxOpenRequestsPerWorker != 0) {
+                sendResumeSignal(workerId);
+              } else {
+                break;
               }
             }
-            return null;
+            try {
+              workersToResume.wait();
+            } catch (InterruptedException e) {
+              throw new IllegalStateException("run: caught exception " +
+                  "while waiting for resume-sender thread to be notified!",
+                  e);
+            }
           }
-        };
+        }
       }
-    };
-
-    ExecutorService executor = Executors.newSingleThreadExecutor(
-        ThreadUtils.createThreadFactory("resume-sender"));
-    resumeThreadResult = executor.submit(new LogStacktraceCallable<>(
-        callableFactory.newCallable(0)));
-    executor.shutdown();
+    });
+    thread.setUncaughtExceptionHandler(exceptionHandler);
+    thread.setName("resume-sender");
+    thread.setDaemon(true);
+    thread.start();
   }
 
   /**
@@ -252,6 +227,11 @@ public class CreditBasedFlowControl implements FlowControl {
    * @param workerId id of the worker to send the resume signal to
    */
   private void sendResumeSignal(int workerId) {
+    if (maxOpenRequestsPerWorker == 0) {
+      LOG.warn("sendResumeSignal: method called while the max open requests " +
+          "for worker " + workerId + " is still 0");
+      return;
+    }
     WritableRequest request = new SendResumeRequest(maxOpenRequestsPerWorker);
     Long resumeId = nettyClient.doSend(workerId, request);
     checkState(resumeId != null);
@@ -407,20 +387,6 @@ public class CreditBasedFlowControl implements FlowControl {
   }
 
   @Override
-  public void shutdown() {
-    synchronized (workersToResume) {
-      shouldTerminate = true;
-      workersToResume.notifyAll();
-    }
-    try {
-      resumeThreadResult.get();
-    } catch (InterruptedException | ExecutionException e) {
-      throw new IllegalStateException("shutdown: caught exception while" +
-          "getting result of resume-sender thread");
-    }
-  }
-
-  @Override
   public void logInfo() {
     if (LOG.isInfoEnabled()) {
       // Count how many unsent requests each task has

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java
index 4072af7..b723a01 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/FlowControl.java
@@ -83,11 +83,6 @@ public interface FlowControl {
   int calculateResponse(AckSignalFlag flag, int taskId);
 
   /**
-   * Shutdown the flow control policy
-   */
-  void shutdown();
-
-  /**
    * Log the status of the flow control
    */
   void logInfo();

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java
index c97c967..fdd6035 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/NoOpFlowControl.java
@@ -66,8 +66,5 @@ public class NoOpFlowControl implements FlowControl {
   }
 
   @Override
-  public void shutdown() { }
-
-  @Override
   public void logInfo() { }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java
index 6d67afd..332639c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/flow_control/StaticFlowControl.java
@@ -155,9 +155,6 @@ public class StaticFlowControl implements
   }
 
   @Override
-  public void shutdown() { }
-
-  @Override
   public void logInfo() {
     if (LOG.isInfoEnabled()) {
       LOG.info("logInfo: " + numWaitingThreads.get() + " threads waiting " +

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/comm/netty/ByteCounterDelegate.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/ByteCounterDelegate.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/ByteCounterDelegate.java
index 710ec9d..32f414d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/ByteCounterDelegate.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/ByteCounterDelegate.java
@@ -44,8 +44,10 @@ public class ByteCounterDelegate implements ByteCounter {
       new DecimalFormat("#######.####");
   /** Class timer */
   private static final Time TIME = SystemTime.get();
-  /** All bytes ever processed */
+  /** Bytes processed during the most recent time interval */
   private final AtomicLong bytesProcessed = new AtomicLong();
+  /** Aggregate bytes per superstep */
+  private final AtomicLong bytesProcessedPerSuperstep = new AtomicLong();
   /** Total processed requests */
   private final AtomicLong processedRequests = new AtomicLong();
   /** Start time (for bandwidth calculation) */
@@ -99,6 +101,7 @@ public class ByteCounterDelegate implements ByteCounter {
   public int byteBookkeeper(ByteBuf buf) {
     int processedBytes = buf.readableBytes();
     bytesProcessed.addAndGet(processedBytes);
+    bytesProcessedPerSuperstep.addAndGet(processedBytes);
     processedBytesHist.update(processedBytes);
     processedRequests.incrementAndGet();
     processedRequestsMeter.mark();
@@ -126,6 +129,21 @@ public class ByteCounterDelegate implements ByteCounter {
     resetStartMsecs();
   }
 
+  /**
+   * Returns bytes processed per superstep.
+   * @return Number of bytes.
+   */
+  public long getBytesProcessedPerSuperstep() {
+    return bytesProcessedPerSuperstep.get();
+  }
+
+  /**
+   * Set bytes processed per superstep to 0.
+   */
+  public void resetBytesProcessedPerSuperstep() {
+    bytesProcessedPerSuperstep.set(0);
+  }
+
   public long getBytesProcessed() {
     return bytesProcessed.get();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/comm/netty/InboundByteCounter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/InboundByteCounter.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/InboundByteCounter.java
index 44b9c5d..7f616fb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/InboundByteCounter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/InboundByteCounter.java
@@ -53,6 +53,21 @@ public class InboundByteCounter extends ChannelInboundHandlerAdapter implements
   }
 
   /**
+   * Returns bytes received per superstep.
+   * @return Number of bytes.
+   */
+  public long getBytesReceivedPerSuperstep() {
+    return delegate.getBytesProcessedPerSuperstep();
+  }
+
+  /**
+   * Set bytes received per superstep to 0.
+   */
+  public void resetBytesReceivedPerSuperstep() {
+    delegate.resetBytesProcessedPerSuperstep();
+  }
+
+  /**
    * @return Mbytes received / sec in the current interval
    */
   public double getMbytesPerSecReceived() {

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 541ce93..7b751ed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -237,7 +237,7 @@ public class NettyClient {
     if (limitNumberOfOpenRequests) {
       flowControl = new StaticFlowControl(conf, this);
     } else if (limitOpenRequestsPerWorker) {
-      flowControl = new CreditBasedFlowControl(conf, this);
+      flowControl = new CreditBasedFlowControl(conf, this, exceptionHandler);
     } else {
       flowControl = new NoOpFlowControl(this);
     }
@@ -644,7 +644,6 @@ public class NettyClient {
     if (LOG.isInfoEnabled()) {
       LOG.info("stop: Halting netty client");
     }
-    flowControl.shutdown();
     // Close connections asynchronously, in a Netty-approved
     // way, without cleaning up thread pools until all channels
     // in addressChannelMap are closed (success or failure)

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
index a461bdd..dabd175 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
@@ -71,7 +71,6 @@ public class NettyServer {
   /** Default maximum thread pool size */
   public static final int MAXIMUM_THREAD_POOL_SIZE_DEFAULT = 32;
 
-
 /*if_not[HADOOP_NON_SECURE]*/
   /** Used to authenticate with netty clients */
   public static final AttributeKey<SaslNettyServer>
@@ -128,6 +127,7 @@ public class NettyServer {
   /** Handles all uncaught exceptions in netty threads */
   private final Thread.UncaughtExceptionHandler exceptionHandler;
 
+
   /**
    * Constructor for creating the server
    *
@@ -217,6 +217,14 @@ public class NettyServer {
 /*end[HADOOP_NON_SECURE]*/
 
   /**
+   * Returns a handle on the in-bound byte counter.
+   * @return The {@link InboundByteCounter} object for this server.
+   */
+  public InboundByteCounter getInByteCounter() {
+    return inByteCounter;
+  }
+
+  /**
    * Start the server with the appropriate port
    */
   public void start() {

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index befce5f..57b6873 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -74,7 +74,7 @@ public class NettyWorkerServer<I extends WritableComparable,
     this.context = context;
 
     serverData =
-        new ServerData<I, V, E>(service, conf, context);
+        new ServerData<I, V, E>(service, this, conf, context);
 
     nettyServer = new NettyServer(conf,
         new WorkerRequestServerHandler.Factory<I, V, E>(serverData),
@@ -111,4 +111,19 @@ public class NettyWorkerServer<I extends WritableComparable,
   public void setFlowControl(FlowControl flowControl) {
     nettyServer.setFlowControl(flowControl);
   }
+
+  @Override
+  public long getBytesReceivedPerSuperstep() {
+    return nettyServer.getInByteCounter().getBytesReceivedPerSuperstep();
+  }
+
+  @Override
+  public void resetBytesReceivedPerSuperstep() {
+    nettyServer.getInByteCounter().resetBytesReceivedPerSuperstep();
+  }
+
+  @Override
+  public long getBytesReceived() {
+    return nettyServer.getInByteCounter().getBytesReceived();
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 14b8ddd..c7b04d8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -75,8 +75,8 @@ import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterObserver;
 import org.apache.giraph.ooc.persistence.OutOfCoreDataAccessor;
 import org.apache.giraph.ooc.persistence.LocalDiskDataAccessor;
+import org.apache.giraph.ooc.policy.MemoryEstimatorOracle;
 import org.apache.giraph.ooc.policy.OutOfCoreOracle;
-import org.apache.giraph.ooc.policy.ThresholdBasedOracle;
 import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.partition.HashPartitionerFactory;
 import org.apache.giraph.partition.Partition;
@@ -986,7 +986,7 @@ public interface GiraphConstants {
    */
   ClassConfOption<OutOfCoreOracle> OUT_OF_CORE_ORACLE =
       ClassConfOption.create("giraph.outOfCoreOracle",
-          ThresholdBasedOracle.class, OutOfCoreOracle.class,
+          MemoryEstimatorOracle.class, OutOfCoreOracle.class,
           "Out-of-core oracle that is to be used for adaptive out-of-core " +
               "engine");
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
index d2e7e8d..3b211fb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/AbstractEdgeStore.java
@@ -27,7 +27,9 @@ import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.ooc.OutOfCoreEngine;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.utils.CallableFactory;
+import org.apache.giraph.utils.ProgressCounter;
 import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.ThreadLocalProgressCounter;
 import org.apache.giraph.utils.Trimmable;
 import org.apache.giraph.utils.VertexIdEdgeIterator;
 import org.apache.giraph.utils.VertexIdEdges;
@@ -60,6 +62,9 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
   V extends Writable, E extends Writable, K, Et>
   extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
   implements EdgeStore<I, V, E> {
+  /** Used to keep track of progress during the move-edges process */
+  public static final ThreadLocalProgressCounter PROGRESS_COUNTER =
+    new ThreadLocalProgressCounter();
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(AbstractEdgeStore.class);
   /** Service worker. */
@@ -81,10 +86,11 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
    */
   protected boolean useInputOutEdges;
   /** Whether we spilled edges on disk */
-  private boolean hasEdgesOnDisk = false;
+  private volatile boolean hasEdgesOnDisk = false;
   /** Create source vertices */
   private CreateSourceVertexCallback<I> createSourceVertexCallback;
 
+
   /**
    * Constructor.
    *
@@ -274,12 +280,12 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
         return new Callable<Void>() {
           @Override
           public Void call() throws Exception {
-            Integer partitionId;
             I representativeVertexId = configuration.createVertexId();
             OutOfCoreEngine oocEngine = service.getServerData().getOocEngine();
             if (oocEngine != null) {
               oocEngine.processingThreadStart();
             }
+            ProgressCounter numVerticesProcessed = PROGRESS_COUNTER.get();
             while (true) {
               Partition<I, V, E> partition =
                   service.getPartitionStore().getNextPartition();
@@ -338,6 +344,7 @@ public abstract class AbstractEdgeStore<I extends WritableComparable,
                   // require us to put back the vertex after modifying it.
                   partition.saveVertex(vertex);
                 }
+                numVerticesProcessed.inc();
                 iterator.remove();
               }
               // Some PartitionStore implementations

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
index 65399b2..82a55f1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
@@ -22,7 +22,9 @@ import com.sun.management.GarbageCollectionNotificationInfo;
 import com.yammer.metrics.core.Gauge;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.NetworkMetrics;
 import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
 import org.apache.giraph.comm.flow_control.FlowControl;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -101,6 +103,11 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
   /** Semaphore used for controlling number of active threads at each moment */
   private final AdjustableSemaphore activeThreadsPermit;
   /**
+   * Cached value for NettyClient.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER (max
+   * credit used for credit-based flow-control mechanism)
+   */
+  private final short maxRequestsCredit;
+  /**
    * Generally, the logic in Giraph for change of the superstep happens in the
    * following order:
    *   (1) Compute threads are done processing all partitions
@@ -141,14 +148,23 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
   private boolean resetDone;
 
   /**
+   * Provides statistics about network traffic (e.g. received bytes per
+   * superstep etc).
+   */
+  private final NetworkMetrics networkMetrics;
+
+  /**
    * Constructor
    *
    * @param conf Configuration
    * @param service Service worker
+   * @param networkMetrics Interface for network stats
    */
   public OutOfCoreEngine(ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
-                         CentralizedServiceWorker<?, ?, ?> service) {
+                         CentralizedServiceWorker<?, ?, ?> service,
+                         NetworkMetrics networkMetrics) {
     this.service = service;
+    this.networkMetrics = networkMetrics;
     Class<? extends OutOfCoreDataAccessor> accessorClass =
         GiraphConstants.OUT_OF_CORE_DATA_ACCESSOR.get(conf);
     try {
@@ -178,22 +194,24 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
           "out-of-core policy. Setting the oracle to be FixedPartitionsOracle");
       oracleClass = FixedPartitionsOracle.class;
     }
-    try {
-      Constructor<?> constructor = oracleClass.getConstructor(
-          ImmutableClassesGiraphConfiguration.class, OutOfCoreEngine.class);
-      this.oracle = (OutOfCoreOracle) constructor.newInstance(conf, this);
-    } catch (NoSuchMethodException | IllegalAccessException |
-        InstantiationException | InvocationTargetException e) {
-      throw new IllegalStateException("OutOfCoreEngine: caught exception " +
-          "while creating the oracle!", e);
-    }
     this.numComputeThreads = conf.getNumComputeThreads();
     // At the beginning of the execution, only input threads are processing data
     this.numProcessingThreads = conf.getNumInputSplitsThreads();
     this.activeThreadsPermit = new AdjustableSemaphore(numProcessingThreads);
+    this.maxRequestsCredit = (short)
+        CreditBasedFlowControl.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
     this.superstep = BspService.INPUT_SUPERSTEP;
     this.resetDone = false;
     GiraphMetrics.get().addSuperstepResetObserver(this);
+    try {
+      Constructor<?> constructor = oracleClass.getConstructor(
+        ImmutableClassesGiraphConfiguration.class, OutOfCoreEngine.class);
+      this.oracle = (OutOfCoreOracle) constructor.newInstance(conf, this);
+    } catch (NoSuchMethodException | IllegalAccessException |
+      InstantiationException | InvocationTargetException e) {
+      throw new IllegalStateException("OutOfCoreEngine: caught exception " +
+        "while creating the oracle!", e);
+    }
   }
 
   /**
@@ -332,6 +350,7 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
       "UL_UNRELEASED_LOCK_EXCEPTION_PATH")
   public void startIteration() {
+    oracle.startIteration();
     if (!resetDone) {
       superstepLock.writeLock().lock();
       metaPartitionManager.resetPartitions();
@@ -460,6 +479,27 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
   }
 
   /**
+   * Update the credit announced for this worker in Netty. The lower the credit
+   * is, the lower rate incoming messages arrive at this worker. Thus, credit
+   * is an indirect way of controlling amount of memory incoming messages would
+   * take.
+   *
+   * @param fraction the fraction of max credits others can use to send requests
+   *                 to this worker
+   */
+  public void updateRequestsCreditFraction(double fraction) {
+    checkState(fraction >= 0 && fraction <= 1);
+    short newCredit = (short) (maxRequestsCredit * fraction);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("updateRequestsCreditFraction: updating the credit to " +
+          newCredit);
+    }
+    if (flowControl != null) {
+      ((CreditBasedFlowControl) flowControl).updateCredit(newCredit);
+    }
+  }
+
+  /**
    * Reset partitions and messages meta data. Also, reset the cached value of
    * superstep counter.
    */
@@ -507,4 +547,8 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
   public OutOfCoreDataAccessor getDataAccessor() {
     return dataAccessor;
   }
+
+  public NetworkMetrics getNetworkMetrics() {
+    return networkMetrics;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
index 173b451..32a5a6e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/MetaPartitionManager.java
@@ -526,8 +526,8 @@ public class MetaPartitionManager {
     meta = perThreadPartitionDictionary.get(threadId).lookup(
         ProcessingState.PROCESSED,
         StorageState.IN_MEM,
-        StorageState.ON_DISK,
-        null);
+        null,
+        StorageState.ON_DISK);
     if (meta != null) {
       return meta.getPartitionId();
     }
@@ -535,18 +535,17 @@ public class MetaPartitionManager {
     meta = perThreadPartitionDictionary.get(threadId).lookup(
         ProcessingState.PROCESSED,
         StorageState.ON_DISK,
-        StorageState.IN_MEM,
-        null);
+        null,
+        StorageState.IN_MEM);
     if (meta != null) {
       return meta.getPartitionId();
     }
 
-    // Forth, look for a processed partition entirely on disk
     meta = perThreadPartitionDictionary.get(threadId).lookup(
         ProcessingState.PROCESSED,
         StorageState.ON_DISK,
-        StorageState.ON_DISK,
-        null);
+        null,
+        StorageState.ON_DISK);
     if (meta != null) {
       return meta.getPartitionId();
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java
index 8efa9de..f189def 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/persistence/LocalDiskDataAccessor.java
@@ -94,12 +94,11 @@ public class LocalDiskDataAccessor implements OutOfCoreDataAccessor {
     int ptr = 0;
     String jobId = conf.getJobId();
     for (String path : userPaths) {
-      File file = new File(path);
-      if (!file.exists()) {
-        checkState(file.mkdirs(), "LocalDiskDataAccessor: cannot create " +
-            "directory " + file.getAbsolutePath());
-      }
-      basePaths[ptr] = path + "/" + jobId;
+      String jobDirectory = path + "/" + jobId;
+      File file = new File(jobDirectory);
+      checkState(file.mkdirs(), "LocalDiskDataAccessor: cannot create " +
+          "directory " + file.getAbsolutePath());
+      basePaths[ptr] = jobDirectory + "/";
       ptr++;
     }
     final int diskBufferSize = OOC_DISK_BUFFER_SIZE.get(conf);
@@ -112,7 +111,7 @@ public class LocalDiskDataAccessor implements OutOfCoreDataAccessor {
   @Override
   public void shutdown() {
     for (String path : basePaths) {
-      File file = new File(path).getParentFile();
+      File file = new File(path);
       for (String subFileName : file.list()) {
         File subFile = new File(file.getPath(), subFileName);
         checkState(subFile.delete(), "shutdown: cannot delete file %s",

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java
index da21973..dbce6a6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/FixedPartitionsOracle.java
@@ -145,5 +145,6 @@ public class FixedPartitionsOracle implements OutOfCoreOracle {
   public void gcCompleted(GarbageCollectionNotificationInfo gcInfo) { }
 
   @Override
-  public void shutdown() { }
+  public void startIteration() {
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/ooc/policy/OutOfCoreOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/OutOfCoreOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/OutOfCoreOracle.java
index 45b9914..ae54a91 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/OutOfCoreOracle.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/OutOfCoreOracle.java
@@ -128,8 +128,7 @@ public interface OutOfCoreOracle {
   void gcCompleted(GarbageCollectionNotificationInfo gcInfo);
 
   /**
-   * Shut down the out-of-core oracle. Necessary specifically for cases where
-   * out-of-core oracle is using additional monitoring threads.
+   * Called at the beginning of a superstep.
    */
-  void shutdown();
+  void startIteration();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/ooc/policy/SimpleGCMonitoringOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/SimpleGCMonitoringOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/SimpleGCMonitoringOracle.java
index 477b3ec..2bea697 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/SimpleGCMonitoringOracle.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/SimpleGCMonitoringOracle.java
@@ -141,6 +141,10 @@ public class SimpleGCMonitoringOracle implements OutOfCoreOracle {
     lastGCObservation = observation;
   }
 
+  @Override
+  public void startIteration() {
+  }
+
   /**
    * Get the current data injection rate to memory based on the commands ran
    * in the history (retrieved from statistics collector), and outstanding
@@ -274,9 +278,6 @@ public class SimpleGCMonitoringOracle implements OutOfCoreOracle {
     commandOccurrences.get(command.getType()).getAndDecrement();
   }
 
-  @Override
-  public void shutdown() { }
-
   /** Helper class to record memory status after GC calls */
   private class GCObservation {
     /** The time at which the GC happened (in milliseconds) */

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java
index 00a8011..2dd2c10 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/policy/ThresholdBasedOracle.java
@@ -19,28 +19,15 @@
 package org.apache.giraph.ooc.policy;
 
 import com.sun.management.GarbageCollectionNotificationInfo;
-import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
-import org.apache.giraph.comm.flow_control.FlowControl;
 import org.apache.giraph.comm.netty.NettyClient;
 import org.apache.giraph.conf.FloatConfOption;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.conf.LongConfOption;
 import org.apache.giraph.ooc.OutOfCoreEngine;
 import org.apache.giraph.ooc.command.IOCommand;
-import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.utils.LogStacktraceCallable;
 import org.apache.giraph.utils.MemoryUtils;
-import org.apache.giraph.utils.ThreadUtils;
 import org.apache.log4j.Logger;
 
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
 import static com.google.common.base.Preconditions.checkState;
 
 /**
@@ -89,7 +76,7 @@ import static com.google.common.base.Preconditions.checkState;
 public class ThresholdBasedOracle implements OutOfCoreOracle {
   /** The memory pressure at/above which the job would fail */
   public static final FloatConfOption FAIL_MEMORY_PRESSURE =
-      new FloatConfOption("giraph.memory.failPressure", 0.975f,
+      new FloatConfOption("giraph.threshold.failPressure", 0.975f,
           "The memory pressure (fraction of used memory) at/above which the " +
               "job would fail.");
   /**
@@ -98,13 +85,13 @@ public class ThresholdBasedOracle implements OutOfCoreOracle {
    * job processing rate.
    */
   public static final FloatConfOption EMERGENCY_MEMORY_PRESSURE =
-      new FloatConfOption("giraph.memory.emergencyPressure", 0.925f,
+      new FloatConfOption("giraph.threshold.emergencyPressure", 0.925f,
           "The memory pressure (fraction of used memory) at which the job " +
               "is close to fail, hence we should reduce its processing rate " +
               "as much as possible.");
   /** The memory pressure at which the job is suffering from GC overhead. */
   public static final FloatConfOption HIGH_MEMORY_PRESSURE =
-      new FloatConfOption("giraph.memory.highPressure", 0.875f,
+      new FloatConfOption("giraph.threshold.highPressure", 0.875f,
           "The memory pressure (fraction of used memory) at which the job " +
               "is suffering from GC overhead.");
   /**
@@ -112,7 +99,7 @@ public class ThresholdBasedOracle implements OutOfCoreOracle {
    * memory intensive job.
    */
   public static final FloatConfOption OPTIMAL_MEMORY_PRESSURE =
-      new FloatConfOption("giraph.memory.optimalPressure", 0.8f,
+      new FloatConfOption("giraph.threshold.optimalPressure", 0.8f,
           "The memory pressure (fraction of used memory) at which a " +
               "memory-intensive job shows the optimal GC behavior.");
   /**
@@ -120,12 +107,12 @@ public class ThresholdBasedOracle implements OutOfCoreOracle {
    * suffering from GC overhead.
    */
   public static final FloatConfOption LOW_MEMORY_PRESSURE =
-      new FloatConfOption("giraph.memory.lowPressure", 0.7f,
+      new FloatConfOption("giraph.threshold.lowPressure", 0.7f,
           "The memory pressure (fraction of used memory) at/below which the " +
               "job can use more memory without suffering the performance.");
   /** The interval at which memory observer thread wakes up. */
   public static final LongConfOption CHECK_MEMORY_INTERVAL =
-      new LongConfOption("giraph.checkMemoryInterval", 2500,
+      new LongConfOption("giraph.threshold.checkMemoryInterval", 2500,
           "The interval/period where memory observer thread wakes up and " +
               "monitors memory footprint (in milliseconds)");
   /**
@@ -134,7 +121,7 @@ public class ThresholdBasedOracle implements OutOfCoreOracle {
    * past is specified in this parameter
    */
   public static final LongConfOption LAST_GC_CALL_INTERVAL =
-      new LongConfOption("giraph.lastGcCallInterval", 10 * 1000,
+      new LongConfOption("giraph.threshold.lastGcCallInterval", 10 * 1000,
           "How long after last major/full GC should we call manual GC?");
 
   /** Class logger */
@@ -154,18 +141,6 @@ public class ThresholdBasedOracle implements OutOfCoreOracle {
   private final long checkMemoryInterval;
   /** Cached value for LAST_GC_CALL_INTERVAL */
   private final long lastGCCallInterval;
-  /**
-   * Cached value for NettyClient.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER (max
-   * credit used for credit-based flow-control mechanism)
-   */
-  private final short maxRequestsCredit;
-  /**
-   * Whether the job is shutting down. Used for terminating the memory
-   * observer thread.
-   */
-  private final CountDownLatch shouldTerminate;
-  /** Result of memory observer thread */
-  private final Future<Void> checkMemoryThreadResult;
   /** Out-of-core engine */
   private final OutOfCoreEngine oocEngine;
   /** Last time a major/full GC has been called (in milliseconds) */
@@ -188,70 +163,59 @@ public class ThresholdBasedOracle implements OutOfCoreOracle {
     this.lowMemoryPressure = LOW_MEMORY_PRESSURE.get(conf);
     this.checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf);
     this.lastGCCallInterval = LAST_GC_CALL_INTERVAL.get(conf);
-    this.maxRequestsCredit = (short)
-        CreditBasedFlowControl.MAX_NUM_OF_OPEN_REQUESTS_PER_WORKER.get(conf);
     NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.setIfUnset(conf, true);
     boolean useCredit = NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf);
     checkState(useCredit, "ThresholdBasedOracle: credit-based flow control " +
         "must be enabled. Use giraph.waitForPerWorkerRequests=true");
-    this.shouldTerminate = new CountDownLatch(1);
     this.oocEngine = oocEngine;
     this.lastMajorGCTime = 0;
 
-    CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
+    final Thread thread = new Thread(new Runnable() {
       @Override
-      public Callable<Void> newCallable(int callableId) {
-        return new Callable<Void>() {
-          @Override
-          public Void call() throws Exception {
-            while (true) {
-              boolean done = shouldTerminate.await(checkMemoryInterval,
-                  TimeUnit.MILLISECONDS);
-              if (done) {
-                break;
-              }
-              double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
-              long time = System.currentTimeMillis();
-              if ((usedMemoryFraction > highMemoryPressure &&
-                  time - lastMajorGCTime >= lastGCCallInterval) ||
-                  (usedMemoryFraction > optimalMemoryPressure &&
+      public void run() {
+        while (true) {
+          double usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
+          long time = System.currentTimeMillis();
+          if ((usedMemoryFraction > highMemoryPressure &&
+              time - lastMajorGCTime >= lastGCCallInterval) ||
+              (usedMemoryFraction > optimalMemoryPressure &&
                   time - lastMajorGCTime >= lastGCCallInterval &&
                   time - lastMinorGCTime >= lastGCCallInterval)) {
-                if (LOG.isInfoEnabled()) {
-                  LOG.info("call: last GC happened a while ago and the " +
-                      "amount of used memory is high (used memory " +
-                      "fraction is " +
-                      String.format("%.2f", usedMemoryFraction) + "). " +
-                      "Calling GC manually");
-                }
-                System.gc();
-                time = System.currentTimeMillis() - time;
-                usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
-                if (LOG.isInfoEnabled()) {
-                  LOG.info("call: manual GC is done. It took " +
-                      String.format("%.2f", (double) time / 1000) +
-                      " seconds. Used memory fraction is " +
-                      String.format("%.2f", usedMemoryFraction));
-                }
-              }
-              updateRates(usedMemoryFraction);
+            if (LOG.isInfoEnabled()) {
+              LOG.info("call: last GC happened a while ago and the " +
+                  "amount of used memory is high (used memory " +
+                  "fraction is " +
+                  String.format("%.2f", usedMemoryFraction) + "). " +
+                  "Calling GC manually");
             }
-            return null;
+            System.gc();
+            time = System.currentTimeMillis() - time;
+            usedMemoryFraction = 1 - MemoryUtils.freeMemoryFraction();
+            if (LOG.isInfoEnabled()) {
+              LOG.info("call: manual GC is done. It took " +
+                  String.format("%.2f", (double) time / 1000) +
+                  " seconds. Used memory fraction is " +
+                  String.format("%.2f", usedMemoryFraction));
+            }
+          }
+          updateRates(usedMemoryFraction);
+          try {
+            Thread.sleep(checkMemoryInterval);
+          } catch (InterruptedException e) {
+            LOG.warn("run: exception occurred!", e);
+            return;
           }
-        };
+        }
       }
-    };
-    ExecutorService executor = Executors.newSingleThreadExecutor(
-        ThreadUtils.createThreadFactory("check-memory"));
-    this.checkMemoryThreadResult = executor.submit(new LogStacktraceCallable<>(
-        callableFactory.newCallable(0)));
-    executor.shutdown();
+    });
+    thread.setUncaughtExceptionHandler(oocEngine.getServiceWorker()
+        .getGraphTaskManager().createUncaughtExceptionHandler());
+    thread.setName("memory-checker");
+    thread.setDaemon(true);
+    thread.start();
   }
 
   /**
-   * upon major/full GC calls.
-   */
-  /**
    * Update statistics and rate regarding communication credits and number of
    * active threads.
    *
@@ -272,13 +236,13 @@ public class ThresholdBasedOracle implements OutOfCoreOracle {
     // Update the fraction of credit that should be used in credit-based flow-
     // control
     if (usedMemoryFraction >= emergencyMemoryPressure) {
-      updateRequestsCredit((short) 0);
+      oocEngine.updateRequestsCreditFraction(0);
     } else if (usedMemoryFraction < optimalMemoryPressure) {
-      updateRequestsCredit(maxRequestsCredit);
+      oocEngine.updateRequestsCreditFraction(1);
     } else {
-      updateRequestsCredit((short) (maxRequestsCredit *
-          (1 - (usedMemoryFraction - optimalMemoryPressure) /
-              (emergencyMemoryPressure - optimalMemoryPressure))));
+      oocEngine.updateRequestsCreditFraction(1 -
+          (usedMemoryFraction - optimalMemoryPressure) /
+              (emergencyMemoryPressure - optimalMemoryPressure));
     }
   }
 
@@ -331,35 +295,6 @@ public class ThresholdBasedOracle implements OutOfCoreOracle {
   }
 
   @Override
-  public void shutdown() {
-    shouldTerminate.countDown();
-    try {
-      checkMemoryThreadResult.get();
-    } catch (InterruptedException | ExecutionException e) {
-      LOG.error("shutdown: caught exception while waiting on check-memory " +
-          "thread to terminate!");
-      throw new IllegalStateException(e);
-    }
-    if (LOG.isInfoEnabled()) {
-      LOG.info("shutdown: ThresholdBasedOracle shutdown complete!");
-    }
-  }
-
-  /**
-   * Update the credit announced for this worker in Netty. The lower the credit
-   * is, the lower rate incoming messages arrive at this worker. Thus, credit
-   * is an indirect way of controlling amount of memory incoming messages would
-   * take.
-   *
-   * @param newCredit the new credit to announce to other workers
-   */
-  private void updateRequestsCredit(short newCredit) {
-    if (LOG.isInfoEnabled()) {
-      LOG.info("updateRequestsCredit: updating the credit to " + newCredit);
-    }
-    FlowControl flowControl = oocEngine.getFlowControl();
-    if (flowControl != null) {
-      ((CreditBasedFlowControl) flowControl).updateCredit(newCredit);
-    }
+  public void startIteration() {
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index c51521d..5b00eb7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -714,6 +714,7 @@ else[HADOOP_NON_SECURE]*/
     workerInfoList.clear();
     workerInfoList = addressesAndPartitions.getWorkerInfos();
     masterInfo = addressesAndPartitions.getMasterInfo();
+    workerServer.resetBytesReceivedPerSuperstep();
 
     if (LOG.isInfoEnabled()) {
       LOG.info("startSuperstep: " + masterInfo);

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
index 1e4593b..8f16c35 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
@@ -24,6 +24,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.WorkerServer;
 import org.apache.giraph.comm.netty.NettyClient;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
@@ -162,13 +163,14 @@ public class TestPartitionStores {
 
     CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
       serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
+    WorkerServer workerServer = Mockito.mock(WorkerServer.class);
     Mockito.when(serviceWorker.getSuperstep()).thenReturn(
         BspService.INPUT_SUPERSTEP);
     GraphTaskManager<IntWritable, IntWritable, NullWritable>
         graphTaskManager = Mockito.mock(GraphTaskManager.class);
     Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
     ServerData<IntWritable, IntWritable, NullWritable>
-        serverData = new ServerData<>(serviceWorker, conf, context);
+        serverData = new ServerData<>(serviceWorker, workerServer, conf, context);
     Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
 
     DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
@@ -191,13 +193,14 @@ public class TestPartitionStores {
 
     CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
     serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
+    WorkerServer workerServer = Mockito.mock(WorkerServer.class);
     Mockito.when(serviceWorker.getSuperstep()).thenReturn(
         BspService.INPUT_SUPERSTEP);
     GraphTaskManager<IntWritable, IntWritable, NullWritable>
         graphTaskManager = Mockito.mock(GraphTaskManager.class);
     Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
     ServerData<IntWritable, IntWritable, NullWritable>
-        serverData = new ServerData<>(serviceWorker, conf, context);
+        serverData = new ServerData<>(serviceWorker, workerServer, conf, context);
     Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
 
     DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
@@ -308,6 +311,7 @@ public class TestPartitionStores {
 
     CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
     serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
+    WorkerServer workerServer = Mockito.mock(WorkerServer.class);
 
     Mockito.when(serviceWorker.getSuperstep()).thenReturn(
         BspService.INPUT_SUPERSTEP);
@@ -315,7 +319,7 @@ public class TestPartitionStores {
         graphTaskManager = Mockito.mock(GraphTaskManager.class);
     Mockito.when(serviceWorker.getGraphTaskManager()).thenReturn(graphTaskManager);
     ServerData<IntWritable, IntWritable, NullWritable>
-        serverData = new ServerData<>(serviceWorker, conf, context);
+        serverData = new ServerData<>(serviceWorker, workerServer, conf, context);
     Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
 
     DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
index b56998f..c380c92 100644
--- a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
@@ -21,6 +21,7 @@ package org.apache.giraph.utils;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
+import org.apache.giraph.comm.WorkerServer;
 import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
@@ -206,9 +207,10 @@ public class MockUtils {
         ByteArrayMessagesPerVertexStore.newFactory(serviceWorker, conf)
             .getClass());
 
+    WorkerServer workerServer = Mockito.mock(WorkerServer.class);
     ServerData<IntWritable, IntWritable, IntWritable> serverData =
       new ServerData<IntWritable, IntWritable, IntWritable>(
-          serviceWorker, conf, context);
+          serviceWorker, workerServer, conf, context);
     // Here we add a partition to simulate the case that there is one partition.
     serverData.getPartitionStore().addPartition(new SimplePartition());
     return serverData;

http://git-wip-us.apache.org/repos/asf/giraph/blob/f5b685ef/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 26dd26a..6148501 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1695,6 +1695,11 @@ under the License.
         <version>${dep.commons-lang3.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-math</artifactId>
+        <version>2.1</version>
+      </dependency>
+      <dependency>
         <groupId>com.facebook.thirdparty.yourkit-api</groupId>
         <artifactId>yjp-controller-api-redist</artifactId>
         <version>${dep.yourkit-api.version}</version>


Mime
View raw message