giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject git commit: updated refs/heads/trunk to d6200bd
Date Fri, 28 Jun 2013 17:40:52 GMT
Updated Branches:
  refs/heads/trunk 8db290147 -> d6200bdd8


GIRAPH-702: Fix multithreaded output (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/d6200bdd
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/d6200bdd
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/d6200bdd

Branch: refs/heads/trunk
Commit: d6200bdd8327c94a1b171123ae14157bafacbd6f
Parents: 8db2901
Author: Maja Kabiljo <majakabiljo@maja-mbp.thefacebook.com>
Authored: Fri Jun 28 10:40:16 2013 -0700
Committer: Maja Kabiljo <majakabiljo@maja-mbp.thefacebook.com>
Committed: Fri Jun 28 10:40:16 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |  2 ++
 .../apache/giraph/worker/BspServiceWorker.java  | 22 ++++++++++++++---
 .../giraph/hive/output/HiveOutputTest.java      | 26 +++++++++++++++++++-
 3 files changed, 46 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/d6200bdd/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 69d2bee..60cedbc 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-702: Fix multithreaded output (majakabiljo)
+
   GIRAPH-676: A short tutorial on getting started with Giraph (boshmaf via claudio)
 
   GIRAPH-698: Expose Computation to a user (aching)

http://git-wip-us.apache.org/repos/asf/giraph/blob/d6200bdd/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 342e2b2..89b6f9e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -83,6 +83,7 @@ import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import net.iharder.Base64;
 
@@ -95,10 +96,13 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
@@ -920,13 +924,20 @@ public class BspServiceWorker<I extends WritableComparable,
       return;
     }
 
+    final int numPartitions = getPartitionStore().getNumPartitions();
     int numThreads = Math.min(getConfiguration().getNumOutputThreads(),
-        getPartitionStore().getNumPartitions());
+        numPartitions);
     LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
         "saveVertices: Starting to save " + numLocalVertices + " vertices " +
             "using " + numThreads + " threads");
     final VertexOutputFormat<I, V, E> vertexOutputFormat =
         getConfiguration().createWrappedVertexOutputFormat();
+
+    final Queue<Integer> partitionIdQueue =
+        (numPartitions == 0) ? new LinkedList<Integer>() :
+            new ArrayBlockingQueue<Integer>(numPartitions);
+    Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds());
+
     CallableFactory<Void> callableFactory = new CallableFactory<Void>() {
       @Override
       public Callable<Void> newCallable(int callableId) {
@@ -937,14 +948,19 @@ public class BspServiceWorker<I extends WritableComparable,
                 vertexOutputFormat.createVertexWriter(getContext());
             vertexWriter.setConf(getConfiguration());
             vertexWriter.initialize(getContext());
-            long verticesWritten = 0;
             long nextPrintVertices = 0;
             long nextPrintMsecs = System.currentTimeMillis() + 15000;
             int partitionIndex = 0;
             int numPartitions = getPartitionStore().getNumPartitions();
-            for (Integer partitionId : getPartitionStore().getPartitionIds()) {
+            while (!partitionIdQueue.isEmpty()) {
+              Integer partitionId = partitionIdQueue.poll();
+              if (partitionId == null) {
+                break;
+              }
+
               Partition<I, V, E> partition =
                   getPartitionStore().getPartition(partitionId);
+              long verticesWritten = 0;
               for (Vertex<I, V, E> vertex : partition) {
                 vertexWriter.writeVertex(vertex);
                 ++verticesWritten;

http://git-wip-us.apache.org/repos/asf/giraph/blob/d6200bdd/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java b/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java
index 1f92176..4d4d976 100644
--- a/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java
+++ b/giraph-hive/src/test/java/org/apache/giraph/hive/output/HiveOutputTest.java
@@ -18,6 +18,7 @@
 package org.apache.giraph.hive.output;
 
 import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.hive.GiraphHiveTestBase;
 import org.apache.giraph.hive.Helpers;
@@ -43,6 +44,8 @@ import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 
+import junit.framework.Assert;
+
 public class HiveOutputTest extends GiraphHiveTestBase {
   private LocalHiveServer hiveServer = new LocalHiveServer("giraph-hive");
 
@@ -88,6 +91,25 @@ public class HiveOutputTest extends GiraphHiveTestBase {
     verifyRecords(inputDesc);
   }
 
+  @Test
+  public void testHiveMultithreadedOutput() throws Exception
+  {
+    String tableName = "test1";
+    hiveServer.createTable("CREATE TABLE " + tableName +
+        " (i1 BIGINT, i2 BIGINT) ");
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setVertexOutputFormatThreadSafe(true);
+    conf.setNumOutputThreads(2);
+    GiraphConstants.USER_PARTITION_COUNT.set(conf, 4);
+    runJob(tableName, conf);
+
+    HiveInputDescription inputDesc = new HiveInputDescription();
+    inputDesc.getTableDesc().setTableName(tableName);
+
+    verifyRecords(inputDesc);
+  }
+
   private void runJob(String tableName, GiraphConfiguration conf) throws Exception {
     String[] edges = new String[] {
         "1 2",
@@ -116,7 +138,9 @@ public class HiveOutputTest extends GiraphHiveTestBase {
 
     // Records are in an unknown sort order so we grab their values here
     for (HiveReadableRecord record : records) {
-      data.put(record.getLong(0), record.getLong(1));
+      if (data.put(record.getLong(0), record.getLong(1)) != null) {
+        Assert.fail("Id " + record.getLong(0) + " appears twice in the output");
+      }
     }
 
     assertEquals(3, data.size());


Mime
View raw message