giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject svn commit: r1408863 - in /giraph/trunk/giraph/src: main/java/org/apache/giraph/ main/java/org/apache/giraph/benchmark/ main/java/org/apache/giraph/comm/messages/ main/java/org/apache/giraph/comm/netty/ main/java/org/apache/giraph/examples/ main/java/o...
Date Tue, 13 Nov 2012 18:01:51 GMT
Author: maja
Date: Tue Nov 13 18:01:47 2012
New Revision: 1408863

URL: http://svn.apache.org/viewvc?rev=1408863&view=rev
Log:
GIRAPH-414: Create BinaryCombiner and specialized message store for it

Added:
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/CollectionOfMessagesPerVertexStore.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Combiner.java
Removed:
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/VertexCombiner.java
Modified:
    giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphRunner.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/DoubleSumCombiner.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/MinimumDoubleCombiner.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/SimpleSumCombiner.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspUtils.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphTypeValidator.java
    giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/TestVertexTypes.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/TestMessageStores.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
    giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1408863&r1=1408862&r2=1408863&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphConfiguration.java Tue Nov 13 18:01:47 2012
@@ -19,10 +19,10 @@
 package org.apache.giraph;
 
 import org.apache.giraph.graph.AggregatorWriter;
+import org.apache.giraph.graph.Combiner;
 import org.apache.giraph.graph.EdgeInputFormat;
 import org.apache.giraph.graph.MasterCompute;
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexCombiner;
 import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.giraph.graph.VertexOutputFormat;
 import org.apache.giraph.graph.VertexResolver;
@@ -697,10 +697,10 @@ public class GiraphConfiguration extends
    * @param vertexCombinerClass Determines how vertex messages are combined
    */
   public final void setVertexCombinerClass(
-      Class<? extends VertexCombiner> vertexCombinerClass) {
+      Class<? extends Combiner> vertexCombinerClass) {
     setClass(VERTEX_COMBINER_CLASS,
         vertexCombinerClass,
-        VertexCombiner.class);
+        Combiner.class);
   }
 
   /**

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphRunner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphRunner.java?rev=1408863&r1=1408862&r2=1408863&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphRunner.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/GiraphRunner.java Tue Nov 13 18:01:47 2012
@@ -24,11 +24,11 @@ import org.apache.commons.cli.HelpFormat
 import org.apache.commons.cli.Options;
 import org.apache.giraph.examples.Algorithm;
 import org.apache.giraph.graph.AggregatorWriter;
+import org.apache.giraph.graph.Combiner;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.GiraphTypeValidator;
 import org.apache.giraph.graph.MasterCompute;
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexCombiner;
 import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.giraph.graph.VertexOutputFormat;
 import org.apache.giraph.graph.WorkerContext;
@@ -87,7 +87,7 @@ public class GiraphRunner implements Too
     options.addOption("of", "outputFormat", true, "Graph outputformat");
     options.addOption("ip", "inputPath", true, "Graph input path");
     options.addOption("op", "outputPath", true, "Graph output path");
-    options.addOption("c", "combiner", true, "VertexCombiner class");
+    options.addOption("c", "combiner", true, "Combiner class");
     options.addOption("wc", "workerContext", true, "WorkerContext class");
     options.addOption("aw", "aggregatorWriter", true, "AggregatorWriter class");
     options.addOption("mc", "masterCompute", true, "MasterCompute class");
@@ -202,7 +202,7 @@ public class GiraphRunner implements Too
 
     if (cmd.hasOption("c")) {
       giraphConfiguration.setVertexCombinerClass(
-          (Class<? extends VertexCombiner>)
+          (Class<? extends Combiner>)
               Class.forName(cmd.getOptionValue("c")));
     }
 

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java?rev=1408863&r1=1408862&r2=1408863&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/ImmutableClassesGiraphConfiguration.java Tue Nov 13 18:01:47 2012
@@ -19,6 +19,7 @@
 package org.apache.giraph;
 
 import org.apache.giraph.graph.AggregatorWriter;
+import org.apache.giraph.graph.Combiner;
 import org.apache.giraph.graph.DefaultMasterCompute;
 import org.apache.giraph.graph.DefaultWorkerContext;
 import org.apache.giraph.graph.EdgeInputFormat;
@@ -26,7 +27,6 @@ import org.apache.giraph.graph.GraphStat
 import org.apache.giraph.graph.MasterCompute;
 import org.apache.giraph.graph.TextAggregatorWriter;
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexCombiner;
 import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.giraph.graph.VertexOutputFormat;
 import org.apache.giraph.graph.VertexResolver;
@@ -86,8 +86,9 @@ public class ImmutableClassesGiraphConfi
 
   /** Aggregator writer class - cached for fast access */
   private final Class<? extends AggregatorWriter> aggregatorWriterClass;
-  /** Vertex combiner class - cached for fast access */
-  private final Class<? extends VertexCombiner<I, M>> vertexCombinerClass;
+  /** Combiner class - cached for fast access */
+  private final Class<? extends Combiner<I, M>> combinerClass;
+
   /** Vertex resolver class - cached for fast access */
   private final Class<? extends VertexResolver<I, V, E, M>>
   vertexResolverClass;
@@ -136,9 +137,8 @@ public class ImmutableClassesGiraphConfi
 
     aggregatorWriterClass = conf.getClass(AGGREGATOR_WRITER_CLASS,
         TextAggregatorWriter.class, AggregatorWriter.class);
-    vertexCombinerClass = (Class<? extends VertexCombiner<I, M>>)
-        conf.getClass(VERTEX_COMBINER_CLASS,
-        null, VertexCombiner.class);
+    combinerClass = (Class<? extends Combiner<I, M>>)
+        conf.getClass(VERTEX_COMBINER_CLASS, null, Combiner.class);
     vertexResolverClass = (Class<? extends VertexResolver<I, V, E, M>>)
         conf.getClass(VERTEX_RESOLVER_CLASS,
         VertexResolver.class, VertexResolver.class);
@@ -275,22 +275,22 @@ public class ImmutableClassesGiraphConfi
   }
 
   /**
-   * Get the user's subclassed {@link org.apache.giraph.graph.VertexCombiner}.
+   * Create a user combiner class
    *
-   * @return User's vertex combiner class
+   * @return Instantiated user combiner class
    */
-  public Class<? extends VertexCombiner<I, M>> getVertexCombinerClass() {
-    return vertexCombinerClass;
+  @SuppressWarnings("rawtypes")
+  public Combiner<I, M> createCombiner() {
+    return ReflectionUtils.newInstance(combinerClass, this);
   }
 
   /**
-   * Create a user vertex combiner class
+   * Check if user set a combiner
    *
-   * @return Instantiated user vertex combiner class
+   * @return True iff user set a combiner class
    */
-  @SuppressWarnings("rawtypes")
-  public VertexCombiner<I, M> createVertexCombiner() {
-    return ReflectionUtils.newInstance(vertexCombinerClass, this);
+  public boolean useCombiner() {
+    return combinerClass != null;
   }
 
   /**

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java?rev=1408863&r1=1408862&r2=1408863&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java Tue Nov 13 18:01:47 2012
@@ -80,10 +80,10 @@ public class PageRankBenchmark implement
         "name",
         true,
         "Name of the job");
-    options.addOption("nc",
-        "noCombiner",
-        false,
-        "Don't use a combiner");
+    options.addOption("t",
+        "combinerType",
+        true,
+        "Combiner type (0 for no combiner, 1 for DoubleSumCombiner (default)");
 
     HelpFormatter formatter = new HelpFormatter();
     if (args.length == 0) {
@@ -131,8 +131,13 @@ public class PageRankBenchmark implement
     }
     LOG.info("Using class " +
         job.getConfiguration().get(GiraphConfiguration.VERTEX_CLASS));
-    if (!cmd.hasOption("nc")) {
-      job.getConfiguration().setVertexCombinerClass(DoubleSumCombiner.class);
+    if (!cmd.hasOption('t') ||
+        (Integer.parseInt(cmd.getOptionValue('t')) == 2)) {
+      job.getConfiguration().setVertexCombinerClass(
+          DoubleSumCombiner.class);
+    } else if (Integer.parseInt(cmd.getOptionValue('t')) == 1) {
+      job.getConfiguration().setVertexCombinerClass(
+          DoubleSumCombiner.class);
     }
     job.getConfiguration().setVertexInputFormatClass(
         PseudoRandomVertexInputFormat.class);

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/CollectionOfMessagesPerVertexStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/CollectionOfMessagesPerVertexStore.java?rev=1408863&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/CollectionOfMessagesPerVertexStore.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/CollectionOfMessagesPerVertexStore.java Tue Nov 13 18:01:47 2012
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.utils.CollectionUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.Lists;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Implementation of {@link SimpleMessageStore} where we have a collection
+ * of messages per vertex.
+ * Used when there is no combiner provided.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public class CollectionOfMessagesPerVertexStore<I extends WritableComparable,
+    M extends Writable> extends SimpleMessageStore<I, M, Collection<M>> {
+
+  /**
+   * Constructor
+   *
+   * @param service  Service worker
+   * @param config   Hadoop configuration
+   */
+  CollectionOfMessagesPerVertexStore(
+      CentralizedServiceWorker<I, ?, ?, M> service,
+      ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
+    super(service, config);
+  }
+
+  @Override
+  protected void addVertexMessagesToPartition(I vertexId,
+      Collection<M> messages,
+      ConcurrentMap<I, Collection<M>> partitionMap) throws IOException {
+    CollectionUtils.addConcurrent(vertexId, messages, partitionMap);
+  }
+
+  @Override
+  protected void addVertexMessageToPartition(I vertexId, M message,
+      ConcurrentMap<I, Collection<M>> partitionMap) throws IOException {
+    Collection<M> currentMessages = partitionMap.get(vertexId);
+    if (currentMessages == null) {
+      Collection<M> newMessages = Lists.newArrayList(message);
+      currentMessages = partitionMap.putIfAbsent(vertexId, newMessages);
+    }
+    // if vertex messages existed before, or putIfAbsent didn't put new list
+    if (currentMessages != null) {
+      synchronized (currentMessages) {
+        currentMessages.add(message);
+      }
+    }
+  }
+
+  @Override
+  protected Collection<M> getMessagesAsCollection(Collection<M> messages) {
+    return messages;
+  }
+
+  @Override
+  protected int getNumberOfMessagesIn(
+      ConcurrentMap<I, Collection<M>> partitionMap) {
+    int numberOfMessages = 0;
+    for (Collection<M> messages : partitionMap.values()) {
+      numberOfMessages += messages.size();
+    }
+    return numberOfMessages;
+  }
+
+  @Override
+  protected void writeMessages(Collection<M> messages,
+      DataOutput out) throws IOException {
+    out.writeInt(messages.size());
+    for (M message : messages) {
+      message.write(out);
+    }
+  }
+
+  @Override
+  protected Collection<M> readFieldsForMessages(DataInput in) throws
+      IOException {
+    int numMessages = in.readInt();
+    List<M> messages = Lists.newArrayList();
+    for (int m = 0; m < numMessages; m++) {
+      M message = config.createMessageValue();
+      message.readFields(in);
+      messages.add(message);
+    }
+    return messages;
+  }
+
+  /**
+   * Create new factory for this message store
+   *
+   * @param service Worker service
+   * @param config  Hadoop configuration
+   * @param <I>     Vertex id
+   * @param <M>     Message data
+   * @return Factory
+   */
+  public static <I extends WritableComparable, M extends Writable>
+  MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
+      CentralizedServiceWorker<I, ?, ?, M> service,
+      ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
+    return new Factory<I, M>(service, config);
+  }
+
+  /**
+   * Factory for {@link CollectionOfMessagesPerVertexStore}
+   *
+   * @param <I> Vertex id
+   * @param <M> Message data
+   */
+  private static class Factory<I extends WritableComparable, M extends Writable>
+      implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
+    /** Service worker */
+    private final CentralizedServiceWorker<I, ?, ?, M> service;
+    /** Hadoop configuration */
+    private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
+
+    /**
+     * @param service Worker service
+     * @param config  Hadoop configuration
+     */
+    public Factory(CentralizedServiceWorker<I, ?, ?, M> service,
+        ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
+      this.service = service;
+      this.config = config;
+    }
+
+    @Override
+    public MessageStoreByPartition<I, M> newStore() {
+      return new CollectionOfMessagesPerVertexStore(service, config);
+    }
+  }
+}

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java?rev=1408863&r1=1408862&r2=1408863&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java Tue Nov 13 18:01:47 2012
@@ -19,7 +19,6 @@
 package org.apache.giraph.comm.messages;
 
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.VertexCombiner;
 import org.apache.giraph.utils.CollectionUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -55,8 +54,6 @@ public class DiskBackedMessageStore<I ex
   private volatile ConcurrentNavigableMap<I, Collection<M>> inMemoryMessages;
   /** Hadoop configuration */
   private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
-  /** Combiner for messages */
-  private final VertexCombiner<I, M> combiner;
   /** Counter for number of messages in memory */
   private final AtomicInteger numberOfMessagesInMemory;
   /** To keep vertex ids which we have messages for */
@@ -70,16 +67,14 @@ public class DiskBackedMessageStore<I ex
   private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
 
   /**
-   * @param combiner         Combiner for messages
    * @param config           Hadoop configuration
    * @param fileStoreFactory Factory for creating file stores when flushing
    */
-  public DiskBackedMessageStore(VertexCombiner<I, M> combiner,
+  public DiskBackedMessageStore(
       ImmutableClassesGiraphConfiguration<I, ?, ?, M> config,
       MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
     inMemoryMessages = new ConcurrentSkipListMap<I, Collection<M>>();
     this.config = config;
-    this.combiner = combiner;
     numberOfMessagesInMemory = new AtomicInteger(0);
     destinationVertices =
         Collections.newSetFromMap(Maps.<I, Boolean>newConcurrentMap());
@@ -94,20 +89,8 @@ public class DiskBackedMessageStore<I ex
 
     rwLock.readLock().lock();
     try {
-      Collection<M> currentMessages =
-          CollectionUtils.addConcurrent(vertexId, messages, inMemoryMessages);
-      if (combiner != null) {
-        synchronized (currentMessages) {
-          numberOfMessagesInMemory.addAndGet(
-              messages.size() - currentMessages.size());
-          currentMessages =
-              Lists.newArrayList(combiner.combine(vertexId, currentMessages));
-          inMemoryMessages.put(vertexId, currentMessages);
-          numberOfMessagesInMemory.addAndGet(currentMessages.size());
-        }
-      } else {
-        numberOfMessagesInMemory.addAndGet(messages.size());
-      }
+      CollectionUtils.addConcurrent(vertexId, messages, inMemoryMessages);
+      numberOfMessagesInMemory.addAndGet(messages.size());
     } finally {
       rwLock.readLock().unlock();
     }
@@ -269,8 +252,6 @@ public class DiskBackedMessageStore<I ex
       FlushableMessageStore<I, M>> {
     /** Hadoop configuration */
     private final ImmutableClassesGiraphConfiguration config;
-    /** Combiner for messages */
-    private final VertexCombiner<I, M> combiner;
     /** Factory for creating message stores for partitions */
     private final
     MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory;
@@ -283,18 +264,12 @@ public class DiskBackedMessageStore<I ex
     public Factory(ImmutableClassesGiraphConfiguration config,
         MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory) {
       this.config = config;
-      if (config.getVertexCombinerClass() == null) {
-        combiner = null;
-      } else {
-        combiner = config.createVertexCombiner();
-      }
       this.fileStoreFactory = fileStoreFactory;
     }
 
     @Override
     public FlushableMessageStore<I, M> newStore() {
-      return new DiskBackedMessageStore<I, M>(combiner, config,
-          fileStoreFactory);
+      return new DiskBackedMessageStore<I, M>(config, fileStoreFactory);
     }
   }
 }

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java?rev=1408863&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java Tue Nov 13 18:01:47 2012
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.graph.Combiner;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Implementation of {@link SimpleMessageStore} where we have a single
+ * message per vertex.
+ * Used when {@link Combiner} is provided.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public class OneMessagePerVertexStore<I extends WritableComparable,
+    M extends Writable> extends SimpleMessageStore<I, M, M> {
+  /** Combiner for messages */
+  private final Combiner<I, M> combiner;
+
+  /**
+   * @param service  Service worker
+   * @param combiner Combiner for messages
+   * @param config   Hadoop configuration
+   */
+  OneMessagePerVertexStore(CentralizedServiceWorker<I, ?, ?, M> service,
+      Combiner<I, M> combiner,
+      ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
+    super(service, config);
+    this.combiner = combiner;
+  }
+
+  /**
+   * If there is already a message related to the vertex id in the
+   * partition map return that message, otherwise create a new one,
+   * put it in the map and return it
+   *
+   * @param vertexId Id of vertex
+   * @param partitionMap Partition map
+   * @return Message for this vertex
+   */
+  private M getOrCreateCurrentMessage(I vertexId,
+      ConcurrentMap<I, M> partitionMap) {
+    M currentMessage = partitionMap.get(vertexId);
+    if (currentMessage == null) {
+      M newMessage = combiner.createInitialMessage();
+      currentMessage = partitionMap.putIfAbsent(vertexId, newMessage);
+      if (currentMessage == null) {
+        currentMessage = newMessage;
+      }
+    }
+    return currentMessage;
+  }
+
+  @Override
+  protected void addVertexMessagesToPartition(I vertexId,
+      Collection<M> messages,
+      ConcurrentMap<I, M> partitionMap) throws IOException {
+    M currentMessage = getOrCreateCurrentMessage(vertexId, partitionMap);
+    synchronized (currentMessage) {
+      for (M message : messages) {
+        combiner.combine(vertexId, currentMessage, message);
+      }
+    }
+  }
+
+  @Override
+  protected void addVertexMessageToPartition(I vertexId, M message,
+      ConcurrentMap<I, M> partitionMap) throws IOException {
+    M currentMessage = getOrCreateCurrentMessage(vertexId, partitionMap);
+    synchronized (currentMessage) {
+      combiner.combine(vertexId, currentMessage, message);
+    }
+  }
+
+  @Override
+  protected Collection<M> getMessagesAsCollection(M message) {
+    return Collections.singleton(message);
+  }
+
+  @Override
+  protected int getNumberOfMessagesIn(ConcurrentMap<I, M> partitionMap) {
+    return partitionMap.size();
+  }
+
+  @Override
+  protected void writeMessages(M messages, DataOutput out) throws IOException {
+    messages.write(out);
+  }
+
+  @Override
+  protected M readFieldsForMessages(DataInput in) throws IOException {
+    M message = config.createMessageValue();
+    message.readFields(in);
+    return message;
+  }
+
+  /**
+   * Create new factory for this message store
+   *
+   * @param service Worker service
+   * @param config  Hadoop configuration
+   * @param <I>     Vertex id
+   * @param <M>     Message data
+   * @return Factory
+   */
+  public static <I extends WritableComparable, M extends Writable>
+  MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
+      CentralizedServiceWorker<I, ?, ?, M> service,
+      ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
+    return new Factory<I, M>(service, config);
+  }
+
+  /**
+   * Factory for {@link CollectionOfMessagesPerVertexStore}
+   *
+   * @param <I> Vertex id
+   * @param <M> Message data
+   */
+  private static class Factory<I extends WritableComparable,
+      M extends Writable>
+      implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
+    /** Service worker */
+    private final CentralizedServiceWorker<I, ?, ?, M> service;
+    /** Hadoop configuration */
+    private final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
+    /** Combiner for messages */
+    private final Combiner<I, M> combiner;
+
+    /**
+     * @param service Worker service
+     * @param config  Hadoop configuration
+     */
+    public Factory(CentralizedServiceWorker<I, ?, ?, M> service,
+        ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
+      this.service = service;
+      this.config = config;
+      combiner = config.createCombiner();
+    }
+
+    @Override
+    public MessageStoreByPartition<I, M> newStore() {
+      return new OneMessagePerVertexStore<I, M>(service, combiner, config);
+    }
+  }
+}

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java?rev=1408863&r1=1408862&r2=1408863&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java Tue Nov 13 18:01:47 2012
@@ -18,16 +18,14 @@
 
 package org.apache.giraph.comm.messages;
 
-import com.google.common.collect.MapMaker;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.VertexIdMessageCollection;
-import org.apache.giraph.graph.VertexCombiner;
-import org.apache.giraph.utils.CollectionUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.MapMaker;
 import com.google.common.collect.Maps;
 
 import java.io.DataInput;
@@ -37,125 +35,109 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentMap;
-import org.apache.log4j.Logger;
 
 /**
+ * Abstract class for {@link MessageStoreByPartition} which allows any kind
+ * of object to hold messages for one vertex.
  * Simple in memory message store implemented with a two level concurrent
  * hash map.
  *
  * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
  * @param <M> Message data
+ * @param <T> Type of object which holds messages for one vertex
  */
-public class SimpleMessageStore<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    implements MessageStoreByPartition<I, M> {
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(SimpleMessageStore.class);
+public abstract class SimpleMessageStore<I extends WritableComparable,
+    M extends Writable, T> implements MessageStoreByPartition<I, M>  {
   /** Service worker */
-  private final CentralizedServiceWorker<I, V, E, M> service;
-  /**
-   * Internal message map, from partition id to map from vertex id to
-   * messages
-   */
-  private final ConcurrentMap<Integer, ConcurrentMap<I, Collection<M>>> map;
-  /** Hadoop configuration */
-  private final ImmutableClassesGiraphConfiguration<I, V, E, M> config;
-  /** Combiner for messages */
-  private final VertexCombiner<I, M> combiner;
+  protected final CentralizedServiceWorker<I, ?, ?, M> service;
+  /** Map from partition id to map from vertex id to messages for that vertex */
+  protected final ConcurrentMap<Integer, ConcurrentMap<I, T>> map;
+  /** Giraph configuration */
+  protected final ImmutableClassesGiraphConfiguration<I, ?, ?, M> config;
 
   /**
-   * @param service  Service worker
-   * @param combiner Combiner for messages
-   * @param config   Hadoop configuration
+   * Constructor
+   *
+   * @param service Service worker
+   * @param config Giraph configuration
    */
-  SimpleMessageStore(CentralizedServiceWorker<I, V, E, M> service,
-      VertexCombiner<I, M> combiner,
-      ImmutableClassesGiraphConfiguration<I, V, E, M> config) {
+  public SimpleMessageStore(
+      CentralizedServiceWorker<I, ?, ?, M> service,
+      ImmutableClassesGiraphConfiguration<I, ?, ?, M> config) {
     this.service = service;
+    this.config = config;
     map = new MapMaker().concurrencyLevel(
         config.getNettyServerExecutionConcurrency()).makeMap();
-    this.combiner = combiner;
-    this.config = config;
   }
 
-  @Override
-  public void addVertexMessages(I vertexId,
-      Collection<M> messages) throws IOException {
-    int partitionId = getPartitionId(vertexId);
-    ConcurrentMap<I, Collection<M>> partitionMap =
-        getOrCreatePartitionMap(partitionId);
-    Collection<M> currentMessages =
-      CollectionUtils.addConcurrent(vertexId, messages, partitionMap);
-    if (combiner != null) {
-      synchronized (currentMessages) {
-        currentMessages =
-          Lists.newArrayList(combiner.combine(vertexId, currentMessages));
-        partitionMap.put(vertexId, currentMessages);
-      }
-    }
-  }
+  /**
+   * Add collection of messages for vertex to a partition map
+   *
+   * @param vertexId Id of vertex which received messages
+   * @param messages Messages to add
+   * @param partitionMap Map which to add to
+   * @throws IOException
+   */
+  protected abstract void addVertexMessagesToPartition(I vertexId,
+      Collection<M> messages, ConcurrentMap<I, T> partitionMap) throws
+      IOException;
 
-  @Override
-  public void addMessages(Map<I, Collection<M>> messages) throws IOException {
-    for (Entry<I, Collection<M>> entry : messages.entrySet()) {
-      addVertexMessages(entry.getKey(), entry.getValue());
-    }
-  }
+  /**
+   * Add a single message for vertex to a partition map
+   *
+   * @param vertexId Id of vertex which received message
+   * @param message Message to add
+   * @param partitionMap Map which to add to
+   * @throws IOException
+   */
+  protected abstract void addVertexMessageToPartition(I vertexId,
+      M message, ConcurrentMap<I, T> partitionMap) throws IOException;
 
-  @Override
-  public void addPartitionMessages(Map<I, Collection<M>> messages,
-      int partitionId) throws IOException {
-    ConcurrentMap<I, Collection<M>> partitionMap =
-        getOrCreatePartitionMap(partitionId);
+  /**
+   * Get messages as collection from message storage
+   *
+   * @param messages Message storage
+   * @return Messages as collection
+   */
+  protected abstract Collection<M> getMessagesAsCollection(T messages);
 
-    for (Entry<I, Collection<M>> entry : messages.entrySet()) {
-      Collection<M> currentMessages =
-          CollectionUtils.addConcurrent(
-              entry.getKey(), entry.getValue(), partitionMap);
-      if (combiner != null) {
-        synchronized (currentMessages) {
-          currentMessages =
-              Lists.newArrayList(combiner.combine(entry.getKey(),
-                  currentMessages));
-          partitionMap.put(entry.getKey(), currentMessages);
-        }
-      }
-    }
-  }
+  /**
+   * Get number of messages in partition map
+   *
+   * @param partitionMap Partition map in which to count messages
+   * @return Number of messages in partition map
+   */
+  protected abstract int getNumberOfMessagesIn(
+      ConcurrentMap<I, T> partitionMap);
 
-  @Override
-  public void addPartitionMessages(VertexIdMessageCollection<I, M> messages,
-      int partitionId) throws IOException {
-    ConcurrentMap<I, Collection<M>> partitionMap =
-        getOrCreatePartitionMap(partitionId);
+  /**
+   * Write message storage to {@link DataOutput}
+   *
+   * @param messages Message storage
+   * @param out Data output
+   * @throws IOException
+   */
+  protected abstract void writeMessages(T messages, DataOutput out) throws
+      IOException;
 
-    VertexIdMessageCollection<I, M>.Iterator iterator = messages.getIterator();
-    while (iterator.hasNext()) {
-      iterator.next();
-      I vertexId = iterator.getCurrentFirst();
-      M message = iterator.getCurrentSecond();
-      Collection<M> currentMessages = partitionMap.get(vertexId);
-      if (currentMessages == null) {
-        Collection<M> newMessages = Lists.newArrayList(message);
-        currentMessages = partitionMap.putIfAbsent(vertexId, newMessages);
-      }
-      // if vertex messages existed before, or putIfAbsent didn't put new list
-      if (currentMessages != null) {
-        synchronized (currentMessages) {
-          currentMessages.add(message);
-          if (combiner != null) {
-            currentMessages =
-                Lists.newArrayList(combiner.combine(vertexId,
-                    currentMessages));
-            partitionMap.put(vertexId, currentMessages);
-          }
-        }
-      }
-    }
+  /**
+   * Read message storage from {@link DataInput}
+   *
+   * @param in Data input
+   * @return Message storage
+   * @throws IOException
+   */
+  protected abstract T readFieldsForMessages(DataInput in) throws IOException;
+
+  /**
+   * Get id of partition which holds vertex with selected id
+   *
+   * @param vertexId Id of vertex
+   * @return Id of partiton
+   */
+  protected int getPartitionId(I vertexId) {
+    return service.getVertexPartitionOwner(vertexId).getPartitionId();
   }
 
   /**
@@ -166,109 +148,114 @@ public class SimpleMessageStore<I extend
    * @param partitionId Id of partition
    * @return Message map for this partition
    */
-  private ConcurrentMap<I, Collection<M>> getOrCreatePartitionMap(
-      int partitionId) {
-    ConcurrentMap<I, Collection<M>> partitionMap = map.get(partitionId);
+  protected ConcurrentMap<I, T> getOrCreatePartitionMap(int partitionId) {
+    ConcurrentMap<I, T> partitionMap = map.get(partitionId);
     if (partitionMap == null) {
-      ConcurrentMap<I, Collection<M>> tmpMap =
-          new MapMaker().concurrencyLevel(
-              config.getNettyServerExecutionConcurrency()).
-              makeMap();
+      ConcurrentMap<I, T> tmpMap = new MapMaker().concurrencyLevel(
+          config.getNettyServerExecutionConcurrency()).makeMap();
       partitionMap = map.putIfAbsent(partitionId, tmpMap);
       if (partitionMap == null) {
-        partitionMap = map.get(partitionId);
+        partitionMap = tmpMap;
       }
     }
     return partitionMap;
   }
 
   @Override
-  public Collection<M> getVertexMessages(I vertexId) throws IOException {
-    ConcurrentMap<I, Collection<M>> partitionMap =
-        map.get(getPartitionId(vertexId));
-    if (partitionMap == null) {
-      return Collections.<M>emptyList();
+  public void addMessages(Map<I, Collection<M>> messages) throws IOException {
+    for (Map.Entry<I, Collection<M>> entry : messages.entrySet()) {
+      addVertexMessages(entry.getKey(), entry.getValue());
     }
-    Collection<M> messages = partitionMap.get(vertexId);
-    return (messages == null) ? Collections.<M>emptyList() : messages;
   }
 
   @Override
-  public int getNumberOfMessages() {
-    int numberOfMessages = 0;
-    for (ConcurrentMap<I, Collection<M>> partitionMap : map.values()) {
-      for (Collection<M> messages : partitionMap.values()) {
-        numberOfMessages += messages.size();
-      }
-    }
-    return numberOfMessages;
+  public void addVertexMessages(I vertexId,
+      Collection<M> messages) throws IOException {
+    int partitionId = getPartitionId(vertexId);
+    ConcurrentMap<I, T> partitionMap = getOrCreatePartitionMap(partitionId);
+    addVertexMessagesToPartition(vertexId, messages, partitionMap);
   }
 
   @Override
-  public boolean hasMessagesForVertex(I vertexId) {
-    ConcurrentMap<I, Collection<M>> partitionMap =
-        map.get(getPartitionId(vertexId));
-    return (partitionMap == null) ? false : partitionMap.containsKey(vertexId);
+  public void addPartitionMessages(Map<I, Collection<M>> messages,
+      int partitionId) throws IOException {
+    ConcurrentMap<I, T> partitionMap =
+        getOrCreatePartitionMap(partitionId);
+
+    for (Map.Entry<I, Collection<M>> entry : messages.entrySet()) {
+      addVertexMessagesToPartition(entry.getKey(), entry.getValue(),
+          partitionMap);
+    }
   }
 
   @Override
-  public Iterable<I> getDestinationVertices() {
-    List<I> vertices = Lists.newArrayList();
-    for (ConcurrentMap<I, Collection<M>> partitionMap : map.values()) {
-      vertices.addAll(partitionMap.keySet());
+  public void addPartitionMessages(VertexIdMessageCollection<I, M> messages,
+      int partitionId) throws IOException {
+    ConcurrentMap<I, T> partitionMap =
+        getOrCreatePartitionMap(partitionId);
+
+    VertexIdMessageCollection<I, M>.Iterator iterator = messages.getIterator();
+    while (iterator.hasNext()) {
+      iterator.next();
+      I vertexId = iterator.getCurrentFirst();
+      M message = iterator.getCurrentSecond();
+      addVertexMessageToPartition(vertexId, message, partitionMap);
     }
-    return vertices;
   }
 
   @Override
   public Iterable<I> getPartitionDestinationVertices(int partitionId) {
-    ConcurrentMap<I, Collection<M>> partitionMap = map.get(partitionId);
+    ConcurrentMap<I, ?> partitionMap = map.get(partitionId);
     return (partitionMap == null) ? Collections.<I>emptyList() :
         partitionMap.keySet();
   }
 
   @Override
-  public void clearVertexMessages(I vertexId) throws IOException {
-    ConcurrentMap<I, Collection<M>> partitionMap =
+  public boolean hasMessagesForVertex(I vertexId) {
+    ConcurrentMap<I, ?> partitionMap =
         map.get(getPartitionId(vertexId));
-    if (partitionMap != null) {
-      partitionMap.remove(vertexId);
-    }
+    return (partitionMap == null) ? false : partitionMap.containsKey(vertexId);
   }
 
   @Override
-  public void clearPartition(int partitionId) throws IOException {
-    map.remove(partitionId);
+  public Iterable<I> getDestinationVertices() {
+    List<I> vertices = Lists.newArrayList();
+    for (ConcurrentMap<I, ?> partitionMap : map.values()) {
+      vertices.addAll(partitionMap.keySet());
+    }
+    return vertices;
   }
 
   @Override
-  public void clearAll() throws IOException {
-    map.clear();
+  public Collection<M> getVertexMessages(I vertexId) throws IOException {
+    ConcurrentMap<I, T> partitionMap = map.get(getPartitionId(vertexId));
+    if (partitionMap == null) {
+      return Collections.<M>emptyList();
+    }
+    T messages = partitionMap.get(vertexId);
+    return (messages == null) ? Collections.<M>emptyList() :
+        getMessagesAsCollection(messages);
   }
 
-  /**
-   * Get id of partition which holds vertex with selected id
-   *
-   * @param vertexId Id of vertex
-   * @return Id of partiton
-   */
-  private int getPartitionId(I vertexId) {
-    return service.getVertexPartitionOwner(vertexId).getPartitionId();
+  @Override
+  public int getNumberOfMessages() {
+    int numberOfMessages = 0;
+    for (ConcurrentMap<I, T> partitionMap : map.values()) {
+      numberOfMessages += getNumberOfMessagesIn(partitionMap);
+    }
+    return numberOfMessages;
   }
 
   @Override
   public void writePartition(DataOutput out,
       int partitionId) throws IOException {
-    ConcurrentMap<I, Collection<M>> partitionMap = map.get(partitionId);
+    ConcurrentMap<I, T> partitionMap = map.get(partitionId);
     out.writeBoolean(partitionMap != null);
     if (partitionMap != null) {
       out.writeInt(partitionMap.size());
-      for (Entry<I, Collection<M>> entry : partitionMap.entrySet()) {
+      for (Map.Entry<I, T> entry : partitionMap.entrySet()) {
         entry.getKey().write(out);
-        out.writeInt(entry.getValue().size());
-        for (M message : entry.getValue()) {
-          message.write(out);
-        }
+        writeMessages(entry.getValue(), out);
       }
     }
   }
@@ -286,19 +273,12 @@ public class SimpleMessageStore<I extend
   public void readFieldsForPartition(DataInput in,
       int partitionId) throws IOException {
     if (in.readBoolean()) {
-      ConcurrentMap<I, Collection<M>> partitionMap = Maps.newConcurrentMap();
+      ConcurrentMap<I, T> partitionMap = Maps.newConcurrentMap();
       int numVertices = in.readInt();
       for (int v = 0; v < numVertices; v++) {
         I vertexId = config.createVertexId();
         vertexId.readFields(in);
-        int numMessages = in.readInt();
-        List<M> messages = Lists.newArrayList();
-        for (int m = 0; m < numMessages; m++) {
-          M message = config.createMessageValue();
-          message.readFields(in);
-          messages.add(message);
-        }
-        partitionMap.put(vertexId, messages);
+        partitionMap.put(vertexId, readFieldsForMessages(in));
       }
       map.put(partitionId, partitionMap);
     }
@@ -313,62 +293,22 @@ public class SimpleMessageStore<I extend
     }
   }
 
-
-  /**
-   * Create new factory for this message store
-   *
-   * @param service Worker service
-   * @param config  Hadoop configuration
-   * @param <I>     Vertex id
-   * @param <V>     Vertex data
-   * @param <E>     Edge data
-   * @param <M>     Message data
-   * @return Factory
-   */
-  public static <I extends WritableComparable, V extends Writable,
-      E extends Writable, M extends Writable>
-  MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> newFactory(
-      CentralizedServiceWorker<I, V, E, M> service,
-      ImmutableClassesGiraphConfiguration<I, V, E, M> config) {
-    return new Factory<I, V, E, M>(service, config);
+  @Override
+  public void clearVertexMessages(I vertexId) throws IOException {
+    ConcurrentMap<I, ?> partitionMap =
+        map.get(getPartitionId(vertexId));
+    if (partitionMap != null) {
+      partitionMap.remove(vertexId);
+    }
   }
 
-  /**
-   * Factory for {@link SimpleMessageStore}
-   *
-   * @param <I> Vertex id
-   * @param <V> Vertex data
-   * @param <E> Edge data
-   * @param <M> Message data
-   */
-  private static class Factory<I extends WritableComparable,
-      V extends Writable, E extends Writable, M extends Writable>
-      implements MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> {
-    /** Service worker */
-    private final CentralizedServiceWorker<I, V, E, M> service;
-    /** Hadoop configuration */
-    private final ImmutableClassesGiraphConfiguration<I, V, E, M> config;
-    /** Combiner for messages */
-    private final VertexCombiner<I, M> combiner;
-
-    /**
-     * @param service Worker service
-     * @param config  Hadoop configuration
-     */
-    public Factory(CentralizedServiceWorker<I, V, E, M> service,
-        ImmutableClassesGiraphConfiguration<I, V, E, M> config) {
-      this.service = service;
-      this.config = config;
-      if (config.getVertexCombinerClass() == null) {
-        combiner = null;
-      } else {
-        combiner = config.createVertexCombiner();
-      }
-    }
+  @Override
+  public void clearPartition(int partitionId) throws IOException {
+    map.remove(partitionId);
+  }
 
-    @Override
-    public MessageStoreByPartition<I, M> newStore() {
-      return new SimpleMessageStore(service, combiner, config);
-    }
+  @Override
+  public void clearAll() throws IOException {
+    map.clear();
   }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java?rev=1408863&r1=1408862&r2=1408863&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java Tue Nov 13 18:01:47 2012
@@ -22,6 +22,8 @@ import org.apache.giraph.GiraphConfigura
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.messages.CollectionOfMessagesPerVertexStore;
+import org.apache.giraph.comm.messages.OneMessagePerVertexStore;
 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
 import org.apache.giraph.comm.WorkerServer;
 import org.apache.giraph.comm.messages.BasicMessageStore;
@@ -31,7 +33,6 @@ import org.apache.giraph.comm.messages.F
 import org.apache.giraph.comm.messages.MessageStoreByPartition;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.comm.messages.SequentialFileMessageStore;
-import org.apache.giraph.comm.messages.SimpleMessageStore;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexMutations;
@@ -78,36 +79,62 @@ public class NettyWorkerServer<I extends
    * @param service Service to get partition mappings
    * @param context Mapper context
    */
-  public NettyWorkerServer(ImmutableClassesGiraphConfiguration conf,
+  public NettyWorkerServer(ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
       CentralizedServiceWorker<I, V, E, M> service,
       Mapper<?, ?, ?, ?>.Context context) {
     this.conf = conf;
     this.service = service;
 
+    serverData =
+        new ServerData<I, V, E, M>(conf, createMessageStoreFactory(), context);
+
+    nettyServer = new NettyServer(conf,
+        new WorkerRequestServerHandler.Factory<I, V, E, M>(serverData));
+    nettyServer.start();
+  }
+
+  /**
+   * Decide which message store should be used for current application,
+   * and create the factory for that store
+   *
+   * @return Message store factory
+   */
+  private MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
+  createMessageStoreFactory() {
     boolean useOutOfCoreMessaging = conf.getBoolean(
         GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES,
         GiraphConfiguration.USE_OUT_OF_CORE_MESSAGES_DEFAULT);
     if (!useOutOfCoreMessaging) {
-      serverData = new ServerData<I, V, E, M>(
-          conf, SimpleMessageStore.newFactory(service, conf), context);
+      if (conf.useCombiner()) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("createMessageStoreFactory: " +
+              "Using OneMessagePerVertexStore since combiner enabled");
+        }
+        return OneMessagePerVertexStore.newFactory(service, conf);
+      } else {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("createMessageStoreFactory: " +
+              "Using CollectionOfMessagesPerVertexStore " +
+              "since there is no combiner");
+        }
+        return CollectionOfMessagesPerVertexStore.newFactory(service, conf);
+      }
     } else {
       int maxMessagesInMemory = conf.getInt(
           GiraphConfiguration.MAX_MESSAGES_IN_MEMORY,
           GiraphConfiguration.MAX_MESSAGES_IN_MEMORY_DEFAULT);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("createMessageStoreFactory: Using DiskBackedMessageStore, " +
+            "maxMessagesInMemory = " + maxMessagesInMemory);
+      }
       MessageStoreFactory<I, M, BasicMessageStore<I, M>> fileStoreFactory =
           SequentialFileMessageStore.newFactory(conf);
       MessageStoreFactory<I, M, FlushableMessageStore<I, M>>
           partitionStoreFactory =
           DiskBackedMessageStore.newFactory(conf, fileStoreFactory);
-      MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
-          storeFactory = DiskBackedMessageStoreByPartition.newFactory(service,
-              maxMessagesInMemory, partitionStoreFactory);
-      serverData = new ServerData<I, V, E, M>(conf, storeFactory, context);
+      return DiskBackedMessageStoreByPartition.newFactory(service,
+          maxMessagesInMemory, partitionStoreFactory);
     }
-
-    nettyServer = new NettyServer(conf,
-        new WorkerRequestServerHandler.Factory<I, V, E, M>(serverData));
-    nettyServer.start();
   }
 
   @Override

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/DoubleSumCombiner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/DoubleSumCombiner.java?rev=1408863&r1=1408862&r2=1408863&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/DoubleSumCombiner.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/DoubleSumCombiner.java Tue Nov 13 18:01:47 2012
@@ -18,11 +18,7 @@
 
 package org.apache.giraph.examples;
 
-import java.io.IOException;
-import java.util.Collections;
-
-import org.apache.giraph.graph.VertexCombiner;
-import org.apache.giraph.utils.MathUtils;
+import org.apache.giraph.graph.Combiner;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 
@@ -30,10 +26,15 @@ import org.apache.hadoop.io.LongWritable
  * A combiner that sums double-valued messages
  */
 public class DoubleSumCombiner extends
-    VertexCombiner<LongWritable, DoubleWritable> {
+    Combiner<LongWritable, DoubleWritable> {
+  @Override
+  public void combine(LongWritable vertexIndex, DoubleWritable originalMessage,
+      DoubleWritable messageToCombine) {
+    originalMessage.set(originalMessage.get() + messageToCombine.get());
+  }
+
   @Override
-  public Iterable<DoubleWritable> combine(LongWritable vertexIndex,
-      Iterable<DoubleWritable> messages) throws IOException {
-    return Collections.singleton(new DoubleWritable(MathUtils.sum(messages)));
+  public DoubleWritable createInitialMessage() {
+    return new DoubleWritable(0);
   }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/MinimumDoubleCombiner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/MinimumDoubleCombiner.java?rev=1408863&r1=1408862&r2=1408863&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/MinimumDoubleCombiner.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/MinimumDoubleCombiner.java Tue Nov 13 18:01:47 2012
@@ -18,32 +18,25 @@
 
 package org.apache.giraph.examples;
 
-import org.apache.giraph.graph.VertexCombiner;
+import org.apache.giraph.graph.Combiner;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 /**
- * {@link VertexCombiner} that finds the minimum {@link DoubleWritable}
+ * Combiner which finds the minimum of {@link DoubleWritable}.
  */
 public class MinimumDoubleCombiner extends
-    VertexCombiner<LongWritable, DoubleWritable> {
+    Combiner<LongWritable, DoubleWritable> {
   @Override
-  public Iterable<DoubleWritable> combine(
-      LongWritable target,
-      Iterable<DoubleWritable> messages) throws IOException {
-    double minimum = Double.MAX_VALUE;
-    for (DoubleWritable message : messages) {
-      if (message.get() < minimum) {
-        minimum = message.get();
-      }
+  public void combine(LongWritable vertexIndex, DoubleWritable originalMessage,
+      DoubleWritable messageToCombine) {
+    if (originalMessage.get() > messageToCombine.get()) {
+      originalMessage.set(messageToCombine.get());
     }
-    List<DoubleWritable> value = new ArrayList<DoubleWritable>();
-    value.add(new DoubleWritable(minimum));
+  }
 
-    return value;
+  @Override
+  public DoubleWritable createInitialMessage() {
+    return new DoubleWritable(Double.MAX_VALUE);
   }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java?rev=1408863&r1=1408862&r2=1408863&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java Tue Nov 13 18:01:47 2012
@@ -18,30 +18,24 @@
 
 package org.apache.giraph.examples;
 
-import org.apache.giraph.graph.VertexCombiner;
+import org.apache.giraph.graph.Combiner;
 import org.apache.hadoop.io.IntWritable;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 /**
- * {@link VertexCombiner} that finds the minimum {@link IntWritable}
+ * {@link Combiner} that finds the minimum {@link IntWritable}
  */
 public class MinimumIntCombiner
-    extends VertexCombiner<IntWritable, IntWritable> {
+    extends Combiner<IntWritable, IntWritable> {
   @Override
-  public Iterable<IntWritable> combine(IntWritable target,
-      Iterable<IntWritable> messages) throws IOException {
-    int minimum = Integer.MAX_VALUE;
-    for (IntWritable message : messages) {
-      if (message.get() < minimum) {
-        minimum = message.get();
-      }
+  public void combine(IntWritable vertexIndex, IntWritable originalMessage,
+      IntWritable messageToCombine) {
+    if (originalMessage.get() > messageToCombine.get()) {
+      originalMessage.set(messageToCombine.get());
     }
-    List<IntWritable> value = new ArrayList<IntWritable>();
-    value.add(new IntWritable(minimum));
+  }
 
-    return value;
+  @Override
+  public IntWritable createInitialMessage() {
+    return new IntWritable(Integer.MAX_VALUE);
   }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/SimpleSumCombiner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/SimpleSumCombiner.java?rev=1408863&r1=1408862&r2=1408863&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/SimpleSumCombiner.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/SimpleSumCombiner.java Tue Nov 13 18:01:47 2012
@@ -18,31 +18,24 @@
 
 package org.apache.giraph.examples;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
+import org.apache.giraph.graph.Combiner;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 
-import org.apache.giraph.graph.VertexCombiner;
-
 /**
- * Test whether combiner is called by summing up the messages.
+ * Combiner which sums up {@link IntWritable} message values.
  */
 public class SimpleSumCombiner
-    extends VertexCombiner<LongWritable, IntWritable> {
+    extends Combiner<LongWritable, IntWritable> {
 
   @Override
-  public Iterable<IntWritable> combine(LongWritable vertexIndex,
-      Iterable<IntWritable> messages) throws IOException {
-    int sum = 0;
-    for (IntWritable msg : messages) {
-      sum += msg.get();
-    }
-    List<IntWritable> value = new ArrayList<IntWritable>();
-    value.add(new IntWritable(sum));
+  public void combine(LongWritable vertexIndex, IntWritable originalMessage,
+      IntWritable messageToCombine) {
+    originalMessage.set(originalMessage.get() + messageToCombine.get());
+  }
 
-    return value;
+  @Override
+  public IntWritable createInitialMessage() {
+    return new IntWritable(0);
   }
 }

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspUtils.java?rev=1408863&r1=1408862&r2=1408863&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspUtils.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspUtils.java Tue Nov 13 18:01:47 2012
@@ -246,7 +246,7 @@ public class BspUtils {
   }
 
   /**
-   * Get the user's subclassed {@link VertexCombiner}.
+   * Get the user's subclassed {@link Combiner}.
    *
    * @param <I> Vertex id
    * @param <M> Message data
@@ -255,28 +255,11 @@ public class BspUtils {
    */
   @SuppressWarnings({ "rawtypes", "unchecked" })
   public static <I extends WritableComparable, M extends Writable>
-  Class<? extends VertexCombiner<I, M>>
-  getVertexCombinerClass(Configuration conf) {
-    return (Class<? extends VertexCombiner<I, M>>)
+  Class<? extends Combiner<I, M>> getCombinerClass(Configuration conf) {
+    return (Class<? extends Combiner<I, M>>)
       conf.getClass(GiraphConfiguration.VERTEX_COMBINER_CLASS,
         null,
-        VertexCombiner.class);
-  }
-
-  /**
-   * Create a user vertex combiner class
-   *
-   * @param <I> Vertex id
-   * @param <M> Message data
-   * @param conf Configuration to check
-   * @return Instantiated user vertex combiner class
-   */
-  @SuppressWarnings("rawtypes")
-  public static <I extends WritableComparable, M extends Writable>
-  VertexCombiner<I, M> createVertexCombiner(Configuration conf) {
-    Class<? extends VertexCombiner<I, M>> vertexCombinerClass =
-      getVertexCombinerClass(conf);
-    return ReflectionUtils.newInstance(vertexCombinerClass, conf);
+        Combiner.class);
   }
 
   /**

Added: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Combiner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Combiner.java?rev=1408863&view=auto
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Combiner.java (added)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/Combiner.java Tue Nov 13 18:01:47 2012
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Abstract class to extend for combining messages sent to the same vertex.
+ * Combiner for applications where each two messages for one vertex can be
+ * combined into one.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public abstract class Combiner<I extends WritableComparable,
+    M extends Writable> {
+  /**
+   * Combine messageToCombine with originalMassage,
+   * by modifying originalMessage.
+   *
+   * @param vertexIndex Index of the vertex getting these messages
+   * @param originalMessage The first message which we want to combine;
+   *                        put the result of combining in this message
+   * @param messageToCombine The second message which we want to combine
+   */
+  public abstract void combine(I vertexIndex, M originalMessage,
+      M messageToCombine);
+
+  /**
+   * Get the initial message. When combined with any other message M,
+   * the result should be M.
+   *
+   * @return Initial message
+   */
+  public abstract M createInitialMessage();
+}

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphTypeValidator.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphTypeValidator.java?rev=1408863&r1=1408862&r2=1408863&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphTypeValidator.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GiraphTypeValidator.java Tue Nov 13 18:01:47 2012
@@ -138,12 +138,12 @@ public class GiraphTypeValidator<I exten
 
   /** If there is a combiner type, verify its generic params match the job. */
   private void verifyVertexCombinerGenericTypes() {
-    Class<? extends VertexCombiner<I, M>> vertexCombinerClass =
-      BspUtils.<I, M>getVertexCombinerClass(conf);
+    Class<? extends Combiner<I, M>> vertexCombinerClass =
+      BspUtils.<I, M>getCombinerClass(conf);
     if (vertexCombinerClass != null) {
       List<Class<?>> classList =
-        ReflectionUtils.<VertexCombiner>getTypeArguments(
-          VertexCombiner.class, vertexCombinerClass);
+        ReflectionUtils.<Combiner>getTypeArguments(
+          Combiner.class, vertexCombinerClass);
       if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
         throw new IllegalArgumentException(
           "checkClassTypes: Vertex index types don't match, " +

Modified: giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java?rev=1408863&r1=1408862&r2=1408863&view=diff
==============================================================================
--- giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java (original)
+++ giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java Tue Nov 13 18:01:47 2012
@@ -19,11 +19,11 @@
 package org.apache.giraph.utils;
 
 import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.graph.Combiner;
 import org.apache.giraph.graph.EdgeInputFormat;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.MasterCompute;
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexCombiner;
 import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.giraph.graph.VertexOutputFormat;
 import org.apache.giraph.graph.WorkerContext;
@@ -105,7 +105,7 @@ public class InternalVertexRunner {
   @SuppressWarnings("rawtypes")
   public static Iterable<String> run(
       Class<? extends Vertex> vertexClass,
-      Class<? extends VertexCombiner> vertexCombinerClass,
+      Class<? extends Combiner> vertexCombinerClass,
       Class<? extends VertexInputFormat> vertexInputFormatClass,
       Class<? extends VertexOutputFormat> vertexOutputFormatClass,
       Map<String, String> params,
@@ -133,7 +133,7 @@ public class InternalVertexRunner {
   @SuppressWarnings("rawtypes")
   public static Iterable<String> run(
       Class<? extends Vertex> vertexClass,
-      Class<? extends VertexCombiner> vertexCombinerClass,
+      Class<? extends Combiner> vertexCombinerClass,
       Class<? extends VertexInputFormat> vertexInputFormatClass,
       Class<? extends VertexOutputFormat> vertexOutputFormatClass,
       Class<? extends WorkerContext> workerContextClass,
@@ -165,7 +165,7 @@ public class InternalVertexRunner {
    */
   @SuppressWarnings("rawtypes")
   public static Iterable<String> run(Class<? extends Vertex> vertexClass,
-      Class<? extends VertexCombiner> vertexCombinerClass,
+      Class<? extends Combiner> vertexCombinerClass,
       Class<? extends VertexInputFormat> vertexInputFormatClass,
       Class<? extends EdgeInputFormat> edgeInputFormatClass,
       Class<? extends VertexOutputFormat> vertexOutputFormatClass,

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/TestVertexTypes.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/TestVertexTypes.java?rev=1408863&r1=1408862&r2=1408863&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/TestVertexTypes.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/TestVertexTypes.java Tue Nov 13 18:01:47 2012
@@ -21,10 +21,9 @@ package org.apache.giraph;
 import org.apache.giraph.io.GeneratedVertexInputFormat;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
 import org.apache.giraph.graph.EdgeListVertex;
-import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.GiraphTypeValidator;
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.VertexCombiner;
+import org.apache.giraph.graph.Combiner;
 import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.giraph.graph.VertexOutputFormat;
 import org.apache.giraph.io.JsonBase64VertexInputFormat;
@@ -36,7 +35,7 @@ import org.apache.hadoop.io.FloatWritabl
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.junit.Test;
-import org.junit.Assert;
+
 import java.io.IOException;
 
 
@@ -76,25 +75,34 @@ public class TestVertexTypes {
      * Matches the {@link GeneratedVertexMatch}
      */
     private static class GeneratedVertexMatchCombiner extends
-            VertexCombiner<LongWritable, FloatWritable> {
-        @Override
-        public Iterable<FloatWritable> combine(LongWritable vertexIndex,
-                Iterable<FloatWritable> msgList) throws IOException {
-            return new EmptyIterable<FloatWritable>();
-        }
+        Combiner<LongWritable, FloatWritable> {
+      @Override
+      public void combine(LongWritable vertexIndex,
+          FloatWritable originalMessage,
+          FloatWritable messageToCombine) {
+      }
+
+      @Override
+      public FloatWritable createInitialMessage() {
+        return null;
+      }
     }
 
     /**
      * Mismatches the {@link GeneratedVertexMatch}
      */
     private static class GeneratedVertexMismatchCombiner extends
-            VertexCombiner<LongWritable, DoubleWritable> {
-        @Override
-        public Iterable<DoubleWritable> combine(LongWritable vertexIndex,
-                Iterable<DoubleWritable> msgList)
-                throws IOException {
-            return new EmptyIterable<DoubleWritable>();
-        }
+        Combiner<LongWritable, DoubleWritable> {
+      @Override
+      public void combine(LongWritable vertexIndex,
+          DoubleWritable originalMessage,
+          DoubleWritable messageToCombine) {
+      }
+
+      @Override
+      public DoubleWritable createInitialMessage() {
+        return null;
+      }
     }
 
     @Test
@@ -109,7 +117,7 @@ public class TestVertexTypes {
                       VertexInputFormat.class);
         conf.setClass(GiraphConfiguration.VERTEX_COMBINER_CLASS,
                       GeneratedVertexMatchCombiner.class,
-                      VertexCombiner.class);
+                      Combiner.class);
       @SuppressWarnings("rawtypes")
       GiraphTypeValidator<?, ?, ?, ?> validator =
         new GiraphTypeValidator(conf);
@@ -175,7 +183,7 @@ public class TestVertexTypes {
         VertexInputFormat.class);
       conf.setClass(GiraphConfiguration.VERTEX_COMBINER_CLASS,
         GeneratedVertexMismatchCombiner.class,
-        VertexCombiner.class);
+        Combiner.class);
       @SuppressWarnings("rawtypes")
       GiraphTypeValidator<?, ?, ?, ?> validator =
         new GiraphTypeValidator(conf);

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1408863&r1=1408862&r2=1408863&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/ConnectionTest.java Tue Nov 13 18:01:47 2012
@@ -23,7 +23,6 @@ import com.google.common.collect.Lists;
 import java.util.List;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.comm.messages.SimpleMessageStore;
 import org.apache.giraph.comm.netty.handler.RequestServerHandler;
 import org.apache.giraph.comm.netty.NettyClient;
 import org.apache.giraph.comm.netty.NettyServer;
@@ -74,11 +73,7 @@ public class ConnectionTest {
     when(context.getConfiguration()).thenReturn(conf);
 
     ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
-        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>(
-            conf,
-            SimpleMessageStore.newFactory(
-                MockUtils.mockServiceGetVertexPartitionOwner(1), conf),
-            context);
+        MockUtils.createNewServerData(conf, context);
     NettyServer server =
         new NettyServer(conf,
             new WorkerRequestServerHandler.Factory(serverData));
@@ -105,11 +100,7 @@ public class ConnectionTest {
     when(context.getConfiguration()).thenReturn(conf);
 
     ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
-        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>(
-            conf,
-            SimpleMessageStore.newFactory(
-                MockUtils.mockServiceGetVertexPartitionOwner(1), conf),
-            context);
+        MockUtils.createNewServerData(conf, context);
    RequestServerHandler.Factory requestServerHandlerFactory =
        new WorkerRequestServerHandler.Factory(serverData);
 
@@ -146,11 +137,7 @@ public class ConnectionTest {
     when(context.getConfiguration()).thenReturn(conf);
 
     ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
-        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>(
-            conf,
-            SimpleMessageStore.newFactory(
-                MockUtils.mockServiceGetVertexPartitionOwner(1), conf),
-            context);
+        MockUtils.createNewServerData(conf, context);
     NettyServer server = new NettyServer(conf,
         new WorkerRequestServerHandler.Factory(serverData));
     server.start();

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java?rev=1408863&r1=1408862&r2=1408863&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestFailureTest.java Tue Nov 13 18:01:47 2012
@@ -20,7 +20,6 @@ package org.apache.giraph.comm;
 
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.comm.messages.SimpleMessageStore;
 import org.apache.giraph.comm.netty.NettyClient;
 import org.apache.giraph.comm.netty.NettyServer;
 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
@@ -130,12 +129,7 @@ public class RequestFailureTest {
   @Test
   public void send2Requests() throws IOException {
     // Start the service
-    serverData =
-        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>(
-            conf,
-            SimpleMessageStore.newFactory(
-                MockUtils.mockServiceGetVertexPartitionOwner(1), conf),
-            context);
+    serverData = MockUtils.createNewServerData(conf, context);
     server = new NettyServer(conf,
         new WorkerRequestServerHandler.Factory(serverData));
     server.start();
@@ -169,11 +163,7 @@ public class RequestFailureTest {
     conf.setInt(GiraphConfiguration.WAITING_REQUEST_MSECS, 2000);
 
     // Start the service
-    serverData =
-        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
-            (conf, SimpleMessageStore.newFactory(
-                MockUtils.mockServiceGetVertexPartitionOwner(1), conf),
-                context);
+    serverData = MockUtils.createNewServerData(conf, context);
     server = new NettyServer(conf,
         new WorkerRequestServerHandler.Factory(serverData));
     server.start();
@@ -207,10 +197,7 @@ public class RequestFailureTest {
     conf.setInt(GiraphConfiguration.WAITING_REQUEST_MSECS, 2000);
 
     // Start the service
-    serverData =
-        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
-            (conf, SimpleMessageStore.newFactory(
-                MockUtils.mockServiceGetVertexPartitionOwner(1), conf), context);
+    serverData = MockUtils.createNewServerData(conf, context);
     server = new NettyServer(conf,
         new WorkerRequestServerHandler.Factory(serverData));
     server.start();

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1408863&r1=1408862&r2=1408863&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/RequestTest.java Tue Nov 13 18:01:47 2012
@@ -20,7 +20,6 @@ package org.apache.giraph.comm;
 
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.comm.messages.SimpleMessageStore;
 import org.apache.giraph.comm.netty.NettyClient;
 import org.apache.giraph.comm.netty.NettyServer;
 import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
@@ -92,12 +91,7 @@ public class RequestTest {
     when(context.getConfiguration()).thenReturn(conf);
 
     // Start the service
-    serverData =
-        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>(
-            conf,
-            SimpleMessageStore.newFactory(
-                MockUtils.mockServiceGetVertexPartitionOwner(1), conf),
-            context);
+    serverData = MockUtils.createNewServerData(conf, context);
     server = new NettyServer(conf,
         new WorkerRequestServerHandler.Factory(serverData));
     server.start();

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java?rev=1408863&r1=1408862&r2=1408863&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java Tue Nov 13 18:01:47 2012
@@ -21,7 +21,6 @@ package org.apache.giraph.comm;
 import com.google.common.collect.Lists;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.comm.messages.SimpleMessageStore;
 import org.apache.giraph.comm.netty.NettyClient;
 import org.apache.giraph.comm.netty.NettyServer;
 import org.apache.giraph.comm.netty.handler.SaslServerHandler;
@@ -74,11 +73,7 @@ public class SaslConnectionTest {
     when(context.getConfiguration()).thenReturn(conf);
 
     ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
-        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>(
-            conf,
-            SimpleMessageStore.newFactory(
-                MockUtils.mockServiceGetVertexPartitionOwner(1), conf),
-            context);
+        MockUtils.createNewServerData(conf, context);
 
     SaslServerHandler.Factory mockedSaslServerFactory =
         Mockito.mock(SaslServerHandler.Factory.class);

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/TestMessageStores.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/TestMessageStores.java?rev=1408863&r1=1408862&r2=1408863&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/TestMessageStores.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/comm/TestMessageStores.java Tue Nov 13 18:01:47 2012
@@ -23,15 +23,14 @@ import org.apache.giraph.GiraphConfigura
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.messages.BasicMessageStore;
+import org.apache.giraph.comm.messages.CollectionOfMessagesPerVertexStore;
 import org.apache.giraph.comm.messages.DiskBackedMessageStoreByPartition;
 import org.apache.giraph.comm.messages.DiskBackedMessageStore;
 import org.apache.giraph.comm.messages.FlushableMessageStore;
 import org.apache.giraph.comm.messages.MessageStore;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.comm.messages.SequentialFileMessageStore;
-import org.apache.giraph.comm.messages.SimpleMessageStore;
 import org.apache.giraph.graph.EdgeListVertex;
-import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.utils.MockUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
@@ -214,9 +213,10 @@ public class TestMessageStores {
   }
 
   @Test
-  public void testSimpleMessageStore() {
+  public void testCollectionOfMessagesPeVertexStore() {
     try {
-      testMessageStore(SimpleMessageStore.newFactory(service, config),
+      testMessageStore(
+          CollectionOfMessagesPerVertexStore.newFactory(service, config),
           testData);
     } catch (IOException e) {
       e.printStackTrace();

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java?rev=1408863&r1=1408862&r2=1408863&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java Tue Nov 13 18:01:47 2012
@@ -19,27 +19,24 @@
 package org.apache.giraph.examples;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
-import java.util.Arrays;
-
-import org.apache.giraph.graph.VertexCombiner;
+import org.apache.giraph.graph.Combiner;
 import org.apache.hadoop.io.IntWritable;
 import org.junit.Test;
 
 public class MinimumIntCombinerTest {
 
-    @Test
-    public void testCombiner() throws Exception {
-
-        VertexCombiner<IntWritable, IntWritable> combiner =
-                new MinimumIntCombiner();
-
-        Iterable<IntWritable> result = combiner.combine(
-                new IntWritable(1), Arrays.asList(
-                new IntWritable(39947466), new IntWritable(199),
-                new IntWritable(19998888), new IntWritable(42)));
-        assertTrue(result.iterator().hasNext());
-        assertEquals(42, result.iterator().next().get());
-    }
+  @Test
+  public void testCombiner() throws Exception {
+    Combiner<IntWritable, IntWritable> combiner =
+        new MinimumIntCombiner();
+
+    IntWritable vertexId = new IntWritable(1);
+    IntWritable result = combiner.createInitialMessage();
+    combiner.combine(vertexId, result, new IntWritable(39947466));
+    combiner.combine(vertexId, result, new IntWritable(199));
+    combiner.combine(vertexId, result, new IntWritable(42));
+    combiner.combine(vertexId, result, new IntWritable(19998888));
+    assertEquals(42, result.get());
+  }
 }

Modified: giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java?rev=1408863&r1=1408862&r2=1408863&view=diff
==============================================================================
--- giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java (original)
+++ giraph/trunk/giraph/src/test/java/org/apache/giraph/utils/MockUtils.java Tue Nov 13 18:01:47 2012
@@ -18,9 +18,11 @@
 
 package org.apache.giraph.utils;
 
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
-import org.apache.giraph.comm.WorkerClientServer;
+import org.apache.giraph.comm.messages.CollectionOfMessagesPerVertexStore;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.graph.partition.BasicPartitionOwner;
@@ -154,4 +156,14 @@ public class MockUtils {
         Mockito.any(IntWritable.class))).thenAnswer(answer);
     return service;
   }
+
+  public static ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
+  createNewServerData(ImmutableClassesGiraphConfiguration conf,
+      Mapper.Context context) {
+    return new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>(
+        conf,
+        CollectionOfMessagesPerVertexStore.newFactory(
+            MockUtils.mockServiceGetVertexPartitionOwner(1), conf),
+        context);
+  }
 }



Mime
View raw message