giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edu...@apache.org
Subject [1/5] git commit: updated refs/heads/trunk to fafecee
Date Tue, 15 Mar 2016 17:40:33 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk c6af3ed8a -> fafecee71


http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 1062479..37876d4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -575,7 +575,7 @@ else[HADOOP_NON_SECURE]*/
 
     if (getConfiguration().hasEdgeInputFormat()) {
       // Move edges from temporary storage to their source vertices.
-      getServerData().getPartitionStore().moveEdgesToVertices();
+      getServerData().getEdgeStore().moveEdgesToVertices();
     }
 
     // Generate the partition stats for the input superstep and process
@@ -783,7 +783,7 @@ else[HADOOP_NON_SECURE]*/
     globalCommHandler.finishSuperstep(workerAggregatorRequestProcessor);
 
     MessageStore<I, Writable> incomingMessageStore =
-        getServerData().getPartitionStore().getIncomingMessageStore();
+        getServerData().getIncomingMessageStore();
     if (incomingMessageStore instanceof AsyncMessageStoreWrapper) {
       ((AsyncMessageStoreWrapper) incomingMessageStore).waitToComplete();
     }
@@ -1010,13 +1010,16 @@ else[HADOOP_NON_SECURE]*/
             long nextPrintMsecs = System.currentTimeMillis() + 15000;
             int partitionIndex = 0;
             int numPartitions = getPartitionStore().getNumPartitions();
+            LOG.info("Write thread started!");
             while (true) {
               Partition<I, V, E> partition =
                   getPartitionStore().getNextPartition();
+              LOG.info("partition is : " + partition);
               if (partition == null) {
                 break;
               }
 
+              LOG.info("start to write a partition");
               long verticesWritten = 0;
               for (Vertex<I, V, E> vertex : partition) {
                 vertexWriter.writeVertex(vertex);
@@ -1033,6 +1036,7 @@ else[HADOOP_NON_SECURE]*/
                   nextPrintMsecs = System.currentTimeMillis() + 15000;
                   nextPrintVertices = verticesWritten + 250000;
                 }
+                LOG.info("done writing vertices");
 
                 if (verticesWritten >= nextUpdateProgressVertices) {
                   WorkerProgress.get().addVerticesStored(
@@ -1288,10 +1292,12 @@ else[HADOOP_NON_SECURE]*/
     workerContext.write(checkpointOutputStream);
     getContext().progress();
 
+    // TODO: checkpointing messages along with vertices to avoid multiple loads
+    //       of a partition when out-of-core is enabled.
     for (Integer partitionId : getPartitionStore().getPartitionIds()) {
       // write messages
       checkpointOutputStream.writeInt(partitionId);
-      getServerData().getPartitionStore().getCurrentMessageStore()
+      getServerData().getCurrentMessageStore()
           .writePartition(checkpointOutputStream, partitionId);
       getContext().progress();
 
@@ -1539,9 +1545,11 @@ else[HADOOP_NON_SECURE]*/
       getConfiguration().updateSuperstepClasses(superstepClasses);
       getServerData().resetMessageStores();
 
+      // TODO: checkpointing messages along with vertices to avoid multiple
+      //       loads of a partition when out-of-core is enabled.
       for (int i = 0; i < partitions; i++) {
         int partitionId = checkpointStream.readInt();
-        getServerData().getPartitionStore().getCurrentMessageStore()
+        getServerData().getCurrentMessageStore()
             .readFieldsForPartition(checkpointStream, partitionId);
       }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
index 2785217..c88aac7 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
@@ -106,15 +106,14 @@ public class RequestFailureTest {
   private void checkResult(int numRequests) throws IOException {
     // Check the output
     Iterable<IntWritable> vertices =
-        serverData.getPartitionStore().getIncomingMessageStore()
-            .getPartitionDestinationVertices(0);
+        serverData.getIncomingMessageStore().getPartitionDestinationVertices(0);
     int keySum = 0;
     int messageSum = 0;
     for (IntWritable vertexId : vertices) {
       keySum += vertexId.get();
       Iterable<IntWritable> messages =
-          serverData.getPartitionStore().<IntWritable>getIncomingMessageStore()
-              .getVertexMessages(vertexId);
+          serverData.<IntWritable>getIncomingMessageStore().getVertexMessages(
+              vertexId);
       synchronized (messages) {
         for (IntWritable message : messages) {
           messageSum += message.get();

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index 2688da1..0462770 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -178,15 +178,14 @@ public class RequestTest {
 
     // Check the output
     Iterable<IntWritable> vertices =
-        serverData.getPartitionStore().getIncomingMessageStore()
-            .getPartitionDestinationVertices(0);
+        serverData.getIncomingMessageStore().getPartitionDestinationVertices(0);
     int keySum = 0;
     int messageSum = 0;
     for (IntWritable vertexId : vertices) {
       keySum += vertexId.get();
       Iterable<IntWritable> messages =
-          serverData.getPartitionStore().<IntWritable>getIncomingMessageStore()
-              .getVertexMessages(vertexId);
+          serverData.<IntWritable>getIncomingMessageStore().getVertexMessages(
+              vertexId);
       synchronized (messages) {
         for (IntWritable message : messages) {
           messageSum += message.get();
@@ -224,15 +223,14 @@ public class RequestTest {
 
     // Check the output
     Iterable<IntWritable> vertices =
-        serverData.getPartitionStore().getIncomingMessageStore()
-            .getPartitionDestinationVertices(0);
+        serverData.getIncomingMessageStore().getPartitionDestinationVertices(0);
     int keySum = 0;
     int messageSum = 0;
     for (IntWritable vertexId : vertices) {
       keySum += vertexId.get();
       Iterable<IntWritable> messages =
-          serverData.getPartitionStore().<IntWritable>getIncomingMessageStore()
-              .getVertexMessages(vertexId);
+          serverData.<IntWritable>getIncomingMessageStore().getVertexMessages(
+              vertexId);
       synchronized (messages) {
         for (IntWritable message : messages) {
           messageSum += message.get();

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
index 249a337..7893940 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
@@ -23,6 +23,7 @@ import com.google.common.io.Files;
 import org.apache.commons.io.FileUtils;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -32,7 +33,7 @@ import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
 import org.apache.giraph.io.formats.IntIntNullTextVertexInputFormat;
 import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat;
-import org.apache.giraph.ooc.DiskBackedPartitionStore;
+import org.apache.giraph.ooc.data.DiskBackedPartitionStore;
 import org.apache.giraph.utils.InternalVertexRunner;
 import org.apache.giraph.utils.NoOpComputation;
 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
@@ -50,7 +51,6 @@ import org.mockito.Mockito;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ExecutorCompletionService;
@@ -98,18 +98,14 @@ public class TestPartitionStores {
   public void setUp() {
     GiraphConfiguration configuration = new GiraphConfiguration();
     configuration.setComputationClass(MyComputation.class);
-    conf = new ImmutableClassesGiraphConfiguration<IntWritable, IntWritable,
-        NullWritable>(configuration);
+    conf = new ImmutableClassesGiraphConfiguration<>(configuration);
     context = Mockito.mock(Mapper.Context.class);
   }
 
   @Test
   public void testSimplePartitionStore() {
-    CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
-    serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
     PartitionStore<IntWritable, IntWritable, NullWritable>
-    partitionStore = new SimplePartitionStore<IntWritable, IntWritable,
-                NullWritable>(conf, context, serviceWorker);
+    partitionStore = new SimplePartitionStore<>(conf, context);
     testReadWrite(partitionStore, conf);
     partitionStore.shutdown();
   }
@@ -166,10 +162,15 @@ public class TestPartitionStores {
       serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
     Mockito.when(serviceWorker.getSuperstep()).thenReturn(
         BspService.INPUT_SUPERSTEP);
-
-    PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore =
-        new DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>(
-            conf, context, serviceWorker);
+    ServerData<IntWritable, IntWritable, NullWritable>
+        serverData = new ServerData<>(serviceWorker, conf, context);
+    Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
+
+    DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
+        partitionStore =
+        (DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>)
+            serverData.getPartitionStore();
+    partitionStore.initialize();
     testReadWrite(partitionStore, conf);
     partitionStore.shutdown();
     FileUtils.deleteDirectory(directory);
@@ -185,16 +186,19 @@ public class TestPartitionStores {
 
     CentralizedServiceWorker<IntWritable, IntWritable, NullWritable>
     serviceWorker = Mockito.mock(CentralizedServiceWorker.class);
-
     Mockito.when(serviceWorker.getSuperstep()).thenReturn(
         BspService.INPUT_SUPERSTEP);
-
-    PartitionStore<IntWritable, IntWritable, NullWritable> partitionStore =
-        new DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>(
-            conf, context, serviceWorker);
+    ServerData<IntWritable, IntWritable, NullWritable>
+        serverData = new ServerData<>(serviceWorker, conf, context);
+    Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
+
+    DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
+        partitionStore =
+        (DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>)
+            serverData.getPartitionStore();
+    partitionStore.initialize();
     testReadWrite(partitionStore, conf);
     partitionStore.shutdown();
-
     FileUtils.deleteDirectory(directory);
   }
 
@@ -275,18 +279,18 @@ public class TestPartitionStores {
     GiraphConstants.STATIC_GRAPH.set(conf, true);
     testMultiThreaded();
   }
-
+/*
   @Test
   public void testDiskBackedPartitionStoreAdaptiveOOC() throws Exception {
     GiraphConstants.STATIC_GRAPH.set(conf, true);
     testMultiThreaded();
   }
-
+*/
   private void testMultiThreaded() throws Exception {
     final AtomicInteger vertexCounter = new AtomicInteger(0);
     ExecutorService pool = Executors.newFixedThreadPool(NUM_OF_THREADS);
     ExecutorCompletionService<Boolean> executor =
-      new ExecutorCompletionService<Boolean>(pool);
+      new ExecutorCompletionService<>(pool);
 
     File directory = Files.createTempDir();
     GiraphConstants.PARTITIONS_DIRECTORY.set(
@@ -298,21 +302,25 @@ public class TestPartitionStores {
 
     Mockito.when(serviceWorker.getSuperstep()).thenReturn(
         BspService.INPUT_SUPERSTEP);
-
-    PartitionStore<IntWritable, IntWritable, NullWritable> store =
-        new DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>(
-            conf, context, serviceWorker);
+    ServerData<IntWritable, IntWritable, NullWritable>
+        serverData = new ServerData<>(serviceWorker, conf, context);
+    Mockito.when(serviceWorker.getServerData()).thenReturn(serverData);
+
+    DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>
+        store =
+        (DiskBackedPartitionStore<IntWritable, IntWritable, NullWritable>)
+            serverData.getPartitionStore();
     store.initialize();
 
     // Create a new Graph in memory using multiple threads
     for (int i = 0; i < NUM_OF_THREADS; ++i) {
-      List<Integer> partitionIds = new ArrayList<Integer>();
+      List<Integer> partitionIds = new ArrayList<>();
       for (int id = i; id < NUM_OF_PARTITIONS; id += NUM_OF_THREADS) {
         partitionIds.add(id);
       }
       Worker worker =
         new Worker(vertexCounter, store, partitionIds, conf);
-      executor.submit(worker, new Boolean(true));
+      executor.submit(worker, true);
     }
     for (int i = 0; i < NUM_OF_THREADS; ++i)
       executor.take();
@@ -341,11 +349,8 @@ public class TestPartitionStores {
     for (int i = 0; i < NUM_OF_PARTITIONS; ++i) {
       partition = store.getNextPartition();
       assert partition != null;
-      Iterator<Vertex<IntWritable, IntWritable, NullWritable>> vertexes =
-        partition.iterator();
 
-      while (vertexes.hasNext()) {
-        Vertex<IntWritable, IntWritable, NullWritable> v = vertexes.next();
+      for (Vertex<IntWritable, IntWritable, NullWritable> v : partition) {
         totalValues += v.getId().get();
       }
       store.putPartition(partition);
@@ -358,7 +363,6 @@ public class TestPartitionStores {
   private Partition<IntWritable, IntWritable, NullWritable>
   getPartition(PartitionStore<IntWritable, IntWritable,
       NullWritable> partitionStore, int partitionId) {
-    partitionStore.startIteration();
     Partition p;
     Partition result = null;
     while ((p = partitionStore.getNextPartition()) != null) {
@@ -403,8 +407,11 @@ public class TestPartitionStores {
     partitionStore.addPartition(createPartition(conf, 3, v5));
     partitionStore.addPartition(createPartition(conf, 4, v7));
 
+    partitionStore.startIteration();
     getPartition(partitionStore, 1);
+    partitionStore.startIteration();
     getPartition(partitionStore, 2);
+    partitionStore.startIteration();
     partitionStore.removePartition(3);
     getPartition(partitionStore, 4);
 
@@ -435,16 +442,12 @@ public class TestPartitionStores {
    * @param expected  expected results
    */
   private void checkResults(Iterable<String> results, String[] expected) {
-    Iterator<String> result = results.iterator();
-
-    assert results != null;
 
-    while(result.hasNext()) {
-      String  resultStr = result.next();
+    for (String str : results) {
       boolean found = false;
 
-      for (int j = 0; j < expected.length; ++j) {
-        if (expected[j].equals(resultStr)) {
+      for (String expectedStr : expected) {
+        if (expectedStr.equals(str)) {
           found = true;
         }
       }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java b/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
index ad9ba6f..6fdfc75 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestOutOfCore.java
@@ -18,23 +18,14 @@
 
 package org.apache.giraph;
 
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.combiner.DoubleSumMessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.examples.GeneratedVertexReader;
 import org.apache.giraph.examples.SimplePageRankComputation;
 import org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertexInputFormat;
 import org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertexOutputFormat;
-import org.apache.giraph.graph.BasicComputation;
-import org.apache.giraph.graph.Vertex;
+
 import org.apache.giraph.job.GiraphJob;
-import org.apache.giraph.ooc.CheckMemoryCallable;
-import org.apache.giraph.ooc.DiskBackedPartitionStore;
-import org.apache.giraph.ooc.MemoryEstimator;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -47,40 +38,14 @@ import static org.junit.Assert.assertTrue;
  */
 public class TestOutOfCore extends BspCase {
   final static int NUM_PARTITIONS = 32;
-  final static int MAX_NUM_PARTITIONS_IN_MEMORY = 16;
-  final static int FAIR_NUM_PARTITIONS_IN_MEMORY = 12;
+  final static int NUM_PARTITIONS_IN_MEMORY = 16;
 
   public TestOutOfCore() {
       super(TestOutOfCore.class.getName());
   }
 
-  public static class TestMemoryEstimator implements MemoryEstimator {
-    private DiskBackedPartitionStore partitionStore;
-    @Override
-    public void initialize(CentralizedServiceWorker serviceWorker) {
-      partitionStore =
-          (DiskBackedPartitionStore) serviceWorker.getPartitionStore();
-    }
-
-    @Override
-    public double freeMemoryMB() {
-      int numPartitions = partitionStore.getNumPartitionInMemory();
-      if (numPartitions > MAX_NUM_PARTITIONS_IN_MEMORY) {
-        return 1;
-      } else if (numPartitions > FAIR_NUM_PARTITIONS_IN_MEMORY) {
-        return 10;
-      } else {
-        return 40;
-      }
-    }
-
-    @Override
-    public double maxMemoryMB() {
-      return 100;
-    }
-  }
   /**
-   * Run a job that tests the adaptive out-of-core mechanism
+   * Run a job that tests the fixed out-of-core mechanism
    *
    * @throws IOException
    * @throws ClassNotFoundException
@@ -99,13 +64,12 @@ public class TestOutOfCore extends BspCase {
         SimplePageRankComputation.SimplePageRankMasterCompute.class);
     GiraphConstants.USER_PARTITION_COUNT.set(conf, NUM_PARTITIONS);
     GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
-    GiraphConstants.OUT_OF_CORE_MEM_ESTIMATOR
-        .set(conf, TestMemoryEstimator.class);
-    CheckMemoryCallable.CHECK_MEMORY_INTERVAL.set(conf, 5);
+    GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, NUM_PARTITIONS_IN_MEMORY);
     GiraphConstants.NUM_COMPUTE_THREADS.set(conf, 8);
     GiraphConstants.NUM_INPUT_THREADS.set(conf, 8);
     GiraphConstants.NUM_OOC_THREADS.set(conf, 4);
     GiraphConstants.NUM_OUTPUT_THREADS.set(conf, 8);
+    GiraphConstants.PARTITIONS_DIRECTORY.set(conf, "disk0,disk1,disk2");
     GiraphJob job = prepareJob(getCallingMethodName(), conf,
         getTempPath(getCallingMethodName()));
     // Overwrite the number of vertices set in BspCase


Mime
View raw message