giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clau...@apache.org
Subject [1/3] GIRAPH-825
Date Wed, 30 Apr 2014 07:53:45 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk 614df7f9f -> f732f300f


http://git-wip-us.apache.org/repos/asf/giraph/blob/bd4127b7/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
index a04b703..d3c392e 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
@@ -18,6 +18,11 @@
 
 package org.apache.giraph.comm.messages;
 
+import java.io.IOException;
+import java.util.Iterator;
+
+import junit.framework.Assert;
+
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.combiner.DoubleSumMessageCombiner;
 import org.apache.giraph.comm.messages.primitives.LongByteArrayMessageStore;
@@ -33,6 +38,7 @@ import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -42,14 +48,10 @@ import org.mockito.stubbing.Answer;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
-import junit.framework.Assert;
-
-import java.io.IOException;
-import java.util.Iterator;
-
 public class TestLongDoublePrimitiveMessageStores {
   private static final int NUM_PARTITIONS = 2;
-  private static CentralizedServiceWorker<LongWritable, ?, ?> service;
+  private static CentralizedServiceWorker<LongWritable, Writable, Writable>
+    service;
 
   @Before
   public void prepare() throws IOException {
@@ -83,8 +85,9 @@ public class TestLongDoublePrimitiveMessageStores {
     }
   }
 
-  private static ImmutableClassesGiraphConfiguration<LongWritable, ?, ?>
-  createLongDoubleConf() {
+  private static ImmutableClassesGiraphConfiguration<LongWritable, Writable,
+    Writable> createLongDoubleConf() {
+
     GiraphConfiguration initConf = new GiraphConfiguration();
     initConf.setComputationClass(LongDoubleNoOpComputation.class);
     return new ImmutableClassesGiraphConfiguration(initConf);

http://git-wip-us.apache.org/repos/asf/giraph/blob/bd4127b7/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
index 08f4544..7605fb5 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
@@ -18,40 +18,66 @@
 
 package org.apache.giraph.partition;
 
+import static org.apache.giraph.conf.GiraphConstants.MAX_PARTITIONS_IN_MEMORY;
+import static org.apache.giraph.conf.GiraphConstants.PARTITIONS_DIRECTORY;
+import static org.apache.giraph.conf.GiraphConstants.USER_PARTITION_COUNT;
+import static org.apache.giraph.conf.GiraphConstants.USE_OUT_OF_CORE_GRAPH;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.commons.io.FileUtils;
+import org.apache.giraph.bsp.BspService;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.graph.BasicComputation;
 import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat;
+import org.apache.giraph.utils.InternalVertexRunner;
 import org.apache.giraph.utils.NoOpComputation;
 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
 import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import com.google.common.collect.Iterables;
 import com.google.common.io.Files;
 
-import java.io.File;
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
 /**
  * Test case for partition stores.
  */
+@SuppressWarnings("unchecked")
 public class TestPartitionStores {
   private ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
       NullWritable> conf;
   private Mapper<?, ?, ?, ?>.Context context;
 
+  /* these static variables are used for the multithreaded tests */
+  private static final int NUM_OF_VERTEXES_PER_THREAD = 10;
+  private static final int NUM_OF_EDGES_PER_VERTEX = 5;
+  private static final int NUM_OF_THREADS = 10;
+  private static final int NUM_OF_PARTITIONS = 3;
+
   public static class MyComputation extends NoOpComputation<IntWritable,
       IntWritable, NullWritable, IntWritable> { }
 
@@ -74,7 +100,7 @@ public class TestPartitionStores {
     configuration.setComputationClass(MyComputation.class);
     conf = new ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
         NullWritable>(configuration);
-    context = mock(Mapper.Context.class);
+    context = Mockito.mock(Mapper.Context.class);
   }
 
   @Test
@@ -125,17 +151,24 @@ public class TestPartitionStores {
   }
   
   @Test
-  public void testDiskBackedPartitionStoreWithByteArrayPartition() throws IOException {
+  public void testDiskBackedPartitionStoreWithByteArrayPartition()
+    throws IOException {
+
     File directory = Files.createTempDir();
     GiraphConstants.PARTITIONS_DIRECTORY.set(
         conf, new File(directory, "giraph_partitions").toString());
     GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
     conf.setPartitionClass(ByteArrayPartition.class);
-    
+
+    CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
+      serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
+    Mockito.when(serviceWorker.getSuperstep()).thenReturn(
+      BspService.INPUT_SUPERSTEP);
+
     PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore =
         new DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>(
-            conf, context);
+            conf, context, serviceWorker);
     testReadWrite(partitionStore, conf);
     partitionStore.shutdown();
     FileUtils.deleteDirectory(directory);
@@ -149,20 +182,174 @@ public class TestPartitionStores {
     GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
 
+    CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
+    serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
+
+    Mockito.when(serviceWorker.getSuperstep()).thenReturn(
+      BspService.INPUT_SUPERSTEP);
+
     PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore =
         new DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>(
-            conf, context);
+            conf, context, serviceWorker);
     testReadWrite(partitionStore, conf);
     partitionStore.shutdown();
 
     GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 2);
     partitionStore = new DiskBackedPartitionStore<IntWritable,
-            IntWritable, NullWritable>(conf, context);
+            IntWritable, NullWritable>(conf, context, serviceWorker);
     testReadWrite(partitionStore, conf);
     partitionStore.shutdown();
     FileUtils.deleteDirectory(directory);
   }
 
+  @Test
+  public void testDiskBackedPartitionStoreWithByteArrayComputation()
+    throws Exception {
+
+    Iterable<String> results;
+    String[] graph =
+    {
+      "[1,0,[]]", "[2,0,[]]", "[3,0,[]]", "[4,0,[]]", "[5,0,[]]",
+      "[6,0,[]]", "[7,0,[]]", "[8,0,[]]", "[9,0,[]]", "[10,0,[]]"
+    };
+    String[] expected =
+    {
+      "1\t0", "2\t0", "3\t0", "4\t0", "5\t0",
+      "6\t0", "7\t0", "8\t0", "9\t0", "10\t0"
+    };
+
+    USE_OUT_OF_CORE_GRAPH.set(conf, true);
+    MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
+    USER_PARTITION_COUNT.set(conf, 10);
+
+    File directory = Files.createTempDir();
+    PARTITIONS_DIRECTORY.set(conf,
+      new File(directory, "giraph_partitions").toString());
+
+    conf.setPartitionClass(ByteArrayPartition.class);
+    conf.setComputationClass(EmptyComputation.class);
+    conf.setVertexInputFormatClass(JsonLongDoubleFloatDoubleVertexInputFormat.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+
+    results = InternalVertexRunner.run(conf, graph);
+    checkResults(results, expected);
+    FileUtils.deleteDirectory(directory);
+  }
+
+  @Test
+  public void testDiskBackedPartitionStoreMT() throws Exception {
+    GiraphConstants.STATIC_GRAPH.set(conf, false);
+    testMultiThreaded();
+  }
+
+  /*
+  @Test
+  public void testDiskBackedPartitionStoreMTStatic() throws Exception {
+    GiraphConstants.STATIC_GRAPH.set(conf, true);
+    testMultiThreaded();
+  }
+  */
+
+  private void testMultiThreaded() throws Exception {
+    final AtomicInteger vertexCounter = new AtomicInteger(0);
+    ExecutorService pool = Executors.newFixedThreadPool(NUM_OF_THREADS);
+    ExecutorCompletionService<Boolean> executor =
+      new ExecutorCompletionService<Boolean>(pool);
+
+    File directory = Files.createTempDir();
+    GiraphConstants.PARTITIONS_DIRECTORY.set(
+        conf, new File(directory, "giraph_partitions").toString());
+    GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
+    GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
+
+    CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
+    serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
+
+    Mockito.when(serviceWorker.getSuperstep()).thenReturn(
+      BspService.INPUT_SUPERSTEP);
+
+    PartitionStore<IntWritable, IntWritable, NullWritable> store =
+        new DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>(
+            conf, context, serviceWorker);
+
+    // Create a new Graph in memory using multiple threads
+    for (int i = 0; i < NUM_OF_THREADS; ++i) {
+      int partitionId = i % NUM_OF_PARTITIONS;
+      Worker worker =
+        new Worker(vertexCounter, store, partitionId, conf);
+      executor.submit(worker, new Boolean(true));
+    }
+    for (int i = 0; i < NUM_OF_THREADS; ++i)
+      executor.take();
+    pool.shutdownNow();
+
+    // Check the number of vertices
+    int totalVertexes = 0;
+    int totalEdges = 0;
+    Partition<IntWritable, IntWritable, NullWritable> partition;
+    for (int i = 0; i < NUM_OF_PARTITIONS; ++i) {
+      partition = store.getOrCreatePartition(i);
+      totalVertexes += partition.getVertexCount();
+      totalEdges += partition.getEdgeCount();
+      store.putPartition(partition);
+    }
+    assert vertexCounter.get() == NUM_OF_THREADS * NUM_OF_VERTEXES_PER_THREAD;
+    assert totalVertexes == NUM_OF_THREADS * NUM_OF_VERTEXES_PER_THREAD;
+    assert totalEdges == totalVertexes * NUM_OF_EDGES_PER_VERTEX;
+
+    // Check the content of the vertices
+    int expected = 0;
+    for (int i = 0; i < NUM_OF_VERTEXES_PER_THREAD * NUM_OF_VERTEXES_PER_THREAD; ++i)
{
+      expected += i;
+    }
+    int totalValues = 0;
+    for (int i = 0; i < NUM_OF_PARTITIONS; ++i) {
+      partition = store.getOrCreatePartition(i);
+      Iterator<Vertex<IntWritable, IntWritable, NullWritable>> vertexes = 
+        partition.iterator();
+
+      while (vertexes.hasNext()) {
+        Vertex<IntWritable, IntWritable, NullWritable> v = vertexes.next();
+        totalValues += v.getId().get();
+      }
+      store.putPartition(partition);
+    }
+    assert totalValues == expected;
+    
+    store.shutdown();
+  }
+
+  @Test
+  public void testDiskBackedPartitionStoreComputation() throws Exception {
+    Iterable<String> results;
+    String[] graph =
+    {
+      "[1,0,[]]", "[2,0,[]]", "[3,0,[]]", "[4,0,[]]", "[5,0,[]]",
+      "[6,0,[]]", "[7,0,[]]", "[8,0,[]]", "[9,0,[]]", "[10,0,[]]"
+    };
+    String[] expected =
+    {
+      "1\t0", "2\t0", "3\t0", "4\t0", "5\t0",
+      "6\t0", "7\t0", "8\t0", "9\t0", "10\t0"
+    };
+
+    USE_OUT_OF_CORE_GRAPH.set(conf, true);
+    MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
+    USER_PARTITION_COUNT.set(conf, 10);
+
+    File directory = Files.createTempDir();
+    PARTITIONS_DIRECTORY.set(conf,
+      new File(directory, "giraph_partitions").toString());
+
+    conf.setComputationClass(EmptyComputation.class);
+    conf.setVertexInputFormatClass(JsonLongDoubleFloatDoubleVertexInputFormat.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+
+    results = InternalVertexRunner.run(conf, graph);
+    checkResults(results, expected);
+    FileUtils.deleteDirectory(directory);
+  }
+
   /**
    * Test reading/writing to/from a partition store
    *
@@ -238,17 +425,58 @@ public class TestPartitionStores {
     partitionStore.deletePartition(2);
     assertEquals(2, partitionStore.getNumPartitions());
   }
-  
+
+  /**
+   * Internal checker to verify the correctness of the tests.
+   * @param results   the actual results obtaind
+   * @param expected  expected results
+   */
+  private void checkResults(Iterable<String> results, String[] expected) {
+    Iterator<String> result = results.iterator();
+
+    assert results != null;
+
+    while(result.hasNext()) {
+      String  resultStr = result.next();
+      boolean found = false;
+
+      for (int j = 0; j < expected.length; ++j) {
+        if (expected[j].equals(resultStr)) {
+          found = true;
+        }
+      }
+
+      assert found;
+    }
+  }
+
+  /**
+   * Test compute method that sends each edge a notification of its parents.
+   * The test set only has a 1-1 parent-to-child ratio for this unit test.
+   */
+  public static class EmptyComputation
+    extends BasicComputation<LongWritable, DoubleWritable, FloatWritable,
+      LongWritable> {
+
+    @Override
+    public void compute(
+      Vertex<LongWritable, DoubleWritable,FloatWritable> vertex,
+      Iterable<LongWritable> messages) throws IOException {
+
+      vertex.voteToHalt();
+    }
+  }
+
   @Test
   public void testEdgeCombineWithSimplePartition() throws IOException {
     testEdgeCombine(SimplePartition.class);
   }
-  
+ 
   @Test
   public void testEdgeCombineWithByteArrayPartition() throws IOException {
     testEdgeCombine(ByteArrayPartition.class);
   }
-  
+ 
   private void testEdgeCombine(Class<? extends Partition> partitionClass)
       throws IOException {
     Vertex<IntWritable, IntWritable, NullWritable> v1 = conf.createVertex();
@@ -286,4 +514,46 @@ public class TestPartitionStores {
     assertEquals(new IntWritable(1), v1.getValue());
     assertEquals(2, v1.getNumEdges());
   }
+
+  private class Worker implements Runnable {
+
+    private final AtomicInteger vertexCounter;
+    private final PartitionStore<IntWritable, IntWritable, NullWritable>
+      partitionStore;
+    private final int partitionId;
+    private final ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
+            NullWritable> conf;
+
+    public Worker(AtomicInteger vertexCounter,
+        PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore,
+        int partitionId,
+        ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
+          NullWritable> conf) {
+
+      this.vertexCounter = vertexCounter;
+      this.partitionStore = partitionStore;
+      this.partitionId = partitionId;
+      this.conf = conf;
+    }
+
+    public void run() {
+      for (int i = 0; i < NUM_OF_VERTEXES_PER_THREAD; ++i) {
+        int id = vertexCounter.getAndIncrement();
+        Vertex<IntWritable, IntWritable, NullWritable> v = conf.createVertex();
+        v.initialize(new IntWritable(id), new IntWritable(id));
+
+        Partition<IntWritable, IntWritable, NullWritable> partition =
+          partitionStore.getOrCreatePartition(partitionId);
+
+        Random rand = new Random(id);
+        for (int j = 0; j < NUM_OF_EDGES_PER_VERTEX; ++j) {
+          int dest = rand.nextInt(id + 1);
+          v.addEdge(EdgeFactory.create(new IntWritable(dest)));
+        }
+
+        partition.putVertex(v);
+        partitionStore.putPartition(partition);
+      }
+    }
+  }
 }


Mime
View raw message