giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject git commit: updated refs/heads/trunk to ce97134
Date Fri, 01 Aug 2014 15:20:53 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk 930352220 -> ce97134d2


GIRAPH-934: Allow having state in aggregators (ikabiljo via majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/ce97134d
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/ce97134d
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/ce97134d

Branch: refs/heads/trunk
Commit: ce97134d253c4e9fca48b7cede2048e60f36ff79
Parents: 9303522
Author: Maja Kabiljo <majakabiljo@fb.com>
Authored: Fri Aug 1 08:20:10 2014 -0700
Committer: Maja Kabiljo <majakabiljo@fb.com>
Committed: Fri Aug 1 08:20:10 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../giraph/aggregators/AggregatorWrapper.java   |  22 ++--
 .../aggregators/ArrayAggregatorFactory.java     | 128 +++++++++++++++++++
 .../giraph/aggregators/BasicAggregator.java     |   9 ++
 .../aggregators/ClassAggregatorFactory.java     |  87 +++++++++++++
 .../org/apache/giraph/comm/MasterClient.java    |   9 +-
 .../java/org/apache/giraph/comm/ServerData.java |  14 +-
 .../aggregators/AggregatorOutputStream.java     |  12 +-
 .../comm/aggregators/AggregatorUtils.java       |  39 +-----
 .../aggregators/AllAggregatorServerData.java    |  69 ++++------
 .../aggregators/OwnerAggregatorServerData.java  |  26 ++--
 .../comm/aggregators/SendAggregatorCache.java   |  15 ++-
 .../giraph/comm/netty/NettyMasterClient.java    |  17 +--
 .../requests/SendAggregatorsToOwnerRequest.java |  19 +--
 .../SendAggregatorsToWorkerRequest.java         |  17 +--
 .../giraph/comm/requests/WritableRequest.java   |   2 +-
 .../giraph/master/MasterAggregatorHandler.java  |  66 ++++++----
 .../giraph/master/MasterAggregatorUsage.java    |  16 +++
 .../org/apache/giraph/master/MasterCompute.java |   8 ++
 .../org/apache/giraph/utils/ArrayWritable.java  | 100 +++++++++++++++
 .../apache/giraph/utils/WritableFactory.java    |  28 ++++
 .../org/apache/giraph/utils/WritableUtils.java  |  84 ++++++++++--
 .../giraph/worker/WorkerAggregatorHandler.java  |  52 +++++---
 .../giraph/aggregators/TestArrayAggregator.java |  50 ++++++++
 24 files changed, 686 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 54ed3a3..dbb134a 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-934: Allow having state in aggregators (ikabiljo via majakabiljo)
+
   GIRAPH-932: Adding .arcconfig to GIRAPH for Arcanist support (aching)
 
   GIRAPH-927: Decouple netty server threads from message processing (edunov via pavanka)  

http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java
index 7150402..fa18a64 100644
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java
@@ -19,7 +19,7 @@
 package org.apache.giraph.aggregators;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.WritableFactory;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -34,21 +34,25 @@ public class AggregatorWrapper<A extends Writable> {
   private final boolean persistent;
   /** Value aggregated in previous super step */
   private A previousAggregatedValue;
+  /** Aggregator factory */
+  private final WritableFactory<? extends Aggregator<A>> aggregatorFactory;
   /** Aggregator for next super step */
   private final Aggregator<A> currentAggregator;
   /** Whether anyone changed current value since the moment it was reset */
   private boolean changed;
 
   /**
-   * @param aggregatorClass Class type of the aggregator
-   * @param persistent      False iff aggregator should be reset at the end of
-   *                        each super step
-   * @param conf            Configuration
+   * @param aggregatorFactory Aggregator Factory
+   * @param persistent        False iff aggregator should be reset at the end
+   *                          of each super step
+   * @param conf              Configuration
    */
-  public AggregatorWrapper(Class<? extends Aggregator<A>> aggregatorClass,
+  public AggregatorWrapper(
+      WritableFactory<? extends Aggregator<A>> aggregatorFactory,
       boolean persistent, ImmutableClassesGiraphConfiguration conf) {
     this.persistent = persistent;
-    currentAggregator = ReflectionUtils.newInstance(aggregatorClass, conf);
+    this.aggregatorFactory = aggregatorFactory;
+    currentAggregator = aggregatorFactory.create();
     changed = false;
     previousAggregatedValue = currentAggregator.createInitialValue();
   }
@@ -140,7 +144,7 @@ public class AggregatorWrapper<A extends Writable> {
    *
    * @return Aggregator class
    */
-  public Class<? extends Aggregator> getAggregatorClass() {
-    return currentAggregator.getClass();
+  public WritableFactory<? extends Aggregator<A>> getAggregatorFactory() {
+    return aggregatorFactory;
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java
new file mode 100644
index 0000000..c977c57
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.aggregators;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Array;
+
+import org.apache.giraph.utils.ArrayWritable;
+import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Generic array aggregator factory, used to aggregate elements
+ * of ArrayWritable via passed element aggregator.
+ *
+ * @param <A> Type of individual element
+ */
+public class ArrayAggregatorFactory<A extends Writable>
+    implements WritableFactory<Aggregator<ArrayWritable<A>>> {
+  /** number of elements in array */
+  private int n;
+  /** element aggregator factory */
+  private WritableFactory<? extends Aggregator<A>> elementAggregatorFactory;
+
+  /**
+   * Constructor
+   * @param n Number of elements in array
+   * @param elementAggregatorClass Type of element aggregator
+   */
+  public ArrayAggregatorFactory(
+      int n, Class<? extends Aggregator<A>> elementAggregatorClass) {
+    this(n, new ClassAggregatorFactory<>(elementAggregatorClass));
+  }
+
+  /**
+   * Constructor
+   * @param n Number of elements in array
+   * @param elementAggregatorFactory Element aggregator factory
+   */
+  public ArrayAggregatorFactory(int n,
+      WritableFactory<? extends Aggregator<A>> elementAggregatorFactory) {
+    this.n = n;
+    this.elementAggregatorFactory = elementAggregatorFactory;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    n = in.readInt();
+    elementAggregatorFactory = WritableUtils.readWritableObject(in, null);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(n);
+    WritableUtils.writeWritableObject(elementAggregatorFactory, out);
+  }
+
+  @Override
+  public Aggregator<ArrayWritable<A>> create() {
+    return new ArrayAggregator<>(
+        n, elementAggregatorFactory.create());
+  }
+
+  /**
+   * Stateful aggregator that aggregates ArrayWritable by
+   * aggregating individual elements
+   *
+   * @param <A> Type of individual element
+   */
+  public static class ArrayAggregator<A extends Writable>
+      extends BasicAggregator<ArrayWritable<A>> {
+    /** number of elements in array */
+    private final int n;
+    /** element aggregator */
+    private final Aggregator<A> elementAggregator;
+
+    /**
+     * Constructor
+     * @param n Number of elements in array
+     * @param elementAggregator Element aggregator
+     */
+    public ArrayAggregator(int n, Aggregator<A> elementAggregator) {
+      super(null);
+      this.n = n;
+      this.elementAggregator = elementAggregator;
+      reset();
+    }
+
+    @Override
+    public void aggregate(ArrayWritable<A> other) {
+      A[] array = getAggregatedValue().get();
+      for (int i = 0; i < n; i++) {
+        elementAggregator.setAggregatedValue(array[i]);
+        elementAggregator.aggregate(other.get()[i]);
+        array[i] = elementAggregator.getAggregatedValue();
+      }
+    }
+
+    @Override
+    public ArrayWritable<A> createInitialValue() {
+      Class<A> elementClass =
+          (Class) elementAggregator.createInitialValue().getClass();
+      A[] array = (A[]) Array.newInstance(elementClass, n);
+      for (int i = 0; i < n; i++) {
+        array[i] = elementAggregator.createInitialValue();
+      }
+      return new ArrayWritable<>(elementClass, array);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java
index 07a4100..c351846 100644
--- a/giraph-core/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java
@@ -40,6 +40,15 @@ public abstract class BasicAggregator<A extends Writable> implements
     value = createInitialValue();
   }
 
+  /**
+   * Constructor
+   * @param initialValue initial value
+   */
+  public BasicAggregator(A initialValue) {
+    value = initialValue;
+  }
+
+
   @Override
   public A getAggregatedValue() {
     return value;

http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java
new file mode 100644
index 0000000..944656e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.aggregators;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Aggregator factory based on aggregatorClass.
+ *
+ * @param <T> Aggregated value type
+ */
+public class ClassAggregatorFactory<T extends Writable>
+    extends DefaultImmutableClassesGiraphConfigurable
+    implements WritableFactory<Aggregator<T>> {
+  /** Aggregator class */
+  private Class<? extends Aggregator<T>> aggregatorClass;
+
+  /** Constructor */
+  public ClassAggregatorFactory() {
+  }
+
+  /**
+   * Constructor
+   * @param aggregatorClass Aggregator class
+   */
+  public ClassAggregatorFactory(
+      Class<? extends Aggregator<T>> aggregatorClass) {
+    this(aggregatorClass, null);
+
+  }
+
+  /**
+   * Constructor
+   * @param aggregatorClass Aggregator class
+   * @param conf Configuration
+   */
+  public ClassAggregatorFactory(Class<? extends Aggregator<T>> aggregatorClass,
+      ImmutableClassesGiraphConfiguration conf) {
+    Preconditions.checkNotNull(aggregatorClass,
+        "aggregatorClass cannot be null in ClassAggregatorFactory");
+    this.aggregatorClass = aggregatorClass;
+    setConf(conf);
+  }
+
+  @Override
+  public Aggregator<T> create() {
+    return ReflectionUtils.newInstance(aggregatorClass, getConf());
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    aggregatorClass = WritableUtils.readClass(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Preconditions.checkNotNull(aggregatorClass,
+        "aggregatorClass cannot be null in ClassAggregatorFactory");
+    WritableUtils.writeClass(aggregatorClass, out);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
index 793d059..b7718a7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java
@@ -18,11 +18,12 @@
 
 package org.apache.giraph.comm;
 
+import java.io.IOException;
+
 import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.utils.WritableFactory;
 import org.apache.hadoop.io.Writable;
 
-import java.io.IOException;
-
 /**
  * Interface for master to send messages to workers
  */
@@ -36,12 +37,12 @@ public interface MasterClient {
    * Sends aggregator to its owner
    *
    * @param aggregatorName Name of the aggregator
-   * @param aggregatorClass Class of the aggregator
+   * @param aggregatorFactory Aggregator factory
    * @param aggregatedValue Value of the aggregator
    * @throws IOException
    */
   void sendAggregator(String aggregatorName,
-      Class<? extends Aggregator> aggregatorClass,
+      WritableFactory<? extends Aggregator> aggregatorFactory,
       Writable aggregatedValue) throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index 29488fc..a92cd1c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -18,6 +18,12 @@
 
 package org.apache.giraph.comm;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
 import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
@@ -36,12 +42,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  * Anything that the server stores
  *
@@ -123,7 +123,7 @@ public class ServerData<I extends WritableComparable,
     EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory();
     edgeStoreFactory.initialize(service, conf, context);
     edgeStore = edgeStoreFactory.newStore();
-    ownerAggregatorData = new OwnerAggregatorServerData(context, conf);
+    ownerAggregatorData = new OwnerAggregatorServerData(context);
     allAggregatorData = new AllAggregatorServerData(context, conf);
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java
index 627b4cc..79bc08a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java
@@ -18,11 +18,13 @@
 
 package org.apache.giraph.comm.aggregators;
 
+import java.io.IOException;
+
 import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
 
-import java.io.IOException;
-
 /**
  * Implementation of {@link CountingOutputStream} which allows writing of
  * aggregators in the form of triple (name, classname, value)
@@ -32,17 +34,17 @@ public class AggregatorOutputStream extends CountingOutputStream {
    * Write aggregator to the stream and increment internal counter
    *
    * @param aggregatorName Name of the aggregator
-   * @param aggregatorClass Class of aggregator
+   * @param aggregatorFactory Aggregator factory
    * @param aggregatedValue Value of aggregator
    * @return Number of bytes occupied by the stream
    * @throws IOException
    */
   public int addAggregator(String aggregatorName,
-      Class<? extends Aggregator> aggregatorClass,
+      WritableFactory<? extends Aggregator> aggregatorFactory,
       Writable aggregatedValue) throws IOException {
     incrementCounter();
     dataOutput.writeUTF(aggregatorName);
-    dataOutput.writeUTF(aggregatorClass.getName());
+    WritableUtils.writeWritableObject(aggregatorFactory, dataOutput);
     aggregatedValue.write(dataOutput);
     return getSize();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
index ceb30a8..a94ab38 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
@@ -18,13 +18,10 @@
 
 package org.apache.giraph.comm.aggregators;
 
+import java.util.List;
+
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.aggregators.Aggregator;
-import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-
-import java.util.List;
 
 /**
  * Class for aggregator constants and utility methods
@@ -36,6 +33,7 @@ public class AggregatorUtils {
    */
   public static final String SPECIAL_COUNT_AGGREGATOR =
       "__aggregatorRequestCount";
+
   /** How big a single aggregator request can be (in bytes) */
   public static final String MAX_BYTES_PER_AGGREGATOR_REQUEST =
       "giraph.maxBytesPerAggregatorRequest";
@@ -58,37 +56,6 @@ public class AggregatorUtils {
   private AggregatorUtils() { }
 
   /**
-   * Get aggregator class from class name, catch all exceptions.
-   *
-   * @param aggregatorClassName Class nam of aggregator class
-   * @return Aggregator class
-   */
-  public static Class<Aggregator<Writable>> getAggregatorClass(String
-      aggregatorClassName) {
-    try {
-      return (Class<Aggregator<Writable>>) Class.forName(aggregatorClassName);
-    } catch (ClassNotFoundException e) {
-      throw new IllegalStateException("getAggregatorClass: " +
-          "ClassNotFoundException for aggregator class " + aggregatorClassName,
-          e);
-    }
-  }
-
-  /**
-   * Create new aggregator instance from aggregator class,
-   * catch all exceptions.
-   *
-   * @param aggregatorClass Class of aggregator
-   * @param conf Configuration
-   * @return New aggregator
-   */
-  public static Aggregator<Writable> newAggregatorInstance(
-      Class<Aggregator<Writable>> aggregatorClass,
-      ImmutableClassesGiraphConfiguration conf) {
-    return ReflectionUtils.newInstance(aggregatorClass, conf);
-  }
-
-  /**
    * Get owner of aggregator with selected name from the list of workers
    *
    * @param aggregatorName Name of the aggregators

http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
index 177e738..effc9bf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
@@ -18,10 +18,18 @@
 
 package org.apache.giraph.comm.aggregators;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
 import org.apache.giraph.aggregators.Aggregator;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.master.MasterInfo;
+import org.apache.giraph.utils.Factory;
 import org.apache.giraph.utils.TaskIdsPermitsBarrier;
+import org.apache.giraph.utils.WritableFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
@@ -29,12 +37,6 @@ import org.apache.log4j.Logger;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-
 /**
  * Accepts aggregators and their values from previous superstep from master
  * and workers which own aggregators. Keeps data received from master so it
@@ -49,16 +51,9 @@ public class AllAggregatorServerData {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(AllAggregatorServerData.class);
-  /**
-   * Map from aggregator class to aggregator object which we need in order
-   * to create initial aggregated values
-   */
-  private final
-  ConcurrentMap<Class<Aggregator<Writable>>, Aggregator<Writable>>
-  aggregatorTypesMap = Maps.newConcurrentMap();
-  /** Map of aggregator classes */
-  private final ConcurrentMap<String, Class<Aggregator<Writable>>>
-  aggregatorClassMap = Maps.newConcurrentMap();
+  /** Map of aggregator factories */
+  private final ConcurrentMap<String, WritableFactory<Aggregator<Writable>>>
+  aggregatorFactoriesMap = Maps.newConcurrentMap();
   /** Map of values of aggregators from previous superstep */
   private final ConcurrentMap<String, Writable>
   aggregatedValuesMap = Maps.newConcurrentMap();
@@ -104,16 +99,12 @@ public class AllAggregatorServerData {
   /**
    * Register the class of the aggregator, received by master or worker.
    *
-   * @param name            Aggregator name
-   * @param aggregatorClass Class of the aggregator
+   * @param name              Aggregator name
+   * @param aggregatorFactory Aggregator factory
    */
   public void registerAggregatorClass(String name,
-      Class<Aggregator<Writable>> aggregatorClass) {
-    aggregatorClassMap.put(name, aggregatorClass);
-    if (!aggregatorTypesMap.containsKey(aggregatorClass)) {
-      aggregatorTypesMap.putIfAbsent(aggregatorClass,
-          AggregatorUtils.newAggregatorInstance(aggregatorClass, conf));
-    }
+      WritableFactory<Aggregator<Writable>> aggregatorFactory) {
+    aggregatorFactoriesMap.put(name, aggregatorFactory);
     progressable.progress();
   }
 
@@ -139,10 +130,10 @@ public class AllAggregatorServerData {
    * @return Empty aggregated value for this aggregator
    */
   public Writable createAggregatorInitialValue(String name) {
-    Class<Aggregator<Writable>> aggregatorClass = aggregatorClassMap.get(name);
-    Aggregator<Writable> aggregator = aggregatorTypesMap.get(aggregatorClass);
-    synchronized (aggregator) {
-      return aggregator.createInitialValue();
+    WritableFactory<Aggregator<Writable>> aggregatorFactory =
+        aggregatorFactoriesMap.get(name);
+    synchronized (aggregatorFactory) {
+      return aggregatorFactory.create().createInitialValue();
     }
   }
 
@@ -211,29 +202,25 @@ public class AllAggregatorServerData {
    * @param workerIds All workers in the job apart from the current one
    * @param previousAggregatedValuesMap Map of values from previous
    *                                    superstep to fill out
-   * @param currentAggregatorMap Map of aggregators for current superstep to
-   *                             fill out. All aggregators in this map will
-   *                             be set to initial value.
+   * @param currentAggregatorFactoryMap Map of aggregator factories for
+   *                                    current superstep to fill out.
    */
   public void fillNextSuperstepMapsWhenReady(
       Set<Integer> workerIds,
       Map<String, Writable> previousAggregatedValuesMap,
-      Map<String, Aggregator<Writable>> currentAggregatorMap) {
+      Map<String, Factory<Aggregator<Writable>>> currentAggregatorFactoryMap) {
     workersBarrier.waitForRequiredPermits(workerIds);
     if (LOG.isDebugEnabled()) {
       LOG.debug("fillNextSuperstepMapsWhenReady: Aggregators ready");
     }
     previousAggregatedValuesMap.clear();
     previousAggregatedValuesMap.putAll(aggregatedValuesMap);
-    for (Map.Entry<String, Class<Aggregator<Writable>>> entry :
-        aggregatorClassMap.entrySet()) {
-      Aggregator<Writable> aggregator =
-          currentAggregatorMap.get(entry.getKey());
-      if (aggregator == null) {
-        currentAggregatorMap.put(entry.getKey(),
-            AggregatorUtils.newAggregatorInstance(entry.getValue(), conf));
-      } else {
-        aggregator.reset();
+    for (Map.Entry<String, WritableFactory<Aggregator<Writable>>> entry :
+        aggregatorFactoriesMap.entrySet()) {
+      Factory<Aggregator<Writable>> aggregatorFactory =
+          currentAggregatorFactoryMap.get(entry.getKey());
+      if (aggregatorFactory == null) {
+        currentAggregatorFactoryMap.put(entry.getKey(), entry.getValue());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
index eb25a2e..2f3d5e5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
@@ -18,9 +18,14 @@
 
 package org.apache.giraph.comm.aggregators;
 
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
 import org.apache.giraph.aggregators.Aggregator;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.TaskIdsPermitsBarrier;
+import org.apache.giraph.utils.WritableFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
@@ -29,11 +34,6 @@ import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 
-import java.util.AbstractMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-
 /**
  * Class for holding aggregators which current worker owns,
  * and aggregating partial aggregator values from workers.
@@ -73,19 +73,14 @@ public class OwnerAggregatorServerData {
   private final TaskIdsPermitsBarrier workersBarrier;
   /** Progressable used to report progress */
   private final Progressable progressable;
-  /** Configuration */
-  private final ImmutableClassesGiraphConfiguration conf;
 
   /**
    * Constructor
    *
    * @param progressable Progressable used to report progress
-   * @param conf         Configuration
    */
-  public OwnerAggregatorServerData(Progressable progressable,
-      ImmutableClassesGiraphConfiguration conf) {
+  public OwnerAggregatorServerData(Progressable progressable) {
     this.progressable = progressable;
-    this.conf = conf;
     workersBarrier = new TaskIdsPermitsBarrier(progressable);
   }
 
@@ -93,15 +88,14 @@ public class OwnerAggregatorServerData {
    * Register an aggregator which current worker owns. Thread-safe.
    *
    * @param name Name of aggregator
-   * @param aggregatorClass Aggregator class
+   * @param aggregatorFactory Aggregator factory
    */
   public void registerAggregator(String name,
-      Class<Aggregator<Writable>> aggregatorClass) {
+      WritableFactory<Aggregator<Writable>> aggregatorFactory) {
     if (LOG.isDebugEnabled() && myAggregatorMap.isEmpty()) {
       LOG.debug("registerAggregator: The first registration after a reset()");
     }
-    myAggregatorMap.putIfAbsent(name,
-        AggregatorUtils.newAggregatorInstance(aggregatorClass, conf));
+    myAggregatorMap.putIfAbsent(name, aggregatorFactory.create());
     progressable.progress();
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java
index adc2aa8..8f880b4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java
@@ -18,15 +18,16 @@
 
 package org.apache.giraph.comm.aggregators;
 
+import java.io.IOException;
+import java.util.Map;
+
 import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.utils.WritableFactory;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 
 import com.google.common.collect.Maps;
 
-import java.io.IOException;
-import java.util.Map;
-
 /**
  * Takes and serializes aggregators and keeps them grouped by owner
  * partition id, to be sent in bulk.
@@ -41,20 +42,20 @@ public class SendAggregatorCache extends CountingCache {
    *
    * @param taskId Task id of worker which owns the aggregator
    * @param aggregatorName Name of the aggregator
-   * @param aggregatorClass Class of the aggregator
+   * @param aggregatorFactory Aggregator factory
    * @param aggregatedValue Value of the aggregator
    * @return Number of bytes in serialized data for this worker
    * @throws IOException
    */
   public int addAggregator(Integer taskId, String aggregatorName,
-      Class<? extends Aggregator> aggregatorClass,
+      WritableFactory<? extends Aggregator> aggregatorFactory,
       Writable aggregatedValue) throws IOException {
     AggregatorOutputStream out = aggregatorMap.get(taskId);
     if (out == null) {
       out = new AggregatorOutputStream();
       aggregatorMap.put(taskId, out);
     }
-    return out.addAggregator(aggregatorName, aggregatorClass,
+    return out.addAggregator(aggregatorName, aggregatorFactory,
         aggregatedValue);
   }
 
@@ -86,6 +87,6 @@ public class SendAggregatorCache extends CountingCache {
     // current number of requests, plus one for the last flush
     long totalCount = getCount(taskId) + 1;
     addAggregator(taskId, AggregatorUtils.SPECIAL_COUNT_AGGREGATOR,
-        Aggregator.class, new LongWritable(totalCount));
+        null, new LongWritable(totalCount));
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
index 1218d29..51277c9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
@@ -18,20 +18,21 @@
 
 package org.apache.giraph.comm.netty;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import java.io.IOException;
+
+import org.apache.giraph.aggregators.Aggregator;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.comm.MasterClient;
 import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.giraph.comm.aggregators.SendAggregatorCache;
 import org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest;
-import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.WritableFactory;
 import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.util.Progressable;
 
-import java.io.IOException;
-
 /**
  * Netty implementation of {@link MasterClient}
  */
@@ -39,7 +40,7 @@ public class NettyMasterClient implements MasterClient {
   /** Netty client that does the actual I/O */
   private final NettyClient nettyClient;
   /** Worker information for current superstep */
-  private CentralizedServiceMaster<?, ?, ?> service;
+  private final CentralizedServiceMaster<?, ?, ?> service;
   /** Cached map of partition ids to serialized aggregator data */
   private final SendAggregatorCache sendAggregatorCache =
       new SendAggregatorCache();
@@ -78,12 +79,12 @@ public class NettyMasterClient implements MasterClient {
 
   @Override
   public void sendAggregator(String aggregatorName,
-      Class<? extends Aggregator> aggregatorClass,
+      WritableFactory<? extends Aggregator> aggregatorFactory,
       Writable aggregatedValue) throws IOException {
     WorkerInfo owner =
         AggregatorUtils.getOwner(aggregatorName, service.getWorkerInfoList());
     int currentSize = sendAggregatorCache.addAggregator(owner.getTaskId(),
-        aggregatorName, aggregatorClass, aggregatedValue);
+        aggregatorName, aggregatorFactory, aggregatedValue);
     if (currentSize >= maxBytesPerAggregatorRequest) {
       flushAggregatorsToWorker(owner);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
index e2681ee..10d8d02 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
@@ -18,16 +18,18 @@
 
 package org.apache.giraph.comm.requests;
 
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.giraph.aggregators.Aggregator;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
-import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 
-import java.io.DataInput;
-import java.io.IOException;
-
 /**
  * Request to send final aggregatd values from master to worker which owns
  * the aggregators
@@ -59,23 +61,22 @@ public class SendAggregatorsToOwnerRequest
       int numAggregators = input.readInt();
       for (int i = 0; i < numAggregators; i++) {
         String aggregatorName = input.readUTF();
-        String aggregatorClassName = input.readUTF();
+        WritableFactory<Aggregator<Writable>> aggregatorFactory =
+            WritableUtils.readWritableObject(input, conf);
         if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
           LongWritable count = new LongWritable(0);
           count.readFields(input);
           aggregatorData.receivedRequestCountFromMaster(count.get(),
               getSenderTaskId());
         } else {
-          Class<Aggregator<Writable>> aggregatorClass =
-              AggregatorUtils.getAggregatorClass(aggregatorClassName);
           aggregatorData.registerAggregatorClass(aggregatorName,
-              aggregatorClass);
+              aggregatorFactory);
           Writable aggregatorValue =
               aggregatorData.createAggregatorInitialValue(aggregatorName);
           aggregatorValue.readFields(input);
           aggregatorData.setAggregatorValue(aggregatorName, aggregatorValue);
           serverData.getOwnerAggregatorData().registerAggregator(
-              aggregatorName, aggregatorClass);
+              aggregatorName, aggregatorFactory);
         }
       }
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
index 52e4cba..d469e96 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
@@ -18,16 +18,18 @@
 
 package org.apache.giraph.comm.requests;
 
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.giraph.aggregators.Aggregator;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
-import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 
-import java.io.DataInput;
-import java.io.IOException;
-
 /**
  * Request to send final aggregated values from worker which owns them to
  * other workers
@@ -59,17 +61,16 @@ public class SendAggregatorsToWorkerRequest extends
       int numAggregators = input.readInt();
       for (int i = 0; i < numAggregators; i++) {
         String aggregatorName = input.readUTF();
-        String aggregatorClassName = input.readUTF();
+        WritableFactory<Aggregator<Writable>> aggregatorFactory =
+            WritableUtils.readWritableObject(input, conf);
         if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
           LongWritable count = new LongWritable(0);
           count.readFields(input);
           aggregatorData.receivedRequestCountFromWorker(count.get(),
               getSenderTaskId());
         } else {
-          Class<Aggregator<Writable>> aggregatorClass =
-              AggregatorUtils.getAggregatorClass(aggregatorClassName);
           aggregatorData.registerAggregatorClass(aggregatorName,
-              aggregatorClass);
+              aggregatorFactory);
           Writable aggregatorValue =
               aggregatorData.createAggregatorInitialValue(aggregatorName);
           aggregatorValue.readFields(input);

http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
index 14c8c0d..62ab7f1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java
@@ -44,7 +44,7 @@ public abstract class WritableRequest<I extends WritableComparable,
   public static final int UNKNOWN_SIZE = -1;
 
   /** Configuration */
-  private ImmutableClassesGiraphConfiguration<I, V, E> conf;
+  protected ImmutableClassesGiraphConfiguration<I, V, E> conf;
   /** Client id */
   private int clientId = -1;
   /** Request id */

http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
index 325d91f..2bc08e9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
@@ -18,15 +18,24 @@
 
 package org.apache.giraph.master;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.bsp.SuperstepState;
-import org.apache.giraph.comm.MasterClient;
-import org.apache.giraph.comm.aggregators.AggregatorUtils;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.Map;
+
 import org.apache.giraph.aggregators.Aggregator;
 import org.apache.giraph.aggregators.AggregatorWrapper;
 import org.apache.giraph.aggregators.AggregatorWriter;
+import org.apache.giraph.aggregators.ClassAggregatorFactory;
 import org.apache.giraph.bsp.BspService;
+import org.apache.giraph.bsp.SuperstepState;
+import org.apache.giraph.comm.MasterClient;
+import org.apache.giraph.comm.aggregators.AggregatorUtils;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.MasterLoggingAggregator;
+import org.apache.giraph.utils.WritableFactory;
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
@@ -35,12 +44,6 @@ import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.AbstractMap;
-import java.util.Map;
-
 /** Handler for aggregators on master */
 public class MasterAggregatorHandler implements MasterAggregatorUsage,
     Writable {
@@ -106,7 +109,17 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage,
       Class<? extends Aggregator<A>> aggregatorClass) throws
       InstantiationException, IllegalAccessException {
     checkAggregatorName(name);
-    return registerAggregator(name, aggregatorClass, false) != null;
+    ClassAggregatorFactory<A> aggregatorFactory =
+        new ClassAggregatorFactory<A>(aggregatorClass, conf);
+    return registerAggregator(name, aggregatorFactory, false) != null;
+  }
+
+  @Override
+  public <A extends Writable> boolean registerAggregator(String name,
+      WritableFactory<? extends Aggregator<A>> aggregator) throws
+      InstantiationException, IllegalAccessException {
+    checkAggregatorName(name);
+    return registerAggregator(name, aggregator, false) != null;
   }
 
   @Override
@@ -114,7 +127,9 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage,
       Class<? extends Aggregator<A>> aggregatorClass) throws
       InstantiationException, IllegalAccessException {
     checkAggregatorName(name);
-    return registerAggregator(name, aggregatorClass, true) != null;
+    ClassAggregatorFactory<A> aggregatorFactory =
+        new ClassAggregatorFactory<A>(aggregatorClass, conf);
+    return registerAggregator(name, aggregatorFactory, true) != null;
   }
 
   /**
@@ -134,22 +149,22 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage,
   /**
    * Helper function for registering aggregators.
    *
-   * @param name            Name of the aggregator
-   * @param aggregatorClass Class of the aggregator
-   * @param persistent      Whether aggregator is persistent or not
-   * @param <A>             Aggregated value type
+   * @param name              Name of the aggregator
+   * @param aggregatorFactory Aggregator factory
+   * @param persistent        Whether aggregator is persistent or not
+   * @param <A>               Aggregated value type
    * @return Newly registered aggregator or aggregator which was previously
    *         created with selected name, if any
    */
   private <A extends Writable> AggregatorWrapper<A> registerAggregator
-  (String name, Class<? extends Aggregator<A>> aggregatorClass,
+  (String name, WritableFactory<? extends Aggregator<A>> aggregatorFactory,
       boolean persistent) throws InstantiationException,
       IllegalAccessException {
     AggregatorWrapper<A> aggregatorWrapper =
         (AggregatorWrapper<A>) aggregatorMap.get(name);
     if (aggregatorWrapper == null) {
       aggregatorWrapper =
-          new AggregatorWrapper<A>(aggregatorClass, persistent, conf);
+          new AggregatorWrapper<A>(aggregatorFactory, persistent, conf);
       aggregatorMap.put(name, (AggregatorWrapper<Writable>) aggregatorWrapper);
     }
     return aggregatorWrapper;
@@ -207,7 +222,7 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage,
       for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
           aggregatorMap.entrySet()) {
         masterClient.sendAggregator(entry.getKey(),
-            entry.getValue().getAggregatorClass(),
+            entry.getValue().getAggregatorFactory(),
             entry.getValue().getPreviousAggregatedValue());
         progressable.progress();
       }
@@ -322,7 +337,7 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage,
     for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
         aggregatorMap.entrySet()) {
       out.writeUTF(entry.getKey());
-      out.writeUTF(entry.getValue().getAggregatorClass().getName());
+      entry.getValue().getAggregatorFactory().write(out);
       out.writeBoolean(entry.getValue().isPersistent());
       entry.getValue().getPreviousAggregatedValue().write(out);
       progressable.progress();
@@ -336,15 +351,16 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage,
     try {
       for (int i = 0; i < numAggregators; i++) {
         String aggregatorName = in.readUTF();
-        String aggregatorClassName = in.readUTF();
+        WritableFactory<Aggregator<Writable>> aggregatorFactory =
+            WritableUtils.readWritableObject(in, conf);
         boolean isPersistent = in.readBoolean();
-        AggregatorWrapper<Writable> aggregator = registerAggregator(
+        AggregatorWrapper<Writable> aggregatorWrapper = registerAggregator(
             aggregatorName,
-            AggregatorUtils.getAggregatorClass(aggregatorClassName),
+            aggregatorFactory,
             isPersistent);
-        Writable value = aggregator.createInitialValue();
+        Writable value = aggregatorWrapper.createInitialValue();
         value.readFields(in);
-        aggregator.setPreviousAggregatedValue(value);
+        aggregatorWrapper.setPreviousAggregatedValue(value);
         progressable.progress();
       }
     } catch (InstantiationException e) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java
index cadae67..91f5d24 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java
@@ -20,6 +20,7 @@ package org.apache.giraph.master;
 
 import org.apache.giraph.aggregators.Aggregator;
 import org.apache.giraph.aggregators.AggregatorUsage;
+import org.apache.giraph.utils.WritableFactory;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -40,6 +41,21 @@ public interface MasterAggregatorUsage extends AggregatorUsage {
       InstantiationException, IllegalAccessException;
 
   /**
+   * Register an aggregator in preSuperstep() and/or preApplication(). This
+   * aggregator will have its value reset at the end of each super step.
+   *
+   * Aggregator should either implement Writable, or have no-arg constructor.
+   *
+   * @param name of aggregator
+   * @param aggregatorFactory aggregator factory
+   * @param <A> Aggregated value type
+   * @return True iff aggregator wasn't already registered
+   */
+  <A extends Writable> boolean registerAggregator(String name,
+      WritableFactory<? extends Aggregator<A>> aggregatorFactory) throws
+      InstantiationException, IllegalAccessException;
+
+  /**
    * Register persistent aggregator in preSuperstep() and/or
    * preApplication(). This aggregator will not reset value at the end of
    * super step.

http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
index d77a9b5..c2a1f9a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java
@@ -23,6 +23,7 @@ import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.utils.WritableFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Mapper;
 
@@ -195,6 +196,13 @@ public abstract class MasterCompute
   }
 
   @Override
+  public final <A extends Writable> boolean registerAggregator(
+    String name, WritableFactory<? extends Aggregator<A>> aggregator)
+    throws InstantiationException, IllegalAccessException {
+    return masterAggregatorUsage.registerAggregator(name, aggregator);
+  }
+
+  @Override
   public final <A extends Writable> boolean registerPersistentAggregator(
       String name,
       Class<? extends Aggregator<A>> aggregatorClass) throws

http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/utils/ArrayWritable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ArrayWritable.java b/giraph-core/src/main/java/org/apache/giraph/utils/ArrayWritable.java
new file mode 100644
index 0000000..9ea24c3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ArrayWritable.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Array;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A Writable for arrays containing instances of a class. The elements of this
+ * writable must all be instances of the same class.
+ *
+ * @param <T> element type
+ */
+public class ArrayWritable<T extends Writable> implements Writable {
+  /** Element type class */
+  private Class<T> valueClass;
+  /** Array */
+  private T[] values;
+
+  /** Constructor */
+  public ArrayWritable() {
+  }
+
+  /**
+   * Constructor
+   * @param valueClass Element type class
+   * @param values Array of elements
+   */
+  public ArrayWritable(Class<T> valueClass, T[] values) {
+    Preconditions.checkNotNull(valueClass,
+        "valueClass cannot be null in ArrayWritable");
+    this.valueClass = valueClass;
+    this.values = values;
+  }
+
+  /**
+   * Get element type class
+   * @return element type class
+   */
+  public Class<T> getValueClass() {
+    return valueClass;
+  }
+
+  /**
+   * Set array
+   * @param values array
+   */
+  public void set(T[] values) { this.values = values; }
+
+  /**
+   * Get array
+   * @return array
+   */
+  public T[] get() { return values; }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    valueClass = WritableUtils.readClass(in);
+    values = (T[]) Array.newInstance(valueClass, in.readInt());
+
+    for (int i = 0; i < values.length; i++) {
+      T value = (T) WritableFactories.newInstance(valueClass);
+      value.readFields(in);                       // read a value
+      values[i] = value;                          // store it in values
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Preconditions.checkNotNull(valueClass,
+        "valueClass cannot be null in ArrayWritable");
+    WritableUtils.writeClass(valueClass, out);
+    out.writeInt(values.length);                 // write values
+    for (int i = 0; i < values.length; i++) {
+      values[i].write(out);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java
new file mode 100644
index 0000000..43bed7e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Factory that can be serialized.
+ * @param <T> Type of object factory creates
+ */
+public interface WritableFactory<T> extends Writable, Factory<T> {
+
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index 3f8382e..763f59d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -18,6 +18,18 @@
 
 package org.apache.giraph.utils;
 
+import static org.apache.hadoop.util.ReflectionUtils.newInstance;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.OutEdges;
@@ -33,18 +45,6 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.hadoop.util.ReflectionUtils.newInstance;
-
 /**
  * Helper static methods for working with Writable objects.
  */
@@ -70,6 +70,23 @@ public class WritableUtils {
   }
 
   /**
+   * Instantiate a new Writable, checking for NullWritable along the way.
+   *
+   * @param klass Class
+   * @param configuration Configuration
+   * @param <W> type
+   * @return new instance of class
+   */
+  public static <W extends Writable> W createWritable(
+      Class<W> klass,
+      ImmutableClassesGiraphConfiguration configuration) {
+    W result = createWritable(klass);
+    ConfigurationUtils.configureIfPossible(result, configuration);
+    return result;
+  }
+
+
+  /**
    * Read fields from byteArray to a Writeable object.
    *
    * @param byteArray Byte array to find the fields in.
@@ -616,4 +633,47 @@ public class WritableUtils {
       return null;
     }
   }
+
+  /**
+   * Write object to output stream
+   * @param object Object
+   * @param output Output stream
+   * @throws IOException
+   */
+  public static void writeWritableObject(
+    Writable object, DataOutput output)
+    throws IOException {
+    output.writeBoolean(object != null);
+    if (object != null) {
+      output.writeUTF(object.getClass().getName());
+      object.write(output);
+    }
+  }
+
+  /**
+   * Reads object from input stream
+   * @param input Input stream
+   * @param conf Configuration
+   * @param <T> Object type
+   * @return Object
+   * @throws IOException
+   */
+  public static <T extends Writable>
+  T readWritableObject(DataInput input,
+      ImmutableClassesGiraphConfiguration conf) throws IOException {
+    if (input.readBoolean()) {
+      String className = input.readUTF();
+      try {
+        T object =
+            (T) ReflectionUtils.newInstance(Class.forName(className), conf);
+        object.readFields(input);
+        return object;
+      } catch (ClassNotFoundException e) {
+        throw new IllegalStateException("readWritableObject: No class found " +
+            className);
+      }
+    } else {
+      return null;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
index 9bfd7b5..45ca665 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java
@@ -18,14 +18,19 @@
 
 package org.apache.giraph.worker;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.giraph.aggregators.Aggregator;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
 import org.apache.giraph.comm.aggregators.AggregatedValueOutputStream;
 import org.apache.giraph.comm.aggregators.AggregatorUtils;
 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
 import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
-import org.apache.giraph.aggregators.Aggregator;
+import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.Factory;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
@@ -33,10 +38,6 @@ import org.apache.log4j.Logger;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * Handler for aggregators on worker. Provides the aggregated values and
  * performs aggregations from user vertex code (thread-safe). Also has
@@ -58,10 +59,13 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
   private static final Logger LOG =
       Logger.getLogger(WorkerAggregatorHandler.class);
   /** Map of values from previous superstep */
-  private Map<String, Writable> previousAggregatedValueMap =
+  private final Map<String, Writable> previousAggregatedValueMap =
       Maps.newHashMap();
+  /** Map of aggregator factories for current superstep */
+  private final Map<String, Factory<Aggregator<Writable>>>
+  currentAggregatorFactoryMap = Maps.newHashMap();
   /** Map of aggregators for current superstep */
-  private Map<String, Aggregator<Writable>> currentAggregatorMap =
+  private final Map<String, Aggregator<Writable>> currentAggregatorMap =
       Maps.newHashMap();
   /** Service worker */
   private final CentralizedServiceWorker<?, ?, ?> serviceWorker;
@@ -143,7 +147,8 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
     // Wait for all other aggregators and store them
     allAggregatorData.fillNextSuperstepMapsWhenReady(
         getOtherWorkerIdsSet(), previousAggregatedValueMap,
-        currentAggregatorMap);
+        currentAggregatorFactoryMap);
+    fillAndInitAggregatorsMap(currentAggregatorMap);
     allAggregatorData.reset();
     if (LOG.isDebugEnabled()) {
       LOG.debug("prepareSuperstep: Aggregators prepared");
@@ -151,6 +156,25 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
   }
 
   /**
+   * Fills aggregators map from currentAggregatorFactoryMap.
+   * All aggregators in this map will be set to initial value.
+   * @param aggregatorMap Map to fill.
+   */
+  private void fillAndInitAggregatorsMap(
+      Map<String, Aggregator<Writable>> aggregatorMap) {
+    for (Map.Entry<String, Factory<Aggregator<Writable>>> entry :
+        currentAggregatorFactoryMap.entrySet()) {
+      Aggregator<Writable> aggregator =
+          aggregatorMap.get(entry.getKey());
+      if (aggregator == null) {
+        aggregatorMap.put(entry.getKey(), entry.getValue().create());
+      } else {
+        aggregator.reset();
+      }
+    }
+  }
+
+  /**
    * Send aggregators to their owners and in the end to the master
    *
    * @param requestProcessor Request processor for aggregators
@@ -286,13 +310,7 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage {
     public ThreadLocalWorkerAggregatorUsage() {
       threadAggregatorMap = Maps.newHashMapWithExpectedSize(
           WorkerAggregatorHandler.this.currentAggregatorMap.size());
-      for (Map.Entry<String, Aggregator<Writable>> entry :
-          WorkerAggregatorHandler.this.currentAggregatorMap.entrySet()) {
-        threadAggregatorMap.put(entry.getKey(),
-            AggregatorUtils.newAggregatorInstance(
-                (Class<Aggregator<Writable>>) entry.getValue().getClass(),
-                conf));
-      }
+      fillAndInitAggregatorsMap(threadAggregatorMap);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java b/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java
new file mode 100644
index 0000000..2898647
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.aggregators;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.giraph.utils.ArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Test;
+
+public class TestArrayAggregator {
+  @Test
+  public void testMaxAggregator() {
+    Aggregator<ArrayWritable<LongWritable>> max = new ArrayAggregatorFactory<>(2, LongMaxAggregator.class).create();
+
+    ArrayWritable<LongWritable> tmp = max.createInitialValue();
+
+    tmp.get()[0].set(2);
+    max.aggregate(tmp);
+
+    tmp.get()[0].set(3);
+    tmp.get()[1].set(1);
+    max.aggregate(tmp);
+
+    assertEquals(3L, max.getAggregatedValue().get()[0].get());
+    assertEquals(1L, max.getAggregatedValue().get()[1].get());
+
+    tmp.get()[0].set(-1);
+    tmp.get()[1].set(-1);
+    max.setAggregatedValue(tmp);
+
+    assertEquals(-1L, max.getAggregatedValue().get()[0].get());
+    assertEquals(-1L, max.getAggregatedValue().get()[1].get());
+  }
+}


Mime
View raw message