giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pava...@apache.org
Subject git commit: updated refs/heads/trunk to 969a488
Date Fri, 18 Jul 2014 23:32:20 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk 02d9e6c25 -> 969a48818


GIRAPH-927: Decouple netty server threads from message processing (edunov via pavanka)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/969a4881
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/969a4881
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/969a4881

Branch: refs/heads/trunk
Commit: 969a488183a42dedbde8dcec7ac00595a836974c
Parents: 02d9e6c
Author: Pavan Kumar <pavanka@fb.com>
Authored: Fri Jul 18 16:20:17 2014 -0700
Committer: Pavan Kumar <pavanka@fb.com>
Committed: Fri Jul 18 16:20:17 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../java/org/apache/giraph/comm/ServerData.java |  11 +
 .../messages/InMemoryMessageStoreFactory.java   |  13 +-
 .../queue/AsyncMessageStoreWrapper.java         | 238 +++++++++++++++++++
 .../comm/messages/queue/PartitionMessage.java   |  71 ++++++
 .../comm/messages/queue/package-info.java       |  22 ++
 .../org/apache/giraph/conf/GiraphConstants.java |   8 +
 .../apache/giraph/graph/GraphTaskManager.java   |   5 +-
 .../apache/giraph/worker/BspServiceWorker.java  |   8 +
 .../queue/AsyncMessageStoreWrapperTest.java     | 123 ++++++++++
 .../org/apache/giraph/TestCheckpointing.java    |  13 +-
 11 files changed, 508 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/969a4881/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 4207339..d6d998c 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-927: Decouple netty server threads from message processing (edunov via pavanka)
 
+
   GIRAPH-924: Fix checkpointing (edunov via majakabiljo)
 
   GIRAPH-921: Create ByteValueVertex to store vertex values as bytes without object instance
(akyrola via majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/969a4881/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 036510e..29488fc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -23,6 +23,7 @@ import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
 import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
+import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.EdgeStore;
@@ -202,6 +203,16 @@ public class ServerData<I extends WritableComparable,
   }
 
   /**
+   * In case of async message store we have to wait for all messages
+   * to be processed before going into next superstep.
+   */
+  public void waitForComplete() {
+    if (incomingMessageStore instanceof AsyncMessageStoreWrapper) {
+      ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete();
+    }
+  }
+
+  /**
    * Get the vertex mutations (synchronize on the values)
    *
    * @return Vertex mutations

http://git-wip-us.apache.org/repos/asf/giraph/blob/969a4881/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
index db22503..02ea7b2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
@@ -24,8 +24,9 @@ import org.apache.giraph.comm.messages.primitives.IntByteArrayMessageStore;
 import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore;
 import org.apache.giraph.comm.messages.primitives.long_id.LongByteArrayMessageStore;
 import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore;
-import org.apache.giraph.comm.messages.primitives.long_id.LongPointerListMessageStore;
+import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
 import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.comm.messages.primitives.long_id.LongPointerListMessageStore;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.factories.MessageValueFactory;
 import org.apache.hadoop.io.DoubleWritable;
@@ -155,6 +156,16 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
           (conf.useMessageCombiner() ? " message combiner " +
               conf.getMessageCombinerClass() : " no combiner"));
     }
+
+    int asyncMessageStoreThreads =
+        GiraphConstants.ASYNC_MESSAGE_STORE_THREADS_COUNT.get(conf);
+    if (asyncMessageStoreThreads > 0) {
+      messageStore = new AsyncMessageStoreWrapper(
+          messageStore,
+          service.getPartitionStore().getPartitionIds(),
+          asyncMessageStoreThreads);
+    }
+
     return messageStore;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/969a4881/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
new file mode 100644
index 0000000..a62834f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.comm.messages.queue;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import it.unimi.dsi.fastutil.ints.Int2IntArrayMap;
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+
+/**
+ * This class decouples message receiving and processing
+ * into separate threads thus reducing contention.
+ * It does not provide message store functionality itself, rather
+ * providing a wrapper around existing message stores that
+ * can now be used in async mode with only slight modifications.
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public final class AsyncMessageStoreWrapper<I extends WritableComparable,
+    M extends Writable> implements MessageStore<I, M> {
+
+  /** Logger */
+  private static final Logger LOG =
+      Logger.getLogger(AsyncMessageStoreWrapper.class);
+  /** Pass this id to clear the queues and shutdown all threads
+   * started by this processor */
+  private static final PartitionMessage SHUTDOWN_QUEUE_MESSAGE =
+      new PartitionMessage(-1, null);
+  /** Pass this message to clear the queues but keep threads alive */
+  private static final PartitionMessage CLEAR_QUEUE_MESSAGE =
+      new PartitionMessage(-1, null);
+  /** Executor that processes messages in background */
+  private static final ExecutorService EXECUTOR_SERVICE =
+      Executors.newCachedThreadPool(
+          new ThreadFactoryBuilder()
+              .setNameFormat("AsyncMessageStoreWrapper-%d").build());
+
+  /** Number of threads that will process messages in background */
+  private final int threadsCount;
+  /** Queue that temporary stores messages */
+  private final BlockingQueue<PartitionMessage<I, M>>[] queues;
+  /** Map from partition id to thread that process this partition */
+  private final Int2IntMap partition2Queue;
+  /** Signals that all procesing is done */
+  private Semaphore completionSemaphore;
+  /** Underlying message store */
+  private final MessageStore<I, M> store;
+
+  /**
+   * Constructs async wrapper around existing message store
+   * object. Requires partition list and number of threads
+   * to properly initialize background threads and assign partitions.
+   * Partitions are assigned to threads in round-robin fashion.
+   * It guarantees that all threads have almost the same number of
+   * partitions (+-1) no matter how partitions are assigned to this worker.
+   * @param store underlying message store to be used in computation
+   * @param partitions partitions assigned to this worker
+   * @param threadCount number of threads that will be used to process
+   *                    messages.
+   */
+  public AsyncMessageStoreWrapper(MessageStore<I, M> store,
+                                  Iterable<Integer> partitions,
+                                  int threadCount) {
+    this.store = store;
+    this.threadsCount = threadCount;
+    completionSemaphore = new Semaphore(1 - threadsCount);
+    queues = new BlockingQueue[threadsCount];
+    partition2Queue = new Int2IntArrayMap();
+    LOG.info("AsyncMessageStoreWrapper enabled. Threads= " + threadsCount);
+
+    for (int i = 0; i < threadsCount; i++) {
+      queues[i] = new LinkedBlockingQueue<>();
+      EXECUTOR_SERVICE.submit(new MessageStoreQueueWorker(queues[i]));
+    }
+
+    int cnt = 0;
+    for (int partitionId : partitions) {
+      partition2Queue.put(partitionId, cnt++ % threadsCount);
+    }
+
+  }
+
+  @Override
+  public boolean isPointerListEncoding() {
+    return store.isPointerListEncoding();
+  }
+
+  @Override
+  public Iterable<M> getVertexMessages(I vertexId) throws IOException {
+    return store.getVertexMessages(vertexId);
+  }
+
+  @Override
+  public void clearVertexMessages(I vertexId) throws IOException {
+    store.clearVertexMessages(vertexId);
+  }
+
+  @Override
+  public void clearAll() throws IOException {
+    try {
+      for (BlockingQueue<PartitionMessage<I, M>> queue : queues) {
+        queue.put(SHUTDOWN_QUEUE_MESSAGE);
+      }
+      completionSemaphore.acquire();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+    store.clearAll();
+  }
+
+  @Override
+  public boolean hasMessagesForVertex(I vertexId) {
+    return store.hasMessagesForVertex(vertexId);
+  }
+
+  @Override
+  public void addPartitionMessages(
+      int partitionId, VertexIdMessages<I, M> messages) throws IOException {
+    int hash = partition2Queue.get(partitionId);
+    try {
+      queues[hash].put(new PartitionMessage<>(partitionId, messages));
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void finalizeStore() {
+    store.finalizeStore();
+  }
+
+  @Override
+  public Iterable<I> getPartitionDestinationVertices(int partitionId) {
+    return store.getPartitionDestinationVertices(partitionId);
+  }
+
+  @Override
+  public void clearPartition(int partitionId) throws IOException {
+    store.clearPartition(partitionId);
+  }
+
+  @Override
+  public void writePartition(DataOutput out, int partitionId)
+    throws IOException {
+    store.writePartition(out, partitionId);
+  }
+
+  @Override
+  public void readFieldsForPartition(DataInput in, int partitionId)
+    throws IOException {
+    store.readFieldsForPartition(in, partitionId);
+  }
+
+  /**
+   * Wait till all messages are processed and all queues are empty.
+   */
+  public void waitToComplete() {
+    try {
+      for (BlockingQueue<PartitionMessage<I, M>> queue : queues) {
+        queue.put(CLEAR_QUEUE_MESSAGE);
+      }
+      completionSemaphore.acquire();
+      completionSemaphore = new Semaphore(1 - threadsCount);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * This runnable has logic for background thread
+   * that actually does message processing.
+   */
+  private class MessageStoreQueueWorker implements Runnable {
+    /**
+     * Queue assigned to this background thread.
+     */
+    private final BlockingQueue<PartitionMessage<I, M>> queue;
+
+    /**
+     * Constructs runnable.
+     * @param queue where messages are put by client
+     */
+    private MessageStoreQueueWorker(
+        BlockingQueue<PartitionMessage<I, M>> queue) {
+      this.queue = queue;
+    }
+
+    @Override
+    public void run() {
+      PartitionMessage<I, M> message = null;
+      while (true) {
+        try {
+          message = queue.take();
+          if (message.getMessage() != null) {
+            int partitionId = message.getPartitionId();
+            store.addPartitionMessages(partitionId, message.getMessage());
+          } else {
+            completionSemaphore.release();
+            if (message == SHUTDOWN_QUEUE_MESSAGE) {
+              return;
+            }
+          }
+        } catch (IOException | InterruptedException e) {
+          LOG.error("MessageStoreQueueWorker.run: " + message, e);
+          return;
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/969a4881/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/PartitionMessage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/PartitionMessage.java
b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/PartitionMessage.java
new file mode 100644
index 0000000..8c884ce
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/PartitionMessage.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.comm.messages.queue;
+
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Small wrapper that holds a reference to vertex message
+ * and knows partition id.
+ * @param <I> vertexId type parameter
+ * @param <M> message type parameter
+ */
+public class PartitionMessage<I extends WritableComparable,
+    M extends Writable> {
+  /** partition id */
+  private int partitionId;
+  /** vertext message */
+  private VertexIdMessages<I, M> message;
+
+  /**
+   * Constructs wrapper from partitino id and vertext message
+   * object.
+   * @param partitionId destination partition id
+   * @param message message object
+   */
+  public PartitionMessage(int partitionId, VertexIdMessages<I, M> message) {
+    this.partitionId = partitionId;
+    this.message = message;
+  }
+
+  /**
+   * Partition id
+   * @return destination partition id.
+   */
+  public int getPartitionId() {
+    return partitionId;
+  }
+
+  /**
+   * Message
+   * @return vertex message
+   */
+  public VertexIdMessages<I, M> getMessage() {
+    return message;
+  }
+
+  @Override
+  public String toString() {
+    return "PartitionMessage{" +
+        "partitionId=" + partitionId +
+        ", message=" + message +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/969a4881/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/package-info.java
b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/package-info.java
new file mode 100644
index 0000000..e54f9f2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package for message store queue, that decouples netty threads from
+ * threads processing messages.
+ */
+package org.apache.giraph.comm.messages.queue;

http://git-wip-us.apache.org/repos/asf/giraph/blob/969a4881/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 3d16e9c..0424a47 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -1143,5 +1143,13 @@ public interface GiraphConstants {
           "org.apache.hadoop.io.compress.DefaultCodec",
           "Defines compression algorithm we will be using for " +
               "storing checkpoint");
+
+  /** Number of threads to use in async message store, 0 means
+   * we should not use async message processing */
+  IntConfOption ASYNC_MESSAGE_STORE_THREADS_COUNT =
+      new IntConfOption("giraph.async.message.store.threads", 0,
+          "Number of threads to be used in async message store.");
+
+
 }
 // CHECKSTYLE: resume InterfaceIsTypeCheck

http://git-wip-us.apache.org/repos/asf/giraph/blob/969a4881/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index b2a5c84..684f4eb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -38,8 +38,6 @@ import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.partition.PartitionStore;
-import org.apache.giraph.time.SystemTime;
-import org.apache.giraph.time.Time;
 import org.apache.giraph.utils.CallableFactory;
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.ProgressableUtils;
@@ -102,8 +100,6 @@ public class GraphTaskManager<I extends WritableComparable, V extends
Writable,
   /** Name of metric for time from first message till last message flushed */
   public static final String TIMER_COMMUNICATION_TIME = "communication-time-ms";
 
-  /** Time instance used for timing in this class */
-  private static final Time TIME = SystemTime.get();
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(GraphTaskManager.class);
   /** Coordination service worker */
@@ -304,6 +300,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends
Writable,
       }
       finishedSuperstepStats = completeSuperstepAndCollectStats(
         partitionStatsList, superstepTimerContext);
+
       // END of superstep compute loop
     }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/969a4881/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 0d90a59..d2d24ee 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -26,6 +26,8 @@ import org.apache.giraph.comm.WorkerClient;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.comm.WorkerServer;
 import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
 import org.apache.giraph.comm.netty.NettyWorkerAggregatorRequestProcessor;
 import org.apache.giraph.comm.netty.NettyWorkerClient;
 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
@@ -891,6 +893,12 @@ public class BspServiceWorker<I extends WritableComparable,
 
     aggregatorHandler.finishSuperstep(workerAggregatorRequestProcessor);
 
+    MessageStore<I, Writable> incomingMessageStore =
+        getServerData().getIncomingMessageStore();
+    if (incomingMessageStore instanceof AsyncMessageStoreWrapper) {
+      ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete();
+    }
+
     if (LOG.isInfoEnabled()) {
       LOG.info("finishSuperstep: Superstep " + getSuperstep() +
           ", messages = " + workerSentMessages + " " +

http://git-wip-us.apache.org/repos/asf/giraph/blob/969a4881/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java
b/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java
new file mode 100644
index 0000000..ca1031a
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.queue;
+
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.factories.TestMessageValueFactory;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Test case for AsyncMessageStoreWrapper
+ */
+public class AsyncMessageStoreWrapperTest {
+
+
+  @Test
+  public void testAsyncQueue() throws IOException {
+    TestMessageStore store = new TestMessageStore();
+
+    AsyncMessageStoreWrapper<LongWritable, IntWritable> queue =
+        new AsyncMessageStoreWrapper<>(store,
+        Arrays.asList(0, 1, 2, 3, 4), 2);
+
+    for (int i = 0; i < 1000; i++) {
+      queue.addPartitionMessages(i % 5, new ByteArrayVertexIdMessages<LongWritable, IntWritable>(new
TestMessageValueFactory<>(IntWritable.class)));
+    }
+
+    queue.waitToComplete();
+
+    assertArrayEquals(new int[] {200, 200, 200, 200, 200}, store.counters);
+
+    queue.clearAll();
+  }
+
+
+  static class TestMessageStore implements MessageStore<LongWritable, IntWritable>
{
+
+    private int counters[] = new int[5];
+
+    @Override
+    public void addPartitionMessages(int partition, VertexIdMessages messages) throws IOException
{
+      assertNotNull(messages);
+      counters[partition]++;
+    }
+
+    @Override
+    public boolean isPointerListEncoding() {
+      return false;
+    }
+
+    @Override
+    public Iterable<IntWritable> getVertexMessages(LongWritable vertexId) throws IOException
{
+      return null;
+    }
+
+    @Override
+    public void clearVertexMessages(LongWritable vertexId) throws IOException {
+
+    }
+
+    @Override
+    public void clearAll() throws IOException {
+
+    }
+
+    @Override
+    public boolean hasMessagesForVertex(LongWritable vertexId) {
+      return false;
+    }
+
+    @Override
+    public void finalizeStore() {
+
+    }
+
+    @Override
+    public Iterable<LongWritable> getPartitionDestinationVertices(int partitionId)
{
+      return null;
+    }
+
+    @Override
+    public void clearPartition(int partitionId) throws IOException {
+
+    }
+
+    @Override
+    public void writePartition(DataOutput out, int partitionId) throws IOException {
+
+    }
+
+    @Override
+    public void readFieldsForPartition(DataInput in, int partitionId) throws IOException
{
+
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/969a4881/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java b/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java
index 387b937..2939af7 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestCheckpointing.java
@@ -71,9 +71,17 @@ public class TestCheckpointing extends BspCase {
     super(TestCheckpointing.class.getName());
   }
 
+  @Test
+  public void testBspCheckpoint() throws InterruptedException, IOException, ClassNotFoundException
{
+    testBspCheckpoint(false);
+  }
 
   @Test
-  public void testBspCheckpoint()
+  public void testAsyncMessageStoreCheckpoint() throws InterruptedException, IOException,
ClassNotFoundException {
+    testBspCheckpoint(true);
+  }
+
+  public void testBspCheckpoint(boolean useAsyncMessageStore)
       throws IOException, InterruptedException, ClassNotFoundException {
     Path checkpointsDir = getTempPath("checkpointing");
     Path outputPath = getTempPath(getCallingMethodName());
@@ -88,6 +96,9 @@ public class TestCheckpointing extends BspCase {
     conf.setVertexOutputFormatClass(SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat.class);
     conf.set("mapred.job.id", TEST_JOB_ID);
     conf.set(KEY_MIN_SUPERSTEP, "0");
+    if (useAsyncMessageStore) {
+      GiraphConstants.ASYNC_MESSAGE_STORE_THREADS_COUNT.set(conf, 2);
+    }
     GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
 
     GiraphConfiguration configuration = job.getConfiguration();


Mime
View raw message