giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [36/51] [partial] GIRAPH-457: update module names (nitay)
Date Thu, 20 Dec 2012 04:25:33 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java b/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
deleted file mode 100644
index 27e1010..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/SendMessageCache.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.WorkerInfo;
-import org.apache.giraph.graph.partition.PartitionOwner;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
-import org.apache.giraph.utils.PairList;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Aggregates the messages destined for workers so they can be sent
- * in bulk.  Not thread-safe.
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public class SendMessageCache<I extends WritableComparable,
-    M extends Writable> {
-  /**
-   * How much bigger than the average per partition size to make initial per
-   * partition buffers.
-   * If this value is A, message request size is M,
-   * and a worker has P partitions, then its initial partition buffer size
-   * will be (M / P) * (1 + A).
-   */
-  public static final String ADDITIONAL_MSG_REQUEST_SIZE =
-      "giraph.additionalMsgRequestSize";
-  /**
-   * Default factor (20%) for how much bigger the initial per partition
-   * buffers should be.
-   */
-  public static final float ADDITIONAL_MSG_REQUEST_SIZE_DEFAULT = 0.2f;
-
-  /** Internal cache */
-  private final ByteArrayVertexIdMessages<I, M>[] messageCache;
-  /** Size of messages (in bytes) for each worker */
-  private final int[] messageSizes;
-  /** How big to initially make output streams for each worker's partitions */
-  private final int[] initialBufferSizes;
-  /** List of partition ids belonging to a worker */
-  private final Map<WorkerInfo, List<Integer>> workerPartitions =
-      Maps.newHashMap();
-  /** Giraph configuration */
-  private final ImmutableClassesGiraphConfiguration conf;
-
-  /**
-   * Constructor
-   *
-   * @param conf Giraph configuration
-   * @param serviceWorker Service worker
-   */
-  public SendMessageCache(ImmutableClassesGiraphConfiguration conf,
-      CentralizedServiceWorker<?, ?, ?, ?> serviceWorker) {
-    this.conf = conf;
-
-    int maxPartition = 0;
-    for (PartitionOwner partitionOwner : serviceWorker.getPartitionOwners()) {
-      List<Integer> workerPartitionIds =
-          workerPartitions.get(partitionOwner.getWorkerInfo());
-      if (workerPartitionIds == null) {
-        workerPartitionIds = Lists.newArrayList();
-        workerPartitions.put(partitionOwner.getWorkerInfo(),
-            workerPartitionIds);
-      }
-      workerPartitionIds.add(partitionOwner.getPartitionId());
-      maxPartition = Math.max(partitionOwner.getPartitionId(), maxPartition);
-    }
-    messageCache = new ByteArrayVertexIdMessages[maxPartition + 1];
-
-    int maxWorker = 0;
-    for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
-      maxWorker = Math.max(maxWorker, workerInfo.getTaskId());
-    }
-    messageSizes = new int[maxWorker + 1];
-
-    float additionalRequestSize =
-        conf.getFloat(ADDITIONAL_MSG_REQUEST_SIZE,
-            ADDITIONAL_MSG_REQUEST_SIZE_DEFAULT);
-    int requestSize = conf.getInt(GiraphConstants.MAX_MSG_REQUEST_SIZE,
-        GiraphConstants.MAX_MSG_REQUEST_SIZE_DEFAULT);
-    int initialRequestSize = (int) (requestSize * (1 + additionalRequestSize));
-    initialBufferSizes = new int[maxWorker + 1];
-    for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
-      initialBufferSizes[workerInfo.getTaskId()] =
-          initialRequestSize / workerPartitions.get(workerInfo).size();
-    }
-  }
-
-  /**
-   * Add a message to the cache.
-   *
-   * @param workerInfo the remote worker destination
-   * @param partitionId the remote Partition this message belongs to
-   * @param destVertexId vertex id that is ultimate destination
-   * @param message Message to be sent to remote
-   *                <b>host => partition => vertex</b>
-   * @return Size of messages for the worker.
-   */
-  public int addMessage(WorkerInfo workerInfo,
-    final int partitionId, I destVertexId, M message) {
-    // Get the message collection
-    ByteArrayVertexIdMessages<I, M> partitionMessages =
-        messageCache[partitionId];
-    int originalSize = 0;
-    if (partitionMessages == null) {
-      partitionMessages = new ByteArrayVertexIdMessages<I, M>();
-      partitionMessages.setConf(conf);
-      partitionMessages.initialize(initialBufferSizes[workerInfo.getTaskId()]);
-      messageCache[partitionId] = partitionMessages;
-    } else {
-      originalSize = partitionMessages.getSize();
-    }
-    partitionMessages.add(destVertexId, message);
-
-    // Update the size of cached, outgoing messages per worker
-    messageSizes[workerInfo.getTaskId()] +=
-        partitionMessages.getSize() - originalSize;
-    return messageSizes[workerInfo.getTaskId()];
-  }
-
-  /**
-   * Gets the messages for a worker and removes it from the cache.
-   *
-   * @param workerInfo the address of the worker who owns the data
-   *                   partitions that are receiving the messages
-   * @return List of pairs (partitionId, ByteArrayVertexIdMessages),
-   *         where all partition ids belong to workerInfo
-   */
-  public PairList<Integer, ByteArrayVertexIdMessages<I, M>>
-  removeWorkerMessages(WorkerInfo workerInfo) {
-    PairList<Integer, ByteArrayVertexIdMessages<I, M>> workerMessages =
-        new PairList<Integer, ByteArrayVertexIdMessages<I, M>>();
-    List<Integer> partitions = workerPartitions.get(workerInfo);
-    workerMessages.initialize(partitions.size());
-    for (Integer partitionId : partitions) {
-      if (messageCache[partitionId] != null) {
-        workerMessages.add(partitionId, messageCache[partitionId]);
-        messageCache[partitionId] = null;
-      }
-    }
-    messageSizes[workerInfo.getTaskId()] = 0;
-    return workerMessages;
-  }
-
-  /**
-   * Gets all the messages and removes them from the cache.
-   *
-   * @return All vertex messages for all partitions
-   */
-  public PairList<WorkerInfo, PairList<
-      Integer, ByteArrayVertexIdMessages<I, M>>> removeAllMessages() {
-    PairList<WorkerInfo, PairList<Integer,
-        ByteArrayVertexIdMessages<I, M>>>
-        allMessages = new PairList<WorkerInfo,
-        PairList<Integer, ByteArrayVertexIdMessages<I, M>>>();
-    allMessages.initialize(messageSizes.length);
-    for (WorkerInfo workerInfo : workerPartitions.keySet()) {
-      PairList<Integer, ByteArrayVertexIdMessages<I,
-                M>> workerMessages =
-          removeWorkerMessages(workerInfo);
-      if (!workerMessages.isEmpty()) {
-        allMessages.add(workerInfo, workerMessages);
-      }
-      messageSizes[workerInfo.getTaskId()] = 0;
-    }
-    return allMessages;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/SendMutationsCache.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/SendMutationsCache.java b/giraph/src/main/java/org/apache/giraph/comm/SendMutationsCache.java
deleted file mode 100644
index e52c101..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/SendMutationsCache.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.comm;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.VertexMutations;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Aggregates the mutations to be sent to partitions so they can be sent in
- * bulk.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public class SendMutationsCache<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
-  /** Internal cache */
-  private Map<Integer, Map<I, VertexMutations<I, V, E, M>>> mutationCache =
-      new HashMap<Integer, Map<I, VertexMutations<I, V, E, M>>>();
-  /** Number of mutations in each partition */
-  private final Map<Integer, Integer> mutationCountMap =
-      new HashMap<Integer, Integer>();
-
-  /**
-   * Get the mutations for a partition and destination vertex (creating if
-   * it doesn't exist).
-   *
-   * @param partitionId Partition id
-   * @param destVertexId Destination vertex id
-   * @return Mutations for the vertex
-   */
-  private VertexMutations<I, V, E, M> getVertexMutations(
-      Integer partitionId, I destVertexId) {
-    Map<I, VertexMutations<I, V, E, M>> idMutations =
-        mutationCache.get(partitionId);
-    if (idMutations == null) {
-      idMutations = new HashMap<I, VertexMutations<I, V, E, M>>();
-      mutationCache.put(partitionId, idMutations);
-    }
-    VertexMutations<I, V, E, M> mutations = idMutations.get(destVertexId);
-    if (mutations == null) {
-      mutations = new VertexMutations<I, V, E, M>();
-      idMutations.put(destVertexId, mutations);
-    }
-    return mutations;
-  }
-
-  /**
-   * Increment the number of mutations in a partition.
-   *
-   * @param partitionId Partition id
-   * @return Number of mutations in a partition after the increment
-   */
-  private int incrementPartitionMutationCount(int partitionId) {
-    Integer currentPartitionMutationCount = mutationCountMap.get(partitionId);
-    if (currentPartitionMutationCount == null) {
-      currentPartitionMutationCount = 0;
-    }
-    Integer updatedPartitionMutationCount =
-        currentPartitionMutationCount + 1;
-    mutationCountMap.put(partitionId, updatedPartitionMutationCount);
-    return updatedPartitionMutationCount;
-  }
-
-  /**
-   * Add an add edge mutation to the cache.
-   *
-   * @param partitionId Partition id
-   * @param destVertexId Destination vertex id
-   * @param edge Edge to be added
-   * @return Number of mutations in the partition.
-   */
-  public int addEdgeMutation(
-      Integer partitionId, I destVertexId, Edge<I, E> edge) {
-    // Get the mutations for this partition
-    VertexMutations<I, V, E, M> mutations =
-        getVertexMutations(partitionId, destVertexId);
-
-    // Add the edge
-    mutations.addEdge(edge);
-
-    // Update the number of mutations per partition
-    return incrementPartitionMutationCount(partitionId);
-  }
-
-  /**
-   * Add a remove edge mutation to the cache.
-   *
-   * @param partitionId Partition id
-   * @param vertexIndex Destination vertex id
-   * @param destinationVertexIndex Edge vertex index to be removed
-   * @return Number of mutations in the partition.
-   */
-  public int removeEdgeMutation(
-      Integer partitionId, I vertexIndex, I destinationVertexIndex) {
-    // Get the mutations for this partition
-    VertexMutations<I, V, E, M> mutations =
-        getVertexMutations(partitionId, vertexIndex);
-
-    // Remove the edge
-    mutations.removeEdge(destinationVertexIndex);
-
-    // Update the number of mutations per partition
-    return incrementPartitionMutationCount(partitionId);
-  }
-
-  /**
-   * Add an add vertex mutation to the cache.
-   *
-   * @param partitionId Partition id
-   * @param vertex Vertex to be added
-   * @return Number of mutations in the partition.
-   */
-  public int addVertexMutation(
-      Integer partitionId, Vertex<I, V, E, M> vertex) {
-    // Get the mutations for this partition
-    VertexMutations<I, V, E, M> mutations =
-        getVertexMutations(partitionId, vertex.getId());
-
-    // Add the vertex
-    mutations.addVertex(vertex);
-
-    // Update the number of mutations per partition
-    return incrementPartitionMutationCount(partitionId);
-  }
-
-  /**
-   * Add a remove vertex mutation to the cache.
-   *
-   * @param partitionId Partition id
-   * @param destVertexId Vertex index to be removed
-   * @return Number of mutations in the partition.
-   */
-  public int removeVertexMutation(
-      Integer partitionId, I destVertexId) {
-    // Get the mutations for this partition
-    VertexMutations<I, V, E, M> mutations =
-        getVertexMutations(partitionId, destVertexId);
-
-    // Remove the vertex
-    mutations.removeVertex();
-
-    // Update the number of mutations per partition
-    return incrementPartitionMutationCount(partitionId);
-  }
-
-  /**
-   * Gets the mutations for a partition and removes it from the cache.
-   *
-   * @param partitionId Partition id
-   * @return Removed partition mutations
-   */
-  public Map<I, VertexMutations<I, V, E, M>> removePartitionMutations(
-      int partitionId) {
-    Map<I, VertexMutations<I, V, E, M>> idMutations =
-        mutationCache.remove(partitionId);
-    mutationCountMap.put(partitionId, 0);
-    return idMutations;
-  }
-
-  /**
-   * Gets all the mutations and removes them from the cache.
-   *
-   * @return All vertex mutations for all partitions
-   */
-  public Map<Integer, Map<I, VertexMutations<I, V, E, M>>>
-  removeAllPartitionMutations() {
-    Map<Integer, Map<I, VertexMutations<I, V, E, M>>> allMutations =
-        mutationCache;
-    mutationCache =
-        new HashMap<Integer, Map<I, VertexMutations<I, V, E, M>>>();
-    mutationCountMap.clear();
-    return allMutations;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java b/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
deleted file mode 100644
index a199d0c..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/SendPartitionCache.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.comm;
-
-import com.google.common.collect.Maps;
-import java.util.Map;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.GiraphTransferRegulator;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.partition.Partition;
-import org.apache.giraph.graph.partition.PartitionOwner;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
-
-/**
- * Caches partition vertices prior to sending.  Aggregating these requests
- * will make larger, more efficient requests.  Not thread-safe.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public class SendPartitionCache<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(SendPartitionCache.class);
-  /** Input split vertex cache (only used when loading from input split) */
-  private final Map<PartitionOwner, Partition<I, V, E, M>>
-  ownerPartitionMap = Maps.newHashMap();
-  /** Number of messages in each partition */
-  private final Map<PartitionOwner, Integer> messageCountMap =
-      Maps.newHashMap();
-  /** Context */
-  private final Mapper<?, ?, ?, ?>.Context context;
-  /** Configuration */
-  private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
-  /**
-   * Regulates the size of outgoing Collections of vertices read
-   * by the local worker during INPUT_SUPERSTEP that are to be
-   * transferred from <code>inputSplitCache</code> to the owner
-   * of their initial, master-assigned Partition.
-   */
-  private final GiraphTransferRegulator transferRegulator;
-
-  /**
-   * Constructor.
-   *
-   * @param context Context
-   * @param configuration Configuration
-   */
-  public SendPartitionCache(
-      Mapper<?, ?, ?, ?>.Context context,
-      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) {
-    this.context = context;
-    this.configuration = configuration;
-    transferRegulator =
-        new GiraphTransferRegulator(configuration);
-    if (LOG.isInfoEnabled()) {
-      LOG.info("SendPartitionCache: maxVerticesPerTransfer = " +
-          transferRegulator.getMaxVerticesPerTransfer());
-      LOG.info("SendPartitionCache: maxEdgesPerTransfer = " +
-          transferRegulator.getMaxEdgesPerTransfer());
-    }
-  }
-
-  /**
-   * Add a vertex to the cache, returning the partition if full
-   *
-   * @param partitionOwner Partition owner of the vertex
-   * @param vertex Vertex to add
-   * @return A partition to send or null, if requirements are not met
-   */
-  public Partition<I, V, E, M> addVertex(PartitionOwner partitionOwner,
-                                         Vertex<I, V, E, M> vertex) {
-    Partition<I, V, E, M> partition =
-        ownerPartitionMap.get(partitionOwner);
-    if (partition == null) {
-      partition = configuration.createPartition(
-          partitionOwner.getPartitionId(),
-          context);
-      ownerPartitionMap.put(partitionOwner, partition);
-    }
-    transferRegulator.incrementCounters(partitionOwner, vertex);
-
-    Vertex<I, V, E, M> oldVertex =
-        partition.putVertex(vertex);
-    if (oldVertex != null) {
-      LOG.warn("addVertex: Replacing vertex " + oldVertex +
-          " with " + vertex);
-    }
-
-    // Requirements met to transfer?
-    if (transferRegulator.transferThisPartition(partitionOwner)) {
-      return ownerPartitionMap.remove(partitionOwner);
-    }
-
-    return null;
-  }
-
-  /**
-   * Get the owner partition map (for flushing)
-   *
-   * @return Owner partition map
-   */
-  public Map<PartitionOwner, Partition<I, V, E, M>> getOwnerPartitionMap() {
-    return ownerPartitionMap;
-  }
-
-  /**
-   * Clear the cache.
-   */
-  public void clear() {
-    ownerPartitionMap.clear();
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph/src/main/java/org/apache/giraph/comm/ServerData.java
deleted file mode 100644
index 7e89a41..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/ServerData.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm;
-
-import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
-import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
-import org.apache.giraph.comm.messages.MessageStoreByPartition;
-import org.apache.giraph.comm.messages.MessageStoreFactory;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.VertexMutations;
-import org.apache.giraph.graph.partition.DiskBackedPartitionStore;
-import org.apache.giraph.graph.partition.PartitionStore;
-import org.apache.giraph.graph.partition.SimplePartitionStore;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-
-import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Anything that the server stores
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public class ServerData<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
-  /** Partition store for this worker. */
-  private volatile PartitionStore<I, V, E, M> partitionStore;
-  /** Message store factory */
-  private final
-  MessageStoreFactory<I, M, MessageStoreByPartition<I, M>> messageStoreFactory;
-  /**
-   * Message store for incoming messages (messages which will be consumed
-   * in the next super step)
-   */
-  private volatile MessageStoreByPartition<I, M> incomingMessageStore;
-  /**
-   * Message store for current messages (messages which we received in
-   * previous super step and which will be consumed in current super step)
-   */
-  private volatile MessageStoreByPartition<I, M> currentMessageStore;
-  /**
-   * Map of vertex ids to incoming vertex mutations from other workers.
-   * (Synchronized access to values)
-   */
-  private final ConcurrentHashMap<I, VertexMutations<I, V, E, M>>
-  vertexMutations = new ConcurrentHashMap<I, VertexMutations<I, V, E, M>>();
-  /**
-   * Holds aggregators which current worker owns from current superstep
-   */
-  private final OwnerAggregatorServerData ownerAggregatorData;
-  /**
-   * Holds old aggregators from previous superstep
-   */
-  private final AllAggregatorServerData allAggregatorData;
-
-  /**
-   * Constructor.
-   *
-   * @param configuration Configuration
-   * @param messageStoreFactory Factory for message stores
-   * @param context Mapper context
-   */
-  public ServerData(
-      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
-      MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
-          messageStoreFactory,
-      Mapper<?, ?, ?, ?>.Context context) {
-
-    this.messageStoreFactory = messageStoreFactory;
-    currentMessageStore = messageStoreFactory.newStore();
-    incomingMessageStore = messageStoreFactory.newStore();
-    if (configuration.getBoolean(GiraphConstants.USE_OUT_OF_CORE_GRAPH,
-        GiraphConstants.USE_OUT_OF_CORE_GRAPH_DEFAULT)) {
-      partitionStore =
-          new DiskBackedPartitionStore<I, V, E, M>(configuration, context);
-    } else {
-      partitionStore =
-          new SimplePartitionStore<I, V, E, M>(configuration, context);
-    }
-    ownerAggregatorData = new OwnerAggregatorServerData(context);
-    allAggregatorData = new AllAggregatorServerData(context);
-  }
-
-  /**
-   * Return the partition store for this worker.
-   *
-   * @return The partition store
-   */
-  public PartitionStore<I, V, E, M> getPartitionStore() {
-    return partitionStore;
-  }
-
-  /**
-   * Get message store for incoming messages (messages which will be consumed
-   * in the next super step)
-   *
-   * @return Incoming message store
-   */
-  public MessageStoreByPartition<I, M> getIncomingMessageStore() {
-    return incomingMessageStore;
-  }
-
-  /**
-   * Get message store for current messages (messages which we received in
-   * previous super step and which will be consumed in current super step)
-   *
-   * @return Current message store
-   */
-  public MessageStoreByPartition<I, M> getCurrentMessageStore() {
-    return currentMessageStore;
-  }
-
-  /** Prepare for next super step */
-  public void prepareSuperstep() {
-    if (currentMessageStore != null) {
-      try {
-        currentMessageStore.clearAll();
-      } catch (IOException e) {
-        throw new IllegalStateException(
-            "Failed to clear previous message store");
-      }
-    }
-    currentMessageStore = incomingMessageStore;
-    incomingMessageStore = messageStoreFactory.newStore();
-  }
-
-  /**
-   * Get the vertex mutations (synchronize on the values)
-   *
-   * @return Vertex mutations
-   */
-  public ConcurrentHashMap<I, VertexMutations<I, V, E, M>>
-  getVertexMutations() {
-    return vertexMutations;
-  }
-
-  /**
-   * Get holder for aggregators which current worker owns
-   *
-   * @return Holder for aggregators which current worker owns
-   */
-  public OwnerAggregatorServerData getOwnerAggregatorData() {
-    return ownerAggregatorData;
-  }
-
-  /**
-   * Get holder for aggregators from previous superstep
-   *
-   * @return Holder for aggregators from previous superstep
-   */
-  public AllAggregatorServerData getAllAggregatorData() {
-    return allAggregatorData;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java b/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java
deleted file mode 100644
index 6e2dfbd..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/WorkerClient.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm;
-
-import org.apache.giraph.comm.requests.WritableRequest;
-
-import org.apache.giraph.graph.partition.PartitionOwner;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.IOException;
-
-/**
- * Public interface for workers to establish connections and send aggregated
- * requests.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public interface WorkerClient<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
-
-  /**
-   *  Setup the client.
-   */
-/*if[HADOOP_NON_SECURE]
-  void setup();
-else[HADOOP_NON_SECURE]*/
-  /**
-   * Setup the client.
-   *
-   * @param authenticate whether to SASL authenticate with server or not:
-   * set by giraph.authenticate configuration option.
-   */
-  void setup(boolean authenticate);
-/*end[HADOOP_NON_SECURE]*/
-
-  /**
-   * Lookup PartitionOwner for a vertex.
-   *
-   * @param vertexId id to look up.
-   * @return PartitionOwner holding the vertex.
-   */
-  PartitionOwner getVertexPartitionOwner(I vertexId);
-
-  /**
-   * Make sure that all the connections to workers and master have been
-   * established.
-   */
-  void openConnections();
-
-  /**
-   * Send a request to a remote server (should be already connected)
-   *
-   * @param destTaskId Destination worker id
-   * @param request Request to send
-   */
-  void sendWritableRequest(Integer destTaskId, WritableRequest request);
-
-  /**
-   * Wait until all the outstanding requests are completed.
-   */
-  void waitAllRequests();
-
-  /**
-   * Closes all connections.
-   *
-   * @throws IOException
-   */
-  void closeConnections() throws IOException;
-
-/*if[HADOOP_NON_SECURE]
-else[HADOOP_NON_SECURE]*/
-  /**
-   * Authenticates, as client, with another BSP worker, as server.
-   *
-   * @throws IOException
-   */
-  void authenticate() throws IOException;
-/*end[HADOOP_NON_SECURE]*/
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java b/giraph/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
deleted file mode 100644
index 562962e..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/WorkerClientRequestProcessor.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.comm;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.graph.WorkerInfo;
-import org.apache.giraph.graph.partition.Partition;
-import org.apache.giraph.graph.partition.PartitionOwner;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.IOException;
-
-/**
- * Aggregates IPC requests and sends them off
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public interface WorkerClientRequestProcessor<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
-  /**
-   * Sends a message to destination vertex.
-   *
-   * @param destVertexId Destination vertex id.
-   * @param message Message to send.
-   * @return true if any network I/O occurred.
-   */
-  boolean sendMessageRequest(I destVertexId, M message);
-
-  /**
-   * Sends a vertex to the appropriate partition owner
-   *
-   * @param partitionOwner Owner of the vertex
-   * @param vertex Vertex to send
-   */
-  void sendVertexRequest(PartitionOwner partitionOwner,
-                         Vertex<I, V, E, M> vertex);
-
-  /**
-   * Send a partition request (no batching).
-   *
-   * @param workerInfo Worker to send the partition to
-   * @param partition Partition to send
-   */
-  void sendPartitionRequest(WorkerInfo workerInfo,
-                            Partition<I, V, E, M> partition);
-
-  /**
-   * Sends a request to the appropriate vertex range owner to add an edge
-   *
-   * @param vertexIndex Index of the vertex to get the request
-   * @param edge Edge to be added
-   * @throws java.io.IOException
-   */
-  void addEdgeRequest(I vertexIndex, Edge<I, E> edge) throws IOException;
-
-  /**
-   * Sends a request to the appropriate vertex range owner to remove all edges
-   * pointing to a given vertex.
-   *
-   * @param vertexIndex Index of the vertex to get the request
-   * @param destinationVertexIndex Destination vertex of the edges to remove
-   * @throws IOException
-   */
-  void removeEdgesRequest(I vertexIndex, I destinationVertexIndex)
-    throws IOException;
-
-  /**
-   * Sends a request to the appropriate vertex range owner to add a vertex
-   *
-   * @param vertex Vertex to be added
-   * @throws IOException
-   */
-  void addVertexRequest(Vertex<I, V, E, M> vertex) throws IOException;
-
-  /**
-   * Sends a request to the appropriate vertex range owner to remove a vertex
-   *
-   * @param vertexIndex Index of the vertex to be removed
-   * @throws IOException
-   */
-  void removeVertexRequest(I vertexIndex) throws IOException;
-
-  /**
-   * Flush all outgoing messages.  This ensures that all the messages have been
-   * sent, but not guaranteed to have been delivered yet.
-   *
-   * @throws IOException
-   */
-  void flush() throws IOException;
-
-  /**
-   * Get the number of messages sent during this superstep and reset it.
-   *
-   * @return Number of messages sent before the reset.
-   */
-  long resetMessageCount();
-
-  /**
-   * Lookup PartitionOwner for a vertex.
-   *
-   * @param vertexId id to look up.
-   * @return PartitionOwner holding the vertex.
-   */
-  PartitionOwner getVertexPartitionOwner(I vertexId);
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/WorkerClientServer.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/WorkerClientServer.java b/giraph/src/main/java/org/apache/giraph/comm/WorkerClientServer.java
deleted file mode 100644
index a55b607..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/WorkerClientServer.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Interface for both the client and the server
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public interface WorkerClientServer<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends WorkerClient<I, V, E, M>, WorkerServer<I, V, E, M> {
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java b/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java
deleted file mode 100644
index e60db55..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/WorkerServer.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm;
-
-import org.apache.giraph.graph.GraphState;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.Closeable;
-import java.net.InetSocketAddress;
-
-/**
- * Interface for message communication server.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public interface WorkerServer<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends Closeable {
-  /**
-   * Get server address
-   *
-   * @return Address used by this server
-   */
-  InetSocketAddress getMyAddress();
-
-  /**
-   * Prepare incoming messages for computation, and resolve mutation requests.
-   *
-   * @param graphState Current graph state
-   */
-  void prepareSuperstep(GraphState<I, V, E, M> graphState);
-
-  /**
-   * Only resolve mutation requests (used for edge-oriented input).
-   *
-   * @param graphState Current graph state
-   */
-  void resolveMutations(GraphState<I, V, E, M> graphState);
-
-  /**
-   * Get server data
-   *
-   * @return Server data
-   */
-  ServerData<I, V, E, M> getServerData();
-
-  /**
-   * Shuts down.
-   */
-  void close();
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatedValueOutputStream.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatedValueOutputStream.java b/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatedValueOutputStream.java
deleted file mode 100644
index 0010dba..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatedValueOutputStream.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.aggregators;
-
-import org.apache.hadoop.io.Writable;
-
-import java.io.IOException;
-
-/**
- * Implementation of {@link CountingOutputStream} which allows writing of
- * aggregator values in the form of pair (name, value)
- */
-public class AggregatedValueOutputStream extends CountingOutputStream {
-  /**
-   * Write aggregator to the stream and increment internal counter
-   *
-   * @param aggregatorName Name of the aggregator
-   * @param aggregatedValue Value of aggregator
-   * @return Number of bytes occupied by the stream
-   * @throws IOException
-   */
-  public int addAggregator(String aggregatorName,
-      Writable aggregatedValue) throws IOException {
-    incrementCounter();
-    dataOutput.writeUTF(aggregatorName);
-    aggregatedValue.write(dataOutput);
-    return getSize();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java b/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java
deleted file mode 100644
index 19d810e..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.aggregators;
-
-import org.apache.giraph.graph.Aggregator;
-import org.apache.hadoop.io.Writable;
-
-import java.io.IOException;
-
-/**
- * Implementation of {@link CountingOutputStream} which allows writing of
- * aggregators in the form of triple (name, classname, value)
- */
-public class AggregatorOutputStream extends CountingOutputStream {
-  /**
-   * Write aggregator to the stream and increment internal counter
-   *
-   * @param aggregatorName Name of the aggregator
-   * @param aggregatorClass Class of aggregator
-   * @param aggregatedValue Value of aggregator
-   * @return Number of bytes occupied by the stream
-   * @throws IOException
-   */
-  public int addAggregator(String aggregatorName,
-      Class<? extends Aggregator> aggregatorClass,
-      Writable aggregatedValue) throws IOException {
-    incrementCounter();
-    dataOutput.writeUTF(aggregatorName);
-    dataOutput.writeUTF(aggregatorClass.getName());
-    aggregatedValue.write(dataOutput);
-    return getSize();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java b/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
deleted file mode 100644
index ff93804..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.aggregators;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Aggregator;
-import org.apache.giraph.graph.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-
-import java.util.List;
-
-/**
- * Class for aggregator constants and utility methods
- */
-public class AggregatorUtils {
-  /**
-   * Special aggregator name which will be used to send the total number of
-   * aggregator requests which should arrive
-   */
-  public static final String SPECIAL_COUNT_AGGREGATOR =
-      "__aggregatorRequestCount";
-  /** How big a single aggregator request can be (in bytes) */
-  public static final String MAX_BYTES_PER_AGGREGATOR_REQUEST =
-      "giraph.maxBytesPerAggregatorRequest";
-  /** Default max size of single aggregator request (1MB) */
-  public static final int MAX_BYTES_PER_AGGREGATOR_REQUEST_DEFAULT =
-      1024 * 1024;
-  /**
-   * Whether or not to have a copy of aggregators for each compute thread.
-   * Unless aggregators are very large and it would hurt the application to
-   * have that many copies of them, user should use thread-local aggregators
-   * to prevent synchronization when aggregate() is called (and get better
-   * performance because of it).
-   */
-  public static final String USE_THREAD_LOCAL_AGGREGATORS =
-      "giraph.useThreadLocalAggregators";
-  /** Default is not to have a copy of aggregators for each thread */
-  public static final boolean USE_THREAD_LOCAL_AGGREGATORS_DEFAULT = false;
-
-  /** Do not instantiate */
-  private AggregatorUtils() { }
-
-  /**
-   * Get aggregator class from class name, catch all exceptions.
-   *
-   * @param aggregatorClassName Class name of the aggregator
-   * @return Aggregator class
-   */
-  public static Class<Aggregator<Writable>> getAggregatorClass(String
-      aggregatorClassName) {
-    try {
-      return (Class<Aggregator<Writable>>) Class.forName(aggregatorClassName);
-    } catch (ClassNotFoundException e) {
-      throw new IllegalStateException("getAggregatorClass: " +
-          "ClassNotFoundException for aggregator class " + aggregatorClassName,
-          e);
-    }
-  }
-
-  /**
-   * Create new aggregator instance from aggregator class,
-   * catch all exceptions.
-   *
-   * @param aggregatorClass Class of aggregator
-   * @return New aggregator
-   */
-  public static Aggregator<Writable> newAggregatorInstance(
-      Class<Aggregator<Writable>> aggregatorClass) {
-    try {
-      return aggregatorClass.newInstance();
-    } catch (InstantiationException e) {
-      throw new IllegalStateException("createAggregator: " +
-          "InstantiationException for aggregator class " + aggregatorClass, e);
-    } catch (IllegalAccessException e) {
-      throw new IllegalStateException("createAggregator: " +
-          "IllegalAccessException for aggregator class " + aggregatorClass, e);
-    }
-  }
-
-  /**
-   * Get owner of aggregator with selected name from the list of workers
-   *
-   * @param aggregatorName Name of the aggregator
-   * @param workers List of workers
-   * @return Worker which owns the aggregator
-   */
-  public static WorkerInfo getOwner(String aggregatorName,
-      List<WorkerInfo> workers) {
-    int index = Math.abs(aggregatorName.hashCode() % workers.size());
-    return workers.get(index);
-  }
-
-  /**
-   * Check if we should use thread local aggregators.
-   *
-   * @param conf Giraph configuration
-   * @return True iff we should use thread local aggregators
-   */
-  public static boolean
-  useThreadLocalAggregators(ImmutableClassesGiraphConfiguration conf) {
-    return conf.getBoolean(USE_THREAD_LOCAL_AGGREGATORS,
-        USE_THREAD_LOCAL_AGGREGATORS_DEFAULT);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java b/giraph/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
deleted file mode 100644
index a7ecb39..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.aggregators;
-
-import org.apache.giraph.graph.Aggregator;
-import org.apache.giraph.utils.ExpectedBarrier;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.Progressable;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Accepts aggregators and their values from previous superstep from master
- * and workers which own aggregators. Keeps data received from master so it
- * could be distributed later. Also counts the requests so we would know
- * when we are done receiving requests.
- *
- * Only restriction is that we need to call registerAggregatorClass before
- * calling createAggregatorInitialValue, other than that methods of this class
- * are thread-safe.
- */
-public class AllAggregatorServerData {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(AllAggregatorServerData.class);
-  /**
-   * Map from aggregator class to aggregator object which we need in order
-   * to create initial aggregated values
-   */
-  private final
-  ConcurrentMap<Class<Aggregator<Writable>>, Aggregator<Writable>>
-  aggregatorTypesMap = Maps.newConcurrentMap();
-  /** Map of aggregator classes */
-  private final ConcurrentMap<String, Class<Aggregator<Writable>>>
-  aggregatorClassMap = Maps.newConcurrentMap();
-  /** Map of values of aggregators from previous superstep */
-  private final ConcurrentMap<String, Writable>
-  aggregatedValuesMap = Maps.newConcurrentMap();
-  /**
-   * Counts the requests with final aggregators from master.
-   * It uses values from special aggregators
-   * (named AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)
-   * to know how many requests it has to receive.
-   */
-  private final ExpectedBarrier masterBarrier;
-  /**
-   * Aggregator data which this worker received from master and which it is
-   * going to distribute before starting next superstep. Thread-safe.
-   */
-  private final List<byte[]> masterData =
-      Collections.synchronizedList(Lists.<byte[]>newArrayList());
-  /**
-   * Counts the requests with final aggregators from other workers.
-   * It uses values from special aggregators
-   * (named AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)
-   * to know how many requests it has to receive.
-   */
-  private final ExpectedBarrier workersBarrier;
-  /** Progressable used to report progress */
-  private final Progressable progressable;
-
-  /**
-   * Constructor
-   *
-   * @param progressable Progressable used to report progress
-   */
-  public AllAggregatorServerData(Progressable progressable) {
-    this.progressable = progressable;
-    workersBarrier = new ExpectedBarrier(progressable);
-    masterBarrier = new ExpectedBarrier(progressable);
-  }
-
-  /**
-   * Register the class of the aggregator, received from master or worker.
-   *
-   * @param name            Aggregator name
-   * @param aggregatorClass Class of the aggregator
-   */
-  public void registerAggregatorClass(String name,
-      Class<Aggregator<Writable>> aggregatorClass) {
-    aggregatorClassMap.put(name, aggregatorClass);
-    if (!aggregatorTypesMap.containsKey(aggregatorClass)) {
-      aggregatorTypesMap.putIfAbsent(aggregatorClass,
-          AggregatorUtils.newAggregatorInstance(aggregatorClass));
-    }
-    progressable.progress();
-  }
-
-  /**
-   * Set the value of aggregator from previous superstep,
-   * received from master or worker.
-   *
-   * @param name Name of the aggregator
-   * @param value Value of the aggregator
-   */
-  public void setAggregatorValue(String name, Writable value) {
-    aggregatedValuesMap.put(name, value);
-    progressable.progress();
-  }
-
-  /**
-   * Create initial aggregated value for an aggregator. Used so requests
-   * would be able to deserialize data.
-   * registerAggregatorClass needs to be called first to ensure that we have
-   * the class of the aggregator.
-   *
-   * @param name Name of the aggregator
-   * @return Empty aggregated value for this aggregator
-   */
-  public Writable createAggregatorInitialValue(String name) {
-    Class<Aggregator<Writable>> aggregatorClass = aggregatorClassMap.get(name);
-    Aggregator<Writable> aggregator = aggregatorTypesMap.get(aggregatorClass);
-    synchronized (aggregator) {
-      return aggregator.createInitialValue();
-    }
-  }
-
-  /**
-   * Notify this object that an aggregator request from master has been
-   * received.
-   *
-   * @param aggregatorData Byte request with data received from master
-   */
-  public void receivedRequestFromMaster(byte[] aggregatorData) {
-    masterData.add(aggregatorData);
-    masterBarrier.releaseOnePermit();
-  }
-
-  /**
-   * Notify this object about the total number of requests which should
-   * arrive from master.
-   *
-   * @param requestCount Number of requests which should arrive
-   */
-  public void receivedRequestCountFromMaster(long requestCount) {
-    masterBarrier.requirePermits(requestCount);
-  }
-
-  /**
-   * Notify this object that an aggregator request from some worker has been
-   * received.
-   */
-  public void receivedRequestFromWorker() {
-    workersBarrier.releaseOnePermit();
-  }
-
-  /**
-   * Notify this object about the total number of requests which should
-   * arrive from one of the workers.
-   *
-   * @param requestCount Number of requests which should arrive
-   */
-  public void receivedRequestCountFromWorker(long requestCount) {
-    workersBarrier.requirePermits(requestCount);
-  }
-
-  /**
-   * This function will wait until all aggregator requests from master have
-   * arrived, and return that data afterwards.
-   *
-   * @return Iterable through data received from master
-   */
-  public Iterable<byte[]> getDataFromMasterWhenReady() {
-    masterBarrier.waitForRequiredPermits(1);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("getDataFromMasterWhenReady: " +
-          "Aggregator data for distribution ready");
-    }
-    return masterData;
-  }
-
-  /**
-   * This function will wait until all aggregator requests from workers have
-   * arrived, and fill the maps for next superstep when ready.
-   *
-   * @param numberOfWorkers Total number of workers in the job
-   * @param previousAggregatedValuesMap Map of values from previous
-   *                                    superstep to fill out
-   * @param currentAggregatorMap Map of aggregators for current superstep to
-   *                             fill out. All aggregators in this map will
-   *                             be set to initial value.
-   */
-  public void fillNextSuperstepMapsWhenReady(
-      int numberOfWorkers,
-      Map<String, Writable> previousAggregatedValuesMap,
-      Map<String, Aggregator<Writable>> currentAggregatorMap) {
-    workersBarrier.waitForRequiredPermits(numberOfWorkers - 1);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("fillNextSuperstepMapsWhenReady: Aggregators ready");
-    }
-    previousAggregatedValuesMap.clear();
-    previousAggregatedValuesMap.putAll(aggregatedValuesMap);
-    for (Map.Entry<String, Class<Aggregator<Writable>>> entry :
-        aggregatorClassMap.entrySet()) {
-      Aggregator<Writable> aggregator =
-          currentAggregatorMap.get(entry.getKey());
-      if (aggregator == null) {
-        currentAggregatorMap.put(entry.getKey(),
-            AggregatorUtils.newAggregatorInstance(entry.getValue()));
-      } else {
-        aggregator.reset();
-      }
-    }
-  }
-
-  /**
-   * Prepare for next superstep
-   */
-  public void reset() {
-    masterData.clear();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("reset: Ready for next superstep");
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingCache.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingCache.java b/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingCache.java
deleted file mode 100644
index 42afd62..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingCache.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.aggregators;
-
-import com.google.common.collect.Maps;
-
-import java.util.Map;
-
-/**
- * Cache which counts the number of flushes per task id (destination worker
- * id), so we know how many requests were sent to the worker
- */
-public abstract class CountingCache {
-  /** Counts the number of flushes for each worker */
-  private Map<Integer, Long> countMap = Maps.newHashMap();
-
-  /**
-   * Increase count of flushes for worker with desired task id. Subclasses
-   * should call this method whenever flush is called.
-   *
-   * @param taskId Task id of worker
-   */
-  protected void incrementCounter(Integer taskId) {
-    Long currentCount = countMap.get(taskId);
-    countMap.put(taskId,
-        (currentCount == null) ? 1 : (currentCount + 1));
-  }
-
-  /**
-   * Get number of flushes for worker with desired task id
-   *
-   * @param taskId Task id of worker
-   * @return Number of flushes for the worker
-   */
-  protected long getCount(Integer taskId) {
-    Long count = countMap.get(taskId);
-    if (count == null) {
-      return 0;
-    } else {
-      return count.longValue();
-    }
-  }
-
-  /**
-   * Reset the counts
-   */
-  public void reset() {
-    countMap.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java b/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java
deleted file mode 100644
index e4f782e..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.aggregators;
-
-import java.io.IOException;
-import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
-import org.apache.giraph.utils.ExtendedDataOutput;
-
-/**
- * Wrapper for output stream which keeps the place in the beginning for the
- * count of objects which were written to it
- */
-public abstract class CountingOutputStream {
-  /** DataOutput to which subclasses will be writing data */
-  protected ExtendedDataOutput dataOutput;
-  /** Counter for objects which were written to the stream */
-  private int counter;
-
-  /**
-   * Default constructor
-   */
-  public CountingOutputStream() {
-    dataOutput = new ExtendedByteArrayDataOutput();
-    reset();
-  }
-
-  /**
-   * Subclasses should call this method when an object is written
-   */
-  protected void incrementCounter() {
-    counter++;
-  }
-
-  /**
-   * Get the number of bytes in the stream
-   *
-   * @return Number of bytes
-   */
-  protected int getSize() {
-    return dataOutput.getPos();
-  }
-
-  /**
-   * Returns all the data from the stream and clears it.
-   *
-   * @return Number of objects followed by the data written to the stream
-   */
-  public byte[] flush() {
-    dataOutput.writeInt(0, counter);
-    // Actual flush not required, this is a byte array
-    byte[] ret = dataOutput.toByteArray();
-    reset();
-    return ret;
-  }
-
-  /**
-   * Reset the stream
-   */
-  private void reset() {
-    dataOutput.reset();
-    // reserve the place for count to be written in the end
-    try {
-      dataOutput.writeInt(0);
-    } catch (IOException e) {
-      throw new IllegalStateException("reset: Got IOException", e);
-    }
-    counter = 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java b/giraph/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
deleted file mode 100644
index d586997..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.aggregators;
-
-import org.apache.giraph.graph.Aggregator;
-import org.apache.giraph.utils.ExpectedBarrier;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.Progressable;
-import org.apache.log4j.Logger;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-import java.util.AbstractMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Class for holding aggregators which current worker owns,
- * and aggregating partial aggregator values from workers.
- *
- * Protocol:
- * 1. Before the beginning of superstep, worker receives its aggregators
- * from master, and these aggregators will be registered to this class.
- * Multiple registrations can be called concurrently.
- * 2. During the superstep, whenever a worker finishes computation,
- * it will send partial aggregated values to worker owner. This class is used
- * to help deserialize the arriving aggregator values, and aggregate the values
- * at the destination owner worker; these can happen concurrently.
- * (we know step 1. is finished before anything from step 2. happens because
- * other workers can't start computation before they receive aggregators
- * which this worker owns)
- * 3. This class also tracks the number of partial aggregator requests which
- * worker received. In the end of superstep, getMyAggregatorValuesWhenReady
- * will be called to ensure everything was received and get the values which
- * need to be sent to master.
- * Because of this counting, in step 2. even if worker owns no aggregators,
- * it will still send a message without aggregator data.
- * 4. In the end we reset to prepare for the next superstep.
- */
-public class OwnerAggregatorServerData {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(OwnerAggregatorServerData.class);
-  /** Map of aggregators which current worker owns */
-  private final ConcurrentMap<String, Aggregator<Writable>>
-  myAggregatorMap = Maps.newConcurrentMap();
-  /**
-   * Counts the requests with partial aggregated values from other workers.
-   * It uses values from special aggregators
-   * (named AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)
-   * to know how many requests it has to receive.
-   */
-  private final ExpectedBarrier workersBarrier;
-  /** Progressable used to report progress */
-  private final Progressable progressable;
-
-  /**
-   * Constructor
-   *
-   * @param progressable Progressable used to report progress
-   */
-  public OwnerAggregatorServerData(Progressable progressable) {
-    this.progressable = progressable;
-    workersBarrier = new ExpectedBarrier(progressable);
-  }
-
-  /**
-   * Register an aggregator which current worker owns. Thread-safe.
-   *
-   * @param name Name of aggregator
-   * @param aggregatorClass Aggregator class
-   */
-  public void registerAggregator(String name,
-      Class<Aggregator<Writable>> aggregatorClass) {
-    if (LOG.isDebugEnabled() && myAggregatorMap.isEmpty()) {
-      LOG.debug("registerAggregator: The first registration after a reset()");
-    }
-    myAggregatorMap.putIfAbsent(name,
-        AggregatorUtils.newAggregatorInstance(aggregatorClass));
-    progressable.progress();
-  }
-
-  /**
-   * Aggregate partial value of one of current worker's aggregators.
-   *
-   * Thread-safe. Call only after aggregators have been registered.
-   *
-   * @param name Name of the aggregator
-   * @param value Value to aggregate to it
-   */
-  public void aggregate(String name, Writable value) {
-    Aggregator<Writable> aggregator = myAggregatorMap.get(name);
-    synchronized (aggregator) {
-      aggregator.aggregate(value);
-    }
-    progressable.progress();
-  }
-
-  /**
-   * Create initial aggregated value for an aggregator. Used so requests
-   * would be able to deserialize data.
-   *
-   * Thread-safe. Call only after aggregators have been registered.
-   *
-   * @param name Name of the aggregator
-   * @return Empty aggregated value for this aggregator
-   */
-  public Writable createAggregatorInitialValue(String name) {
-    Aggregator<Writable> aggregator = myAggregatorMap.get(name);
-    synchronized (aggregator) {
-      return aggregator.createInitialValue();
-    }
-  }
-
-  /**
-   * Notify this object that a partial aggregated values request from some
-   * worker have been received. Thread-safe.
-   */
-  public void receivedRequestFromWorker() {
-    workersBarrier.releaseOnePermit();
-  }
-
-  /**
-   * Notify this object about the total number of requests which should
-   * arrive from one of the workers. Thread-safe.
-   *
-   * @param requestCount Number of requests which should arrive
-   */
-  public void receivedRequestCountFromWorker(long requestCount) {
-    workersBarrier.requirePermits(requestCount);
-  }
-
-  /**
-   * This function will wait until all partial aggregated values from all
-   * workers are ready and aggregated, and return final aggregated values
-   * afterwards.
-   *
-   * @param numberOfWorkers Total number of workers in the job
-   * @return Iterable through final aggregated values which this worker owns
-   */
-  public Iterable<Map.Entry<String, Writable>>
-  getMyAggregatorValuesWhenReady(int numberOfWorkers) {
-    workersBarrier.waitForRequiredPermits(numberOfWorkers - 1);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("getMyAggregatorValuesWhenReady: Values ready");
-    }
-    return Iterables.transform(myAggregatorMap.entrySet(),
-        new Function<Map.Entry<String, Aggregator<Writable>>,
-            Map.Entry<String, Writable>>() {
-          @Override
-          public Map.Entry<String, Writable> apply(
-              Map.Entry<String, Aggregator<Writable>> aggregator) {
-            return new AbstractMap.SimpleEntry<String, Writable>(
-                aggregator.getKey(),
-                aggregator.getValue().getAggregatedValue());
-          }
-        });
-  }
-
-  /**
-   * Prepare for next superstep
-   */
-  public void reset() {
-    myAggregatorMap.clear();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("reset: Ready for next superstep");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatedValueCache.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatedValueCache.java b/giraph/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatedValueCache.java
deleted file mode 100644
index 468ee5c..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatedValueCache.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.aggregators;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.Maps;
-
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * Takes and serializes aggregated values and keeps them grouped by owner
- * partition id, to be sent in bulk.
- */
-public class SendAggregatedValueCache extends CountingCache {
-  /** Map from worker partition id to aggregator output stream */
-  private final Map<Integer, AggregatedValueOutputStream> aggregatorMap =
-      Maps.newHashMap();
-
-  /**
-   * Add aggregated value to the cache
-   *
-   * @param taskId Task id of worker which owns the aggregator
-   * @param aggregatorName Name of the aggregator
-   * @param aggregatedValue Value of the aggregator
-   * @return Number of bytes in serialized data for this worker
-   * @throws IOException
-   */
-  public int addAggregator(Integer taskId, String aggregatorName,
-      Writable aggregatedValue) throws IOException {
-    AggregatedValueOutputStream out = aggregatorMap.get(taskId);
-    if (out == null) {
-      out = new AggregatedValueOutputStream();
-      aggregatorMap.put(taskId, out);
-    }
-    return out.addAggregator(aggregatorName, aggregatedValue);
-  }
-
-  /**
-   * Remove and get aggregators for certain worker
-   *
-   * @param taskId Partition id of worker owner
-   * @return Serialized aggregator data for this worker
-   */
-  public byte[] removeAggregators(Integer taskId) {
-    incrementCounter(taskId);
-    AggregatedValueOutputStream out = aggregatorMap.remove(taskId);
-    if (out == null) {
-      return new byte[4];
-    } else {
-      return out.flush();
-    }
-  }
-
-  /**
-   * Creates fake aggregator which will hold the total number of aggregator
-   * requests for worker with selected task id. This should be called after all
-   * aggregators for the worker have been added to the cache.
-   *
-   * @param taskId Destination worker's task id
-   * @throws IOException
-   */
-  public void addCountAggregator(Integer taskId) throws IOException {
-    // current number of requests, plus one for the last flush
-    long totalCount = getCount(taskId) + 1;
-    addAggregator(taskId, AggregatorUtils.SPECIAL_COUNT_AGGREGATOR,
-        new LongWritable(totalCount));
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java b/giraph/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java
deleted file mode 100644
index 701e38d..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.aggregators;
-
-import org.apache.giraph.graph.Aggregator;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.Maps;
-
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * Takes and serializes aggregators and keeps them grouped by owner
- * partition id, to be sent in bulk.
- */
-public class SendAggregatorCache extends CountingCache {
-  /** Map from worker partition id to aggregator output stream */
-  private final Map<Integer, AggregatorOutputStream> aggregatorMap =
-      Maps.newHashMap();
-
-  /**
-   * Add aggregator to the cache
-   *
-   * @param taskId Task id of worker which owns the aggregator
-   * @param aggregatorName Name of the aggregator
-   * @param aggregatorClass Class of the aggregator
-   * @param aggregatedValue Value of the aggregator
-   * @return Number of bytes in serialized data for this worker
-   * @throws IOException
-   */
-  public int addAggregator(Integer taskId, String aggregatorName,
-      Class<? extends Aggregator> aggregatorClass,
-      Writable aggregatedValue) throws IOException {
-    AggregatorOutputStream out = aggregatorMap.get(taskId);
-    if (out == null) {
-      out = new AggregatorOutputStream();
-      aggregatorMap.put(taskId, out);
-    }
-    return out.addAggregator(aggregatorName, aggregatorClass,
-        aggregatedValue);
-  }
-
-  /**
-   * Remove and get aggregators for certain worker
-   *
-   * @param taskId Task id of worker owner
-   * @return Serialized aggregator data for this worker
-   */
-  public byte[] removeAggregators(Integer taskId) {
-    incrementCounter(taskId);
-    AggregatorOutputStream out = aggregatorMap.remove(taskId);
-    if (out == null) {
-      return new byte[4];
-    } else {
-      return out.flush();
-    }
-  }
-
-  /**
-   * Creates fake aggregator which will hold the total number of aggregator
-   * requests for worker with selected task id. This should be called after all
-   * aggregators for the worker have been added to the cache.
-   *
-   * @param taskId Destination worker's task id
-   * @throws IOException
-   */
-  public void addCountAggregator(Integer taskId) throws IOException {
-    // current number of requests, plus one for the last flush
-    long totalCount = getCount(taskId) + 1;
-    addAggregator(taskId, AggregatorUtils.SPECIAL_COUNT_AGGREGATOR,
-        Aggregator.class, new LongWritable(totalCount));
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/aggregators/WorkerAggregatorRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/aggregators/WorkerAggregatorRequestProcessor.java b/giraph/src/main/java/org/apache/giraph/comm/aggregators/WorkerAggregatorRequestProcessor.java
deleted file mode 100644
index 360a39b..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/aggregators/WorkerAggregatorRequestProcessor.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.aggregators;
-
-import org.apache.hadoop.io.Writable;
-
-import java.io.IOException;
-
-/**
- * Aggregates worker aggregator requests and sends them off
- */
-public interface WorkerAggregatorRequestProcessor {
-  /**
-   * Sends worker aggregated value to the owner of aggregator
-   *
-   * @param aggregatorName Name of the aggregator
-   * @param aggregatedValue Value of the aggregator
-   * @throws java.io.IOException
-   * @return True if aggregated value will be sent, false if this worker is
-   * the owner of the aggregator
-   */
-  boolean sendAggregatedValue(String aggregatorName,
-      Writable aggregatedValue) throws IOException;
-
-  /**
-   * Flush aggregated values cache.
-   *
-   * @throws IOException
-   */
-  void flush() throws IOException;
-
-  /**
-   * Sends aggregated values to the master. This worker is the owner of these
-   * aggregators.
-   *
-   * @param aggregatorData Serialized aggregator data
-   * @throws IOException
-   */
-  void sendAggregatedValuesToMaster(byte[] aggregatorData) throws IOException;
-
-  /**
-   * Sends aggregators to all other workers
-   *
-   * @param aggregatorDataList Serialized aggregator data split into chunks
-   */
-  void distributeAggregators(
-      Iterable<byte[]> aggregatorDataList) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/aggregators/package-info.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/aggregators/package-info.java b/giraph/src/main/java/org/apache/giraph/comm/aggregators/package-info.java
deleted file mode 100644
index 3fd8496..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/aggregators/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Package for classes which are used to handle aggregators.
- */
-package org.apache.giraph.comm.aggregators;

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java b/giraph/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java
deleted file mode 100644
index dcb6223..0000000
--- a/giraph/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages;
-
-import java.io.IOException;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Most basic message store with just add, get and clear operations
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public interface BasicMessageStore<I extends WritableComparable,
-    M extends Writable> extends Writable {
-  /**
-   * Adds messages from one message store to another
-   *
-   * @param messageStore Add the messages from this message store to this
-   *                     object
-   * @throws java.io.IOException
-   */
-  void addMessages(MessageStore<I, M> messageStore) throws IOException;
-
-  /**
-   * Gets messages for a vertex.  The lifetime of every message is only
-   * guaranteed until the iterator's next() method is called.  Do not re-use
-   * the messages.
-   *
-   * @param vertexId Vertex id for which we want to get messages
-   * @return Iterable of messages for a vertex id
-   * @throws IOException
-   */
-  Iterable<M> getVertexMessages(I vertexId) throws IOException;
-
-  /**
-   * Clears messages for a vertex.
-   *
-   * @param vertexId Vertex id for which we want to clear messages
-   * @throws IOException
-   */
-  void clearVertexMessages(I vertexId) throws IOException;
-
-  /**
-   * Clears all resources used by this store.
-   *
-   * @throws IOException
-   */
-  void clearAll() throws IOException;
-}


Mime
View raw message