giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject [3/3] git commit: updated refs/heads/trunk to f43f450
Date Thu, 09 Oct 2014 22:10:23 GMT
Reduce/broadcast API

Summary:
Adding reduce and broadcast API

The reduce and broadcast API should be the simplest comprehensive API for what it and aggregators do,
and it has a somewhat simpler implementation, since there is no strange interaction with values from
the previous and next supersteps.

It is very flexible and allows building more complex abstractions on top of it.
The Aggregators API is built on top of it in AggregatorToGlobalCommTranslation,
whereas the reverse does not hold, which shows it is more flexible than the previous API.

Once the primitive types diff goes in, it will be trivial to create generic reducers
(instead of having a huge number of specialized aggregators).

The current aggregator API has multiple issues:
- no matter whether aggregated values are needed on workers, they are distributed to them
- there is no way to register aggregator for a single superstep
- in order for master to send data to workers, it needs to go through an aggregator - even though it can only be Writable
- value to be aggregated and result of aggregation need to be of the same type
- the logic of how to do aggregation is combined with how the Aggregator is kept (i.e. aggregate(valueToAggregate) instead of aggregate(currentAggregatedValue, valueToAggregate)), so every aggregator needs to extend BasicAggregator, and even that still limits what can be done.

Related to https://phabricator.fb.com/D1303953

Test Plan:
All unit tests, will run some jobs on production.
Will add unit test to use reduce/broadcast directly, instead of through aggregators

Reviewers: majakabiljo, avery.ching, pavanka, sergey.edunov, maja.kabiljo

Differential Revision: https://reviews.facebook.net/D21423


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f43f4500
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f43f4500
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f43f4500

Branch: refs/heads/trunk
Commit: f43f450093876ba8ae164681789cad0e6ea4b68e
Parents: 61db689
Author: Igor Kabiljo <ikabiljo@fb.com>
Authored: Thu Oct 9 14:55:20 2014 -0700
Committer: Maja Kabiljo <majakabiljo@fb.com>
Committed: Thu Oct 9 14:55:20 2014 -0700

----------------------------------------------------------------------
 .../giraph/aggregators/AggregatorWrapper.java   | 150 --------
 .../aggregators/ClassAggregatorFactory.java     |  17 +-
 .../giraph/benchmark/ReducersBenchmark.java     | 263 +++++++++++++
 .../giraph/bsp/CentralizedServiceMaster.java    |  20 +-
 .../org/apache/giraph/comm/GlobalCommType.java  |  32 ++
 .../org/apache/giraph/comm/MasterClient.java    |  15 +-
 .../AggregatedValueOutputStream.java            |  45 ---
 .../aggregators/AggregatorOutputStream.java     |  51 ---
 .../comm/aggregators/AggregatorUtils.java       |   6 -
 .../aggregators/AllAggregatorServerData.java    | 117 +++---
 .../GlobalCommValueOutputStream.java            |  71 ++++
 .../aggregators/OwnerAggregatorServerData.java  |  71 ++--
 .../aggregators/SendAggregatedValueCache.java   |  87 -----
 .../comm/aggregators/SendAggregatorCache.java   |  92 -----
 .../comm/aggregators/SendGlobalCommCache.java   | 102 +++++
 .../WorkerAggregatorRequestProcessor.java       |  34 +-
 .../giraph/comm/netty/NettyMasterClient.java    |  32 +-
 .../giraph/comm/netty/NettyMasterServer.java    |  10 +-
 .../NettyWorkerAggregatorRequestProcessor.java  |  44 +--
 .../handler/MasterRequestServerHandler.java     |   4 +-
 .../giraph/comm/requests/RequestType.java       |   2 +-
 .../SendAggregatorsToMasterRequest.java         |  61 ---
 .../requests/SendAggregatorsToOwnerRequest.java |  45 ++-
 .../SendAggregatorsToWorkerRequest.java         |  28 +-
 .../requests/SendReducedToMasterRequest.java    |  61 +++
 .../requests/SendWorkerAggregatorsRequest.java  |  35 +-
 .../giraph/graph/AbstractComputation.java       |  31 +-
 .../org/apache/giraph/graph/Computation.java    |  13 +-
 .../apache/giraph/graph/ComputeCallable.java    |   4 +-
 .../apache/giraph/graph/GraphTaskManager.java   |  40 +-
 .../java/org/apache/giraph/io/EdgeReader.java   |  36 +-
 .../org/apache/giraph/io/MappingReader.java     |  40 +-
 .../java/org/apache/giraph/io/VertexReader.java |  38 +-
 .../giraph/io/internal/WrappedEdgeReader.java   |  13 +-
 .../io/internal/WrappedMappingReader.java       |  14 +-
 .../giraph/io/internal/WrappedVertexReader.java |  11 +-
 .../master/AggregatorReduceOperation.java       |  92 +++++
 .../AggregatorToGlobalCommTranslation.java      | 240 ++++++++++++
 .../apache/giraph/master/BspServiceMaster.java  | 144 ++++---
 .../giraph/master/MasterAggregatorHandler.java  | 371 ++++++++-----------
 .../org/apache/giraph/master/MasterCompute.java |  40 +-
 .../giraph/master/MasterGlobalCommUsage.java    |  68 ++++
 .../giraph/reducers/OnSameReduceOperation.java  |  34 ++
 .../apache/giraph/reducers/ReduceOperation.java |  57 +++
 .../org/apache/giraph/reducers/Reducer.java     | 110 ++++++
 .../apache/giraph/reducers/package-info.java    |  21 ++
 .../apache/giraph/utils/ConfigurationUtils.java |  20 +-
 .../org/apache/giraph/utils/WritableUtils.java  |  34 ++
 .../apache/giraph/worker/BspServiceWorker.java  |  60 +--
 .../giraph/worker/EdgeInputSplitsCallable.java  |  10 +-
 .../worker/MappingInputSplitsCallable.java      |  12 +-
 .../worker/VertexInputSplitsCallable.java       |  10 +-
 .../worker/WorkerAggregatorDelegator.java       |  69 ++++
 .../giraph/worker/WorkerAggregatorHandler.java  | 241 ++++++------
 .../org/apache/giraph/worker/WorkerContext.java |  38 +-
 .../giraph/worker/WorkerGlobalCommUsage.java    |  40 ++
 .../worker/WorkerThreadAggregatorUsage.java     |  31 --
 .../worker/WorkerThreadGlobalCommUsage.java     |  32 ++
 .../java/org/apache/giraph/TestBspBasic.java    |  40 +-
 .../aggregators/TestAggregatorsHandling.java    |  96 +----
 60 files changed, 2104 insertions(+), 1541 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java
deleted file mode 100644
index fa18a64..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.aggregators;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.WritableFactory;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Wrapper for aggregators. Keeps two instances of an aggregator - one for
- * the value from previous super step, and one for the value which is being
- * generated in current super step.
- *
- * @param <A> Aggregated value
- */
-public class AggregatorWrapper<A extends Writable> {
-  /** False iff aggregator should be reset at the end of each super step */
-  private final boolean persistent;
-  /** Value aggregated in previous super step */
-  private A previousAggregatedValue;
-  /** Aggregator factory */
-  private final WritableFactory<? extends Aggregator<A>> aggregatorFactory;
-  /** Aggregator for next super step */
-  private final Aggregator<A> currentAggregator;
-  /** Whether anyone changed current value since the moment it was reset */
-  private boolean changed;
-
-  /**
-   * @param aggregatorFactory Aggregator Factory
-   * @param persistent        False iff aggregator should be reset at the end
-   *                          of each super step
-   * @param conf              Configuration
-   */
-  public AggregatorWrapper(
-      WritableFactory<? extends Aggregator<A>> aggregatorFactory,
-      boolean persistent, ImmutableClassesGiraphConfiguration conf) {
-    this.persistent = persistent;
-    this.aggregatorFactory = aggregatorFactory;
-    currentAggregator = aggregatorFactory.create();
-    changed = false;
-    previousAggregatedValue = currentAggregator.createInitialValue();
-  }
-
-  /**
-   * Get aggregated value from previous super step
-   *
-   * @return Aggregated value from previous super step
-   */
-  public A getPreviousAggregatedValue() {
-    return previousAggregatedValue;
-  }
-
-  /**
-   * Set aggregated value for previous super step
-   *
-   * @param value Aggregated value to set
-   */
-  public void setPreviousAggregatedValue(A value) {
-    previousAggregatedValue = value;
-  }
-
-  /**
-   * Check if aggregator is persistent
-   *
-   * @return False iff aggregator should be reset at the end of each super step
-   */
-  public boolean isPersistent() {
-    return persistent;
-  }
-
-  /**
-   * Check if current aggregator was changed
-   *
-   * @return Whether anyone changed current value since the moment it was reset
-   */
-  public boolean isChanged() {
-    return changed;
-  }
-
-  /**
-   * Add a new value to current aggregator
-   *
-   * @param value Value to be aggregated
-   */
-  public synchronized void aggregateCurrent(A value) {
-    changed = true;
-    currentAggregator.aggregate(value);
-  }
-
-  /**
-   * Get current aggregated value
-   *
-   * @return Current aggregated value
-   */
-  public A getCurrentAggregatedValue() {
-    return currentAggregator.getAggregatedValue();
-  }
-
-  /**
-   * Set aggregated value of current aggregator
-   *
-   * @param value Value to set it to
-   */
-  public void setCurrentAggregatedValue(A value) {
-    changed = true;
-    currentAggregator.setAggregatedValue(value);
-  }
-
-  /**
-   * Reset the value of current aggregator to neutral value
-   */
-  public void resetCurrentAggregator() {
-    changed = false;
-    currentAggregator.reset();
-  }
-
-  /**
-   * Return new aggregated value which is neutral to aggregate operation
-   *
-   * @return Neutral value
-   */
-  public A createInitialValue() {
-    return currentAggregator.createInitialValue();
-  }
-
-  /**
-   * Get class of wrapped aggregator
-   *
-   * @return Aggregator class
-   */
-  public WritableFactory<? extends Aggregator<A>> getAggregatorFactory() {
-    return aggregatorFactory;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java
index 944656e..a022480 100644
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java
@@ -21,8 +21,6 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.utils.WritableFactory;
 import org.apache.giraph.utils.WritableUtils;
@@ -36,7 +34,6 @@ import com.google.common.base.Preconditions;
  * @param <T> Aggregated value type
  */
 public class ClassAggregatorFactory<T extends Writable>
-    extends DefaultImmutableClassesGiraphConfigurable
     implements WritableFactory<Aggregator<T>> {
   /** Aggregator class */
   private Class<? extends Aggregator<T>> aggregatorClass;
@@ -51,26 +48,14 @@ public class ClassAggregatorFactory<T extends Writable>
    */
   public ClassAggregatorFactory(
       Class<? extends Aggregator<T>> aggregatorClass) {
-    this(aggregatorClass, null);
-
-  }
-
-  /**
-   * Constructor
-   * @param aggregatorClass Aggregator class
-   * @param conf Configuration
-   */
-  public ClassAggregatorFactory(Class<? extends Aggregator<T>> aggregatorClass,
-      ImmutableClassesGiraphConfiguration conf) {
     Preconditions.checkNotNull(aggregatorClass,
         "aggregatorClass cannot be null in ClassAggregatorFactory");
     this.aggregatorClass = aggregatorClass;
-    setConf(conf);
   }
 
   @Override
   public Aggregator<T> create() {
-    return ReflectionUtils.newInstance(aggregatorClass, getConf());
+    return ReflectionUtils.newInstance(aggregatorClass, null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java
new file mode 100644
index 0000000..ce5c96e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.benchmark;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
+import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat;
+import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.giraph.reducers.OnSameReduceOperation;
+import org.apache.giraph.worker.DefaultWorkerContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Benchmark for reducers. Also checks the correctness.
+ */
+public class ReducersBenchmark extends GiraphBenchmark {
+  /** Number of reducers setting */
+  private static final String REDUCERS_NUM = "reducersbenchmark.num";
+
+  /** Option for number of reducers */
+  private static final BenchmarkOption REDUCERS =
+      new BenchmarkOption("r", "reducers",
+          true, "Reducers", "Need to set number of reducers (-r)");
+
+  /** LongSumReducer */
+  public static class TestLongSumReducer
+      extends OnSameReduceOperation<LongWritable> {
+    /** Singleton */
+    public static final TestLongSumReducer INSTANCE = new TestLongSumReducer();
+
+    @Override
+    public LongWritable createInitialValue() {
+      return new LongWritable();
+    }
+
+    @Override
+    public void reduceSingle(
+        LongWritable curValue, LongWritable valueToReduce) {
+      curValue.set(curValue.get() + valueToReduce.get());
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+    }
+  }
+
+  /**
+   * Vertex class for ReducersBenchmark
+   */
+  public static class ReducersBenchmarkComputation extends
+      BasicComputation<LongWritable, DoubleWritable, DoubleWritable,
+          DoubleWritable> {
+    @Override
+    public void compute(
+        Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
+        Iterable<DoubleWritable> messages) throws IOException {
+      int n = getNumReducers(getConf());
+      long superstep = getSuperstep();
+      int w = getWorkerContextReduced(getConf(), superstep);
+      for (int i = 0; i < n; i++) {
+        reduce("w" + i, new LongWritable((superstep + 1) * i));
+        reduce("p" + i, new LongWritable(i));
+
+        if (superstep > 0) {
+          assertEquals(superstep * (getTotalNumVertices() * i) + w,
+              ((LongWritable) getBroadcast("w" + i)).get());
+          assertEquals(-(superstep * i),
+              ((LongWritable) getBroadcast("m" + i)).get());
+          assertEquals(superstep * getTotalNumVertices() * i,
+              ((LongWritable) getBroadcast("p" + i)).get());
+        }
+      }
+      if (superstep > 2) {
+        vertex.voteToHalt();
+      }
+    }
+  }
+
+  /**
+   * MasterCompute class for ReducersBenchmark
+   */
+  public static class ReducersBenchmarkMasterCompute extends
+      DefaultMasterCompute {
+    @Override
+    public void compute() {
+      int n = getNumReducers(getConf());
+      long superstep = getSuperstep();
+      int w = getWorkerContextReduced(getConf(), superstep);
+      for (int i = 0; i < n; i++) {
+        String wi = "w" + i;
+        String mi = "m" + i;
+        String pi = "p" + i;
+
+        registerReduce(wi, TestLongSumReducer.INSTANCE);
+        registerReduce(mi, new TestLongSumReducer());
+
+        if (superstep > 0) {
+          broadcast(wi, getReduced(wi));
+          broadcast(mi, new LongWritable(-superstep * i));
+          broadcast(pi, getReduced(pi));
+
+          registerReduce(pi, new TestLongSumReducer(),
+              (LongWritable) getReduced(pi));
+
+          assertEquals(superstep * (getTotalNumVertices() * i) + w,
+              ((LongWritable) getReduced(wi)).get());
+          assertEquals(superstep * getTotalNumVertices() * i,
+              ((LongWritable) getReduced(pi)).get());
+        } else {
+          registerReduce(pi, new TestLongSumReducer());
+        }
+      }
+    }
+  }
+
+  /**
+   * WorkerContext class for ReducersBenchmark
+   */
+  public static class ReducersBenchmarkWorkerContext
+      extends DefaultWorkerContext {
+    @Override
+    public void preSuperstep() {
+      addToWorkerReducers(1);
+      checkReducers();
+    }
+
+    @Override
+    public void postSuperstep() {
+      addToWorkerReducers(2);
+      checkReducers();
+    }
+
+    /**
+     * Check if reducer values are correct for current superstep
+     */
+    private void checkReducers() {
+      int n = getNumReducers(getContext().getConfiguration());
+      long superstep = getSuperstep();
+      int w = getWorkerContextReduced(
+          getContext().getConfiguration(), superstep);
+      for (int i = 0; i < n; i++) {
+        if (superstep > 0) {
+          assertEquals(superstep * (getTotalNumVertices() * i) + w,
+              ((LongWritable) getBroadcast("w" + i)).get());
+          assertEquals(-(superstep * i),
+              ((LongWritable) getBroadcast("m" + i)).get());
+          assertEquals(superstep * getTotalNumVertices() * i,
+              ((LongWritable) getBroadcast("p" + i)).get());
+        }
+      }
+    }
+
+    /**
+     * Add some value to worker reducers.
+     *
+     * @param valueToAdd Which value to add
+     */
+    private void addToWorkerReducers(int valueToAdd) {
+      int n = getNumReducers(getContext().getConfiguration());
+      for (int i = 0; i < n; i++) {
+        reduce("w" + i, new LongWritable(valueToAdd));
+      }
+    }
+  }
+
+  /**
+   * Get the number of reducers from configuration
+   *
+   * @param conf Configuration
+   * @return Number of reducers
+   */
+  private static int getNumReducers(Configuration conf) {
+    return conf.getInt(REDUCERS_NUM, 0);
+  }
+
+  /**
+   * Get the value which should be reduced by worker context
+   *
+   * @param conf Configuration
+   * @param superstep Superstep
+   * @return The value which should be reduced by worker context
+   */
+  private static int getWorkerContextReduced(Configuration conf,
+      long superstep) {
+    return (superstep <= 0) ? 0 : conf.getInt("workers", 0) * 3;
+  }
+
+  /**
+   * Check if values are equal, throw an exception if they aren't
+   *
+   * @param expected Expected value
+   * @param actual Actual value
+   */
+  private static void assertEquals(long expected, long actual) {
+    if (expected != actual) {
+      throw new RuntimeException("expected: " + expected +
+          ", actual: " + actual);
+    }
+  }
+
+  @Override
+  public Set<BenchmarkOption> getBenchmarkOptions() {
+    return Sets.newHashSet(BenchmarkOption.VERTICES, REDUCERS);
+  }
+
+  @Override
+  protected void prepareConfiguration(GiraphConfiguration conf,
+      CommandLine cmd) {
+    conf.setComputationClass(ReducersBenchmarkComputation.class);
+    conf.setMasterComputeClass(ReducersBenchmarkMasterCompute.class);
+    conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
+    conf.setWorkerContextClass(ReducersBenchmarkWorkerContext.class);
+    conf.setLong(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
+        BenchmarkOption.VERTICES.getOptionLongValue(cmd));
+    conf.setLong(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX, 1);
+    conf.setInt(REDUCERS_NUM, REDUCERS.getOptionIntValue(cmd));
+    conf.setInt("workers", conf.getInt(GiraphConstants.MAX_WORKERS, -1));
+  }
+
+  /**
+   * Execute the benchmark.
+   *
+   * @param args Typically the command line arguments.
+   * @throws Exception Any exception from the computation.
+   */
+  public static void main(final String[] args) throws Exception {
+    System.exit(ToolRunner.run(new ReducersBenchmark(), args));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
index 9b4f9d6..1e8d519 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
@@ -18,6 +18,10 @@
 
 package org.apache.giraph.bsp;
 
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.giraph.master.AggregatorToGlobalCommTranslation;
 import org.apache.giraph.master.MasterAggregatorHandler;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.master.MasterInfo;
@@ -26,9 +30,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.zookeeper.KeeperException;
 
-import java.io.IOException;
-import java.util.List;
-
 /**
  * At most, there will be one active master at a time, but many threads can
  * be trying to be the active master.
@@ -139,11 +140,18 @@ public interface CentralizedServiceMaster<I extends WritableComparable,
     long desiredSuperstep);
 
   /**
-   * Get master aggregator handler
+   * Get handler for global communication
+   *
+   * @return Global communication handler
+   */
+  MasterAggregatorHandler getGlobalCommHandler();
+
+  /**
+   * Handler for aggregators to reduce/broadcast translation
    *
-   * @return Master aggregator handler
+   * @return aggregator translation handler
    */
-  MasterAggregatorHandler getAggregatorHandler();
+  AggregatorToGlobalCommTranslation getAggregatorTranslationHandler();
 
   /**
    * Get MasterCompute object

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/GlobalCommType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/GlobalCommType.java b/giraph-core/src/main/java/org/apache/giraph/comm/GlobalCommType.java
new file mode 100644
index 0000000..539b3bd
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/GlobalCommType.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.comm;
+
+/**
+ * Type tag distinguishing different global communication messages.
+ */
+public enum GlobalCommType {
+  /** ReduceOperation object */
+  REDUCE_OPERATIONS,
+  /** Reduced value object */
+  REDUCED_VALUE,
+  /** Broadcasted value */
+  BROADCAST,
+  /** Special count used internally for counting requests */
+  SPECIAL_COUNT;
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
index b7718a7..aea93fd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
@@ -20,8 +20,6 @@ package org.apache.giraph.comm;
 
 import java.io.IOException;
 
-import org.apache.giraph.aggregators.Aggregator;
-import org.apache.giraph.utils.WritableFactory;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -36,19 +34,18 @@ public interface MasterClient {
   /**
    * Sends aggregator to its owner
    *
-   * @param aggregatorName Name of the aggregator
-   * @param aggregatorFactory Aggregator factory
-   * @param aggregatedValue Value of the aggregator
+   * @param name Name of the object
+   * @param type Global communication type
+   * @param value Object value
    * @throws IOException
    */
-  void sendAggregator(String aggregatorName,
-      WritableFactory<? extends Aggregator> aggregatorFactory,
-      Writable aggregatedValue) throws IOException;
+  void sendToOwner(String name, GlobalCommType type, Writable value)
+    throws IOException;
 
   /**
    * Flush aggregated values cache.
    */
-  void finishSendingAggregatedValues() throws IOException;
+  void finishSendingValues() throws IOException;
 
   /**
    * Flush all outgoing messages.  This will synchronously ensure that all

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatedValueOutputStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatedValueOutputStream.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatedValueOutputStream.java
deleted file mode 100644
index 0010dba..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatedValueOutputStream.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.aggregators;
-
-import org.apache.hadoop.io.Writable;
-
-import java.io.IOException;
-
-/**
- * Implementation of {@link CountingOutputStream} which allows writing of
- * aggregator values in the form of pair (name, value)
- */
-public class AggregatedValueOutputStream extends CountingOutputStream {
-  /**
-   * Write aggregator to the stream and increment internal counter
-   *
-   * @param aggregatorName Name of the aggregator
-   * @param aggregatedValue Value of aggregator
-   * @return Number of bytes occupied by the stream
-   * @throws IOException
-   */
-  public int addAggregator(String aggregatorName,
-      Writable aggregatedValue) throws IOException {
-    incrementCounter();
-    dataOutput.writeUTF(aggregatorName);
-    aggregatedValue.write(dataOutput);
-    return getSize();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java
deleted file mode 100644
index 79bc08a..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.aggregators;
-
-import java.io.IOException;
-
-import org.apache.giraph.aggregators.Aggregator;
-import org.apache.giraph.utils.WritableFactory;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Implementation of {@link CountingOutputStream} which allows writing of
- * aggregators in the form of triple (name, classname, value)
- */
-public class AggregatorOutputStream extends CountingOutputStream {
-  /**
-   * Write aggregator to the stream and increment internal counter
-   *
-   * @param aggregatorName Name of the aggregator
-   * @param aggregatorFactory Aggregator factory
-   * @param aggregatedValue Value of aggregator
-   * @return Number of bytes occupied by the stream
-   * @throws IOException
-   */
-  public int addAggregator(String aggregatorName,
-      WritableFactory<? extends Aggregator> aggregatorFactory,
-      Writable aggregatedValue) throws IOException {
-    incrementCounter();
-    dataOutput.writeUTF(aggregatorName);
-    WritableUtils.writeWritableObject(aggregatorFactory, dataOutput);
-    aggregatedValue.write(dataOutput);
-    return getSize();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
index a94ab38..ecb3a6b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
@@ -27,12 +27,6 @@ import org.apache.giraph.worker.WorkerInfo;
  * Class for aggregator constants and utility methods
  */
 public class AggregatorUtils {
-  /**
-   * Special aggregator name which will be used to send the total number of
-   * aggregators requests which should arrive
-   */
-  public static final String SPECIAL_COUNT_AGGREGATOR =
-      "__aggregatorRequestCount";
 
   /** How big a single aggregator request can be (in bytes) */
   public static final String MAX_BYTES_PER_AGGREGATOR_REQUEST =

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
index effc9bf..b1c0781 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
@@ -21,19 +21,21 @@ package org.apache.giraph.comm.aggregators;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.comm.GlobalCommType;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.master.MasterInfo;
-import org.apache.giraph.utils.Factory;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.reducers.Reducer;
 import org.apache.giraph.utils.TaskIdsPermitsBarrier;
-import org.apache.giraph.utils.WritableFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -51,12 +53,12 @@ public class AllAggregatorServerData {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(AllAggregatorServerData.class);
-  /** Map of aggregator factories */
-  private final ConcurrentMap<String, WritableFactory<Aggregator<Writable>>>
-  aggregatorFactoriesMap = Maps.newConcurrentMap();
-  /** Map of values of aggregators from previous superstep */
+  /** Map of broadcasted values from master */
   private final ConcurrentMap<String, Writable>
-  aggregatedValuesMap = Maps.newConcurrentMap();
+  broadcastedMap = Maps.newConcurrentMap();
+  /** Map of registered reducers for current superstep */
+  private final ConcurrentMap<String, ReduceOperation<Object, Writable>>
+  reduceOpMap = Maps.newConcurrentMap();
   /**
    * Counts the requests with final aggregators from master.
    * It uses values from special aggregators
@@ -97,54 +99,36 @@ public class AllAggregatorServerData {
   }
 
   /**
-   * Register the class of the aggregator, received by master or worker.
-   *
-   * @param name              Aggregator name
-   * @param aggregatorFactory Aggregator factory
-   */
-  public void registerAggregatorClass(String name,
-      WritableFactory<Aggregator<Writable>> aggregatorFactory) {
-    aggregatorFactoriesMap.put(name, aggregatorFactory);
-    progressable.progress();
-  }
-
-  /**
-   * Set the value of aggregator from previous superstep,
-   * received by master or worker.
-   *
-   * @param name Name of the aggregator
-   * @param value Value of the aggregator
-   */
-  public void setAggregatorValue(String name, Writable value) {
-    aggregatedValuesMap.put(name, value);
-    progressable.progress();
-  }
-
-  /**
-   * Create initial aggregated value for an aggregator. Used so requests
-   * would be able to deserialize data.
-   * registerAggregatorClass needs to be called first to ensure that we have
-   * the class of the aggregator.
-   *
-   * @param name Name of the aggregator
-   * @return Empty aggregated value for this aggregator
+   * Stores a value received from master through global communication.
+   * @param name Name
+   * @param type Global communication type
+   * @param value Object value
    */
-  public Writable createAggregatorInitialValue(String name) {
-    WritableFactory<Aggregator<Writable>> aggregatorFactory =
-        aggregatorFactoriesMap.get(name);
-    synchronized (aggregatorFactory) {
-      return aggregatorFactory.create().createInitialValue();
+  public void receiveValueFromMaster(
+      String name, GlobalCommType type, Writable value) {
+    switch (type) {
+    case BROADCAST:
+      broadcastedMap.put(name, value);
+      break;
+
+    case REDUCE_OPERATIONS:
+      reduceOpMap.put(name, (ReduceOperation<Object, Writable>) value);
+      break;
+
+    default:
+      throw new IllegalArgumentException("Unkown request type " + type);
     }
+    progressable.progress();
   }
 
   /**
    * Notify this object that an aggregator request from master has been
    * received.
    *
-   * @param aggregatorData Byte request with data received from master
+   * @param data Byte request with data received from master
    */
-  public void receivedRequestFromMaster(byte[] aggregatorData) {
-    masterData.add(aggregatorData);
+  public void receivedRequestFromMaster(byte[] data) {
+    masterData.add(data);
     masterBarrier.releaseOnePermit();
   }
 
@@ -200,35 +184,32 @@ public class AllAggregatorServerData {
    * arrived, and fill the maps for next superstep when ready.
    *
    * @param workerIds All workers in the job apart from the current one
-   * @param previousAggregatedValuesMap Map of values from previous
-   *                                    superstep to fill out
-   * @param currentAggregatorFactoryMap Map of aggregators factories for
-   *                                    current superstep to fill out.
+   * @param broadcastedMapToFill Broadcast map to fill out
+   * @param reducerMapToFill Registered reducer map to fill out.
    */
   public void fillNextSuperstepMapsWhenReady(
       Set<Integer> workerIds,
-      Map<String, Writable> previousAggregatedValuesMap,
-      Map<String, Factory<Aggregator<Writable>>> currentAggregatorFactoryMap) {
+      Map<String, Writable> broadcastedMapToFill,
+      Map<String, Reducer<Object, Writable>> reducerMapToFill) {
     workersBarrier.waitForRequiredPermits(workerIds);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("fillNextSuperstepMapsWhenReady: Aggregators ready");
+      LOG.debug("fillNextSuperstepMapsWhenReady: Global data ready");
     }
-    previousAggregatedValuesMap.clear();
-    previousAggregatedValuesMap.putAll(aggregatedValuesMap);
-    for (Map.Entry<String, WritableFactory<Aggregator<Writable>>> entry :
-        aggregatorFactoriesMap.entrySet()) {
-      Factory<Aggregator<Writable>> aggregatorFactory =
-          currentAggregatorFactoryMap.get(entry.getKey());
-      if (aggregatorFactory == null) {
-        currentAggregatorFactoryMap.put(entry.getKey(), entry.getValue());
-      }
+
+    Preconditions.checkArgument(broadcastedMapToFill.isEmpty(),
+        "broadcastedMap needs to be empty for filling");
+    Preconditions.checkArgument(reducerMapToFill.isEmpty(),
+        "reducerMap needs to be empty for filling");
+
+    broadcastedMapToFill.putAll(broadcastedMap);
+
+    for (Entry<String, ReduceOperation<Object, Writable>> entry :
+        reduceOpMap.entrySet()) {
+      reducerMapToFill.put(entry.getKey(), new Reducer<>(entry.getValue()));
     }
-  }
 
-  /**
-   * Prepare for next superstep
-   */
-  public void reset() {
+    broadcastedMap.clear();
+    reduceOpMap.clear();
     masterData.clear();
     if (LOG.isDebugEnabled()) {
       LOG.debug("reset: Ready for next superstep");

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/GlobalCommValueOutputStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/GlobalCommValueOutputStream.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/GlobalCommValueOutputStream.java
new file mode 100644
index 0000000..0add1e9
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/GlobalCommValueOutputStream.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.aggregators;
+
+import java.io.IOException;
+
+import org.apache.giraph.comm.GlobalCommType;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Implementation of {@link CountingOutputStream} which allows writing of
+ * reduced values in the form of triple (name, type, value)
+ *
+ * There are two modes:
+ * - when class of the value is written into the stream.
+ * - when it isn't, and reader needs to know Class of the value in order
+ *   to read it.
+ */
+public class GlobalCommValueOutputStream extends CountingOutputStream {
+  /** whether to write Class object for values into the stream */
+  private final boolean writeClass;
+
+  /**
+   * Constructor
+   *
+   * @param writeClass boolean whether to write Class object for values
+   */
+  public GlobalCommValueOutputStream(boolean writeClass) {
+    this.writeClass = writeClass;
+  }
+
+  /**
+   * Write global communication object to the stream
+   * and increment internal counter
+   *
+   * @param name Name
+   * @param type Global communication type
+   * @param value Object value
+   * @return Number of bytes occupied by the stream
+   * @throws IOException
+   */
+  public int addValue(String name, GlobalCommType type,
+      Writable value) throws IOException {
+    incrementCounter();
+    dataOutput.writeUTF(name);
+    dataOutput.writeByte(type.ordinal());
+    if (writeClass) {
+      WritableUtils.writeWritableObject(value, dataOutput);
+    } else {
+      value.write(dataOutput);
+    }
+    return getSize();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
index 2f3d5e5..9e92efc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
@@ -23,9 +23,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.reducers.Reducer;
 import org.apache.giraph.utils.TaskIdsPermitsBarrier;
-import org.apache.giraph.utils.WritableFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
@@ -62,13 +62,12 @@ public class OwnerAggregatorServerData {
   private static final Logger LOG =
       Logger.getLogger(OwnerAggregatorServerData.class);
   /** Map of aggregators which current worker owns */
-  private final ConcurrentMap<String, Aggregator<Writable>>
-  myAggregatorMap = Maps.newConcurrentMap();
+  private final ConcurrentMap<String, Reducer<Object, Writable>>
+  myReducerMap = Maps.newConcurrentMap();
   /**
    * Counts the requests with partial aggregated values from other workers.
-   * It uses values from special aggregators
-   * (named AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)
-   * to know how many requests it has to receive.
+   * It uses GlobalCommType.SPECIAL_COUNT to know how many requests it
+   * has to receive.
    */
   private final TaskIdsPermitsBarrier workersBarrier;
   /** Progressable used to report progress */
@@ -85,49 +84,50 @@ public class OwnerAggregatorServerData {
   }
 
   /**
-   * Register an aggregator which current worker owns. Thread-safe.
+   * Register a reducer which current worker owns. Thread-safe.
    *
    * @param name Name of aggregator
-   * @param aggregatorFactory Aggregator factory
+   * @param reduceOp Reduce operation
    */
-  public void registerAggregator(String name,
-      WritableFactory<Aggregator<Writable>> aggregatorFactory) {
-    if (LOG.isDebugEnabled() && myAggregatorMap.isEmpty()) {
+  public void registerReducer(String name,
+      ReduceOperation<Object, Writable> reduceOp) {
+    if (LOG.isDebugEnabled() && myReducerMap.isEmpty()) {
       LOG.debug("registerAggregator: The first registration after a reset()");
     }
-    myAggregatorMap.putIfAbsent(name, aggregatorFactory.create());
+    myReducerMap.putIfAbsent(name, new Reducer<>(reduceOp));
     progressable.progress();
   }
 
   /**
-   * Aggregate partial value of one of current worker's aggregators.
+   * Reduce partial value of one of current worker's reducers.
    *
-   * Thread-safe. Call only after aggregators have been registered.
+   * Thread-safe. Call only after reducers have been registered.
    *
-   * @param name Name of the aggregator
-   * @param value Value to aggregate to it
+   * @param name Name of the reducer
+   * @param value Value to reduce to it
    */
-  public void aggregate(String name, Writable value) {
-    Aggregator<Writable> aggregator = myAggregatorMap.get(name);
-    synchronized (aggregator) {
-      aggregator.aggregate(value);
+  public void reduce(String name, Writable value) {
+    Reducer<Object, Writable> reducer = myReducerMap.get(name);
+    synchronized (reducer) {
+      reducer.reducePartial(value);
     }
     progressable.progress();
   }
 
+
   /**
-   * Create initial aggregated value for an aggregator. Used so requests
+   * Create initial value for a reducer. Used so requests
    * would be able to deserialize data.
    *
-   * Thread-safe. Call only after aggregators have been registered.
+   * Thread-safe. Call only after reducer has been registered.
    *
-   * @param name Name of the aggregator
-   * @return Empty aggregated value for this aggregator
+   * @param name Name of the reducer
+   * @return Empty value
    */
-  public Writable createAggregatorInitialValue(String name) {
-    Aggregator<Writable> aggregator = myAggregatorMap.get(name);
-    synchronized (aggregator) {
-      return aggregator.createInitialValue();
+  public Writable createInitialValue(String name) {
+    Reducer<Object, Writable> reducer = myReducerMap.get(name);
+    synchronized (reducer) {
+      return reducer.createInitialValue();
     }
   }
 
@@ -159,20 +159,20 @@ public class OwnerAggregatorServerData {
    * @return Iterable through final aggregated values which this worker owns
    */
   public Iterable<Map.Entry<String, Writable>>
-  getMyAggregatorValuesWhenReady(Set<Integer> workerIds) {
+  getMyReducedValuesWhenReady(Set<Integer> workerIds) {
     workersBarrier.waitForRequiredPermits(workerIds);
     if (LOG.isDebugEnabled()) {
       LOG.debug("getMyAggregatorValuesWhenReady: Values ready");
     }
-    return Iterables.transform(myAggregatorMap.entrySet(),
-        new Function<Map.Entry<String, Aggregator<Writable>>,
+    return Iterables.transform(myReducerMap.entrySet(),
+        new Function<Map.Entry<String, Reducer<Object, Writable>>,
             Map.Entry<String, Writable>>() {
           @Override
           public Map.Entry<String, Writable> apply(
-              Map.Entry<String, Aggregator<Writable>> aggregator) {
+              Map.Entry<String, Reducer<Object, Writable>> aggregator) {
             return new AbstractMap.SimpleEntry<String, Writable>(
                 aggregator.getKey(),
-                aggregator.getValue().getAggregatedValue());
+                aggregator.getValue().getCurrentValue());
           }
         });
   }
@@ -181,9 +181,10 @@ public class OwnerAggregatorServerData {
    * Prepare for next superstep
    */
   public void reset() {
-    myAggregatorMap.clear();
+    myReducerMap.clear();
     if (LOG.isDebugEnabled()) {
       LOG.debug("reset: Ready for next superstep");
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatedValueCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatedValueCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatedValueCache.java
deleted file mode 100644
index 468ee5c..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatedValueCache.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.aggregators;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.Maps;
-
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * Takes and serializes aggregated values and keeps them grouped by owner
- * partition id, to be sent in bulk.
- */
-public class SendAggregatedValueCache extends CountingCache {
-  /** Map from worker partition id to aggregator output stream */
-  private final Map<Integer, AggregatedValueOutputStream> aggregatorMap =
-      Maps.newHashMap();
-
-  /**
-   * Add aggregated value to the cache
-   *
-   * @param taskId Task id of worker which owns the aggregator
-   * @param aggregatorName Name of the aggregator
-   * @param aggregatedValue Value of the aggregator
-   * @return Number of bytes in serialized data for this worker
-   * @throws IOException
-   */
-  public int addAggregator(Integer taskId, String aggregatorName,
-      Writable aggregatedValue) throws IOException {
-    AggregatedValueOutputStream out = aggregatorMap.get(taskId);
-    if (out == null) {
-      out = new AggregatedValueOutputStream();
-      aggregatorMap.put(taskId, out);
-    }
-    return out.addAggregator(aggregatorName, aggregatedValue);
-  }
-
-  /**
-   * Remove and get aggregators for certain worker
-   *
-   * @param taskId Partition id of worker owner
-   * @return Serialized aggregator data for this worker
-   */
-  public byte[] removeAggregators(Integer taskId) {
-    incrementCounter(taskId);
-    AggregatedValueOutputStream out = aggregatorMap.remove(taskId);
-    if (out == null) {
-      return new byte[4];
-    } else {
-      return out.flush();
-    }
-  }
-
-  /**
-   * Creates fake aggregator which will hold the total number of aggregator
-   * requests for worker with selected task id. This should be called after all
-   * aggregators for the worker have been added to the cache.
-   *
-   * @param taskId Destination worker's task id
-   * @throws IOException
-   */
-  public void addCountAggregator(Integer taskId) throws IOException {
-    // current number of requests, plus one for the last flush
-    long totalCount = getCount(taskId) + 1;
-    addAggregator(taskId, AggregatorUtils.SPECIAL_COUNT_AGGREGATOR,
-        new LongWritable(totalCount));
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java
deleted file mode 100644
index 8f880b4..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.aggregators;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.giraph.aggregators.Aggregator;
-import org.apache.giraph.utils.WritableFactory;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.Maps;
-
-/**
- * Takes and serializes aggregators and keeps them grouped by owner
- * partition id, to be sent in bulk.
- */
-public class SendAggregatorCache extends CountingCache {
-  /** Map from worker partition id to aggregator output stream */
-  private final Map<Integer, AggregatorOutputStream> aggregatorMap =
-      Maps.newHashMap();
-
-  /**
-   * Add aggregator to the cache
-   *
-   * @param taskId Task id of worker which owns the aggregator
-   * @param aggregatorName Name of the aggregator
-   * @param aggregatorFactory Aggregator factory
-   * @param aggregatedValue Value of the aggregator
-   * @return Number of bytes in serialized data for this worker
-   * @throws IOException
-   */
-  public int addAggregator(Integer taskId, String aggregatorName,
-      WritableFactory<? extends Aggregator> aggregatorFactory,
-      Writable aggregatedValue) throws IOException {
-    AggregatorOutputStream out = aggregatorMap.get(taskId);
-    if (out == null) {
-      out = new AggregatorOutputStream();
-      aggregatorMap.put(taskId, out);
-    }
-    return out.addAggregator(aggregatorName, aggregatorFactory,
-        aggregatedValue);
-  }
-
-  /**
-   * Remove and get aggregators for certain worker
-   *
-   * @param taskId Task id of worker owner
-   * @return Serialized aggregator data for this worker
-   */
-  public byte[] removeAggregators(Integer taskId) {
-    incrementCounter(taskId);
-    AggregatorOutputStream out = aggregatorMap.remove(taskId);
-    if (out == null) {
-      return new byte[4];
-    } else {
-      return out.flush();
-    }
-  }
-
-  /**
-   * Creates fake aggregator which will hold the total number of aggregator
-   * requests for worker with selected task id. This should be called after all
-   * aggregators for the worker have been added to the cache.
-   *
-   * @param taskId Destination worker's task id
-   * @throws IOException
-   */
-  public void addCountAggregator(Integer taskId) throws IOException {
-    // current number of requests, plus one for the last flush
-    long totalCount = getCount(taskId) + 1;
-    addAggregator(taskId, AggregatorUtils.SPECIAL_COUNT_AGGREGATOR,
-        null, new LongWritable(totalCount));
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendGlobalCommCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendGlobalCommCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendGlobalCommCache.java
new file mode 100644
index 0000000..5e10c2f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendGlobalCommCache.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.aggregators;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.giraph.comm.GlobalCommType;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Takes and serializes global communication values and keeps them grouped by
+ * owner task id, to be sent in bulk.
+ * Includes broadcast messages, reducer registrations and special count.
+ */
+public class SendGlobalCommCache extends CountingCache {
+  /** Map from worker task id to global communication output stream */
+  private final Map<Integer, GlobalCommValueOutputStream> globalCommMap =
+      Maps.newHashMap();
+
+  /** whether to write Class object for values into the stream */
+  private final boolean writeClass;
+
+  /**
+   * Constructor
+   *
+   * @param writeClass boolean whether to write Class object for values
+   */
+  public SendGlobalCommCache(boolean writeClass) {
+    this.writeClass = writeClass;
+  }
+
+  /**
+   * Add global communication value to the cache
+   *
+   * @param taskId Task id of worker which owns the value
+   * @param name Name
+   * @param type Global communication type
+   * @param value Value
+   * @return Number of bytes in serialized data for this worker
+   * @throws IOException
+   */
+  public int addValue(Integer taskId, String name,
+      GlobalCommType type, Writable value) throws IOException {
+    GlobalCommValueOutputStream out = globalCommMap.get(taskId);
+    if (out == null) {
+      out = new GlobalCommValueOutputStream(writeClass);
+      globalCommMap.put(taskId, out);
+    }
+    return out.addValue(name, type, value);
+  }
+
+  /**
+   * Remove and get values for certain worker
+   *
+   * @param taskId Task id of worker owner
+   * @return Serialized global communication data for this worker
+   */
+  public byte[] removeSerialized(Integer taskId) {
+    incrementCounter(taskId);
+    GlobalCommValueOutputStream out = globalCommMap.remove(taskId);
+    if (out == null) {
+      return new byte[4];
+    } else {
+      return out.flush();
+    }
+  }
+
+  /**
+   * Creates special value which will hold the total number of global
+   * communication requests for worker with selected task id. This should be
+   * called after all values for the worker have been added to the cache.
+   *
+   * @param taskId Destination worker's task id
+   * @throws IOException
+   */
+  public void addSpecialCount(Integer taskId) throws IOException {
+    // current number of requests, plus one for the last flush
+    long totalCount = getCount(taskId) + 1;
+    addValue(taskId, GlobalCommType.SPECIAL_COUNT.name(),
+        GlobalCommType.SPECIAL_COUNT, new LongWritable(totalCount));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/WorkerAggregatorRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/WorkerAggregatorRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/WorkerAggregatorRequestProcessor.java
index 360a39b..42009a2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/WorkerAggregatorRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/WorkerAggregatorRequestProcessor.java
@@ -18,25 +18,25 @@
 
 package org.apache.giraph.comm.aggregators;
 
-import org.apache.hadoop.io.Writable;
-
 import java.io.IOException;
 
+import org.apache.hadoop.io.Writable;
+
 /**
  * Aggregates worker aggregator requests and sends them off
  */
 public interface WorkerAggregatorRequestProcessor {
   /**
-   * Sends worker aggregated value to the owner of aggregator
+   * Sends worker reduced value to the owner of reducer
    *
-   * @param aggregatorName Name of the aggregator
-   * @param aggregatedValue Value of the aggregator
+   * @param name Name of the reducer
+   * @param reducedValue Reduced partial value
    * @throws java.io.IOException
-   * @return True if aggregated value will be sent, false if this worker is
-   * the owner of the aggregator
+   * @return True if reduced value will be sent, false if this worker is
+   * the owner of the reducer
    */
-  boolean sendAggregatedValue(String aggregatorName,
-      Writable aggregatedValue) throws IOException;
+  boolean sendReducedValue(String name,
+      Writable reducedValue) throws IOException;
 
   /**
    * Flush aggregated values cache.
@@ -46,19 +46,19 @@ public interface WorkerAggregatorRequestProcessor {
   void flush() throws IOException;
 
   /**
-   * Sends aggregated values to the master. This worker is the owner of these
-   * aggregators.
+   * Sends reduced values to the master. This worker is the owner of these
+   * reducers.
    *
-   * @param aggregatorData Serialized aggregator data
+   * @param data Serialized reduced values data
    * @throws IOException
    */
-  void sendAggregatedValuesToMaster(byte[] aggregatorData) throws IOException;
+  void sendReducedValuesToMaster(byte[] data) throws IOException;
 
   /**
-   * Sends aggregators to all other workers
+   * Sends reduced values to all other workers
    *
-   * @param aggregatorDataList Serialized aggregator data split into chunks
+   * @param reducedDataList Serialized reduced values data split into chunks
    */
-  void distributeAggregators(
-      Iterable<byte[]> aggregatorDataList) throws IOException;
+  void distributeReducedValues(
+      Iterable<byte[]> reducedDataList) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
index 51277c9..e110782 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
@@ -20,14 +20,13 @@ package org.apache.giraph.comm.netty;
 
 import java.io.IOException;
 
-import org.apache.giraph.aggregators.Aggregator;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
+import org.apache.giraph.comm.GlobalCommType;
 import org.apache.giraph.comm.MasterClient;
 import org.apache.giraph.comm.aggregators.AggregatorUtils;
-import org.apache.giraph.comm.aggregators.SendAggregatorCache;
+import org.apache.giraph.comm.aggregators.SendGlobalCommCache;
 import org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.WritableFactory;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -42,8 +41,8 @@ public class NettyMasterClient implements MasterClient {
   /** Worker information for current superstep */
   private final CentralizedServiceMaster<?, ?, ?> service;
   /** Cached map of partition ids to serialized aggregator data */
-  private final SendAggregatorCache sendAggregatorCache =
-      new SendAggregatorCache();
+  private final SendGlobalCommCache sendGlobalCommCache =
+      new SendGlobalCommCache(true);
   /** How big a single aggregator request can be */
   private final int maxBytesPerAggregatorRequest;
   /** Progressable used to report progress */
@@ -78,26 +77,25 @@ public class NettyMasterClient implements MasterClient {
   }
 
   @Override
-  public void sendAggregator(String aggregatorName,
-      WritableFactory<? extends Aggregator> aggregatorFactory,
-      Writable aggregatedValue) throws IOException {
+  public void sendToOwner(String name, GlobalCommType sendType, Writable object)
+    throws IOException {
     WorkerInfo owner =
-        AggregatorUtils.getOwner(aggregatorName, service.getWorkerInfoList());
-    int currentSize = sendAggregatorCache.addAggregator(owner.getTaskId(),
-        aggregatorName, aggregatorFactory, aggregatedValue);
+        AggregatorUtils.getOwner(name, service.getWorkerInfoList());
+    int currentSize = sendGlobalCommCache.addValue(owner.getTaskId(),
+        name, sendType, object);
     if (currentSize >= maxBytesPerAggregatorRequest) {
       flushAggregatorsToWorker(owner);
     }
   }
 
   @Override
-  public void finishSendingAggregatedValues() throws IOException {
+  public void finishSendingValues() throws IOException {
     for (WorkerInfo worker : service.getWorkerInfoList()) {
-      sendAggregatorCache.addCountAggregator(worker.getTaskId());
+      sendGlobalCommCache.addSpecialCount(worker.getTaskId());
       flushAggregatorsToWorker(worker);
       progressable.progress();
     }
-    sendAggregatorCache.reset();
+    sendGlobalCommCache.reset();
   }
 
   /**
@@ -106,10 +104,10 @@ public class NettyMasterClient implements MasterClient {
    * @param worker Worker which we want to send aggregators to
    */
   private void flushAggregatorsToWorker(WorkerInfo worker) {
-    byte[] aggregatorData =
-        sendAggregatorCache.removeAggregators(worker.getTaskId());
+    byte[] data =
+        sendGlobalCommCache.removeSerialized(worker.getTaskId());
     nettyClient.sendWritableRequest(
-        worker.getTaskId(), new SendAggregatorsToOwnerRequest(aggregatorData,
+        worker.getTaskId(), new SendAggregatorsToOwnerRequest(data,
           service.getMasterInfo().getTaskId()));
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
index 1c05910..60566f9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
@@ -18,14 +18,14 @@
 
 package org.apache.giraph.comm.netty;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import java.net.InetSocketAddress;
+
 import org.apache.giraph.bsp.CentralizedServiceMaster;
-import org.apache.giraph.comm.netty.handler.MasterRequestServerHandler;
 import org.apache.giraph.comm.MasterServer;
+import org.apache.giraph.comm.netty.handler.MasterRequestServerHandler;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.hadoop.util.Progressable;
 
-import java.net.InetSocketAddress;
-
 /**
  * Netty implementation of {@link MasterServer}
  */
@@ -46,7 +46,7 @@ public class NettyMasterServer implements MasterServer {
       Progressable progressable,
       Thread.UncaughtExceptionHandler exceptionHandler) {
     nettyServer = new NettyServer(conf,
-        new MasterRequestServerHandler.Factory(service.getAggregatorHandler()),
+        new MasterRequestServerHandler.Factory(service.getGlobalCommHandler()),
         service.getMasterInfo(), progressable, exceptionHandler);
     nettyServer.start();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
index 8b5f293..3096c6e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
@@ -18,21 +18,22 @@
 
 package org.apache.giraph.comm.netty;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import java.io.IOException;
+
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
+import org.apache.giraph.comm.GlobalCommType;
 import org.apache.giraph.comm.WorkerClient;
 import org.apache.giraph.comm.aggregators.AggregatorUtils;
-import org.apache.giraph.comm.aggregators.SendAggregatedValueCache;
-import org.apache.giraph.comm.requests.SendAggregatorsToMasterRequest;
+import org.apache.giraph.comm.aggregators.SendGlobalCommCache;
+import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
 import org.apache.giraph.comm.requests.SendAggregatorsToWorkerRequest;
+import org.apache.giraph.comm.requests.SendReducedToMasterRequest;
 import org.apache.giraph.comm.requests.SendWorkerAggregatorsRequest;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 
-import java.io.IOException;
-
 /**
  * Netty implementation of {@link WorkerAggregatorRequestProcessor}
  */
@@ -45,8 +46,8 @@ public class NettyWorkerAggregatorRequestProcessor
   /** Service worker */
   private final CentralizedServiceWorker<?, ?, ?> serviceWorker;
   /** Cached map of partition ids to serialized aggregator data */
-  private final SendAggregatedValueCache sendAggregatedValueCache =
-      new SendAggregatedValueCache();
+  private final SendGlobalCommCache sendReducedValuesCache =
+      new SendGlobalCommCache(false);
   /** How big a single aggregator request can be */
   private final int maxBytesPerAggregatorRequest;
 
@@ -71,16 +72,16 @@ public class NettyWorkerAggregatorRequestProcessor
   }
 
   @Override
-  public boolean sendAggregatedValue(String aggregatorName,
-      Writable aggregatedValue) throws IOException {
+  public boolean sendReducedValue(String name,
+      Writable reducedValue) throws IOException {
     WorkerInfo owner =
-        AggregatorUtils.getOwner(aggregatorName,
+        AggregatorUtils.getOwner(name,
             serviceWorker.getWorkerInfoList());
     if (isThisWorker(owner)) {
       return false;
     } else {
-      int currentSize = sendAggregatedValueCache.addAggregator(
-          owner.getTaskId(), aggregatorName, aggregatedValue);
+      int currentSize = sendReducedValuesCache.addValue(owner.getTaskId(),
+          name, GlobalCommType.REDUCED_VALUE, reducedValue);
       if (currentSize >= maxBytesPerAggregatorRequest) {
         flushAggregatorsToWorker(owner);
       }
@@ -92,12 +93,12 @@ public class NettyWorkerAggregatorRequestProcessor
   public void flush() throws IOException {
     for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
       if (!isThisWorker(workerInfo)) {
-        sendAggregatedValueCache.addCountAggregator(workerInfo.getTaskId());
+        sendReducedValuesCache.addSpecialCount(workerInfo.getTaskId());
         flushAggregatorsToWorker(workerInfo);
         progressable.progress();
       }
     }
-    sendAggregatedValueCache.reset();
+    sendReducedValuesCache.reset();
   }
 
   /**
@@ -106,22 +107,21 @@ public class NettyWorkerAggregatorRequestProcessor
    * @param worker Worker which we want to send aggregators to
    */
   private void flushAggregatorsToWorker(WorkerInfo worker) {
-    byte[] aggregatorData =
-        sendAggregatedValueCache.removeAggregators(worker.getTaskId());
+    byte[] data =
+        sendReducedValuesCache.removeSerialized(worker.getTaskId());
     workerClient.sendWritableRequest(worker.getTaskId(),
-        new SendWorkerAggregatorsRequest(aggregatorData,
+        new SendWorkerAggregatorsRequest(data,
             serviceWorker.getWorkerInfo().getTaskId()));
   }
 
   @Override
-  public void sendAggregatedValuesToMaster(
-      byte[] aggregatorData) throws IOException {
+  public void sendReducedValuesToMaster(byte[] data) throws IOException {
     workerClient.sendWritableRequest(serviceWorker.getMasterInfo().getTaskId(),
-        new SendAggregatorsToMasterRequest(aggregatorData));
+        new SendReducedToMasterRequest(data));
   }
 
   @Override
-  public void distributeAggregators(
+  public void distributeReducedValues(
       Iterable<byte[]> aggregatorDataList) throws IOException {
     for (byte[] aggregatorData : aggregatorDataList) {
       for (WorkerInfo worker : serviceWorker.getWorkerInfoList()) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
index e043314..02c72f7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
@@ -18,10 +18,10 @@
 
 package org.apache.giraph.comm.netty.handler;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.requests.MasterRequest;
-import org.apache.giraph.master.MasterAggregatorHandler;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.TaskInfo;
+import org.apache.giraph.master.MasterAggregatorHandler;
 
 /** Handler for requests on master */
 public class MasterRequestServerHandler extends

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
index c7561ee..26eaa8c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
@@ -54,7 +54,7 @@ public enum RequestType {
   /** Send aggregated values from one worker's vertices */
   SEND_WORKER_AGGREGATORS_REQUEST(SendWorkerAggregatorsRequest.class),
   /** Send aggregated values from worker owner to master */
-  SEND_AGGREGATORS_TO_MASTER_REQUEST(SendAggregatorsToMasterRequest.class),
+  SEND_AGGREGATORS_TO_MASTER_REQUEST(SendReducedToMasterRequest.class),
   /** Send aggregators from master to worker owners */
   SEND_AGGREGATORS_TO_OWNER_REQUEST(SendAggregatorsToOwnerRequest.class),
   /** Send aggregators from worker owner to other workers */

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToMasterRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToMasterRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToMasterRequest.java
deleted file mode 100644
index 2a05192..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToMasterRequest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.requests;
-
-import org.apache.giraph.master.MasterAggregatorHandler;
-
-import java.io.IOException;
-
-/**
- * Request to send final aggregated values from worker which owns
- * aggregators to the master
- */
-public class SendAggregatorsToMasterRequest extends ByteArrayRequest
-    implements MasterRequest {
-
-  /**
-   * Constructor
-   *
-   * @param data Serialized aggregator data
-   */
-  public SendAggregatorsToMasterRequest(byte[] data) {
-    super(data);
-  }
-
-  /**
-   * Constructor used for reflection only
-   */
-  public SendAggregatorsToMasterRequest() {
-  }
-
-  @Override
-  public void doRequest(MasterAggregatorHandler aggregatorHandler) {
-    try {
-      aggregatorHandler.acceptAggregatedValues(getDataInput());
-    } catch (IOException e) {
-      throw new IllegalStateException("doRequest: " +
-          "IOException occurred while processing request", e);
-    }
-  }
-
-  @Override
-  public RequestType getType() {
-    return RequestType.SEND_AGGREGATORS_TO_MASTER_REQUEST;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
index 10d8d02..2d5cc51 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
@@ -21,11 +21,12 @@ package org.apache.giraph.comm.requests;
 import java.io.DataInput;
 import java.io.IOException;
 
-import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.comm.GlobalCommType;
 import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
-import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
+import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
@@ -55,28 +56,32 @@ public class SendAggregatorsToOwnerRequest
 
   @Override
   public void doRequest(ServerData serverData) {
+    UnsafeByteArrayOutputStream reusedOut = new UnsafeByteArrayOutputStream();
+    UnsafeReusableByteArrayInput reusedIn = new UnsafeReusableByteArrayInput();
+
     DataInput input = getDataInput();
     AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
     try {
-      int numAggregators = input.readInt();
-      for (int i = 0; i < numAggregators; i++) {
-        String aggregatorName = input.readUTF();
-        WritableFactory<Aggregator<Writable>> aggregatorFactory =
-            WritableUtils.readWritableObject(input, conf);
-        if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
-          LongWritable count = new LongWritable(0);
-          count.readFields(input);
-          aggregatorData.receivedRequestCountFromMaster(count.get(),
+      int num = input.readInt();
+      for (int i = 0; i < num; i++) {
+        String name = input.readUTF();
+        GlobalCommType type = GlobalCommType.values()[input.readByte()];
+        Writable value = WritableUtils.readWritableObject(input, conf);
+        if (type == GlobalCommType.SPECIAL_COUNT) {
+          aggregatorData.receivedRequestCountFromMaster(
+              ((LongWritable) value).get(),
               getSenderTaskId());
         } else {
-          aggregatorData.registerAggregatorClass(aggregatorName,
-              aggregatorFactory);
-          Writable aggregatorValue =
-              aggregatorData.createAggregatorInitialValue(aggregatorName);
-          aggregatorValue.readFields(input);
-          aggregatorData.setAggregatorValue(aggregatorName, aggregatorValue);
-          serverData.getOwnerAggregatorData().registerAggregator(
-              aggregatorName, aggregatorFactory);
+          aggregatorData.receiveValueFromMaster(name, type, value);
+
+          if (type == GlobalCommType.REDUCE_OPERATIONS) {
+            ReduceOperation<Object, Writable> reduceOpCopy =
+                (ReduceOperation<Object, Writable>)
+                WritableUtils.createCopy(reusedOut, reusedIn, value);
+
+            serverData.getOwnerAggregatorData().registerReducer(
+                name, reduceOpCopy);
+          }
         }
       }
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
index d469e96..361bdc9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
@@ -21,11 +21,9 @@ package org.apache.giraph.comm.requests;
 import java.io.DataInput;
 import java.io.IOException;
 
-import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.comm.GlobalCommType;
 import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
-import org.apache.giraph.utils.WritableFactory;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
@@ -58,23 +56,17 @@ public class SendAggregatorsToWorkerRequest extends
     DataInput input = getDataInput();
     AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
     try {
-      int numAggregators = input.readInt();
-      for (int i = 0; i < numAggregators; i++) {
-        String aggregatorName = input.readUTF();
-        WritableFactory<Aggregator<Writable>> aggregatorFactory =
-            WritableUtils.readWritableObject(input, conf);
-        if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
-          LongWritable count = new LongWritable(0);
-          count.readFields(input);
-          aggregatorData.receivedRequestCountFromWorker(count.get(),
+      int num = input.readInt();
+      for (int i = 0; i < num; i++) {
+        String name = input.readUTF();
+        GlobalCommType type = GlobalCommType.values()[input.readByte()];
+        Writable value = WritableUtils.readWritableObject(input, conf);
+        if (type == GlobalCommType.SPECIAL_COUNT) {
+          aggregatorData.receivedRequestCountFromWorker(
+              ((LongWritable) value).get(),
               getSenderTaskId());
         } else {
-          aggregatorData.registerAggregatorClass(aggregatorName,
-              aggregatorFactory);
-          Writable aggregatorValue =
-              aggregatorData.createAggregatorInitialValue(aggregatorName);
-          aggregatorValue.readFields(input);
-          aggregatorData.setAggregatorValue(aggregatorName, aggregatorValue);
+          aggregatorData.receiveValueFromMaster(name, type, value);
         }
       }
     } catch (IOException e) {


Mime
View raw message