giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [49/51] [partial] GIRAPH-457: update module names (nitay)
Date Thu, 20 Dec 2012 04:25:33 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java b/giraph/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
deleted file mode 100644
index 531559c..0000000
--- a/giraph/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.graph;
-
-import com.google.common.collect.UnmodifiableIterator;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
-import org.apache.mahout.math.function.LongFloatProcedure;
-import org.apache.mahout.math.list.DoubleArrayList;
-import org.apache.mahout.math.map.OpenLongFloatHashMap;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Iterator;
-
-/**
- * Optimized vertex implementation for
- * <LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- */
-public abstract class LongDoubleFloatDoubleVertex extends
-    MutableVertex<LongWritable, DoubleWritable, FloatWritable,
-        DoubleWritable> {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(LongDoubleFloatDoubleVertex.class);
-  /** Stores the edges */
-  private OpenLongFloatHashMap edgeMap =
-      new OpenLongFloatHashMap();
-
-  @Override
-  public void setEdges(Iterable<Edge<LongWritable, FloatWritable>> edges) {
-    if (edges != null) {
-      for (Edge<LongWritable, FloatWritable> edge : edges) {
-        edgeMap.put(edge.getTargetVertexId().get(), edge.getValue().get());
-      }
-    }
-  }
-
-  @Override
-  public boolean addEdge(Edge<LongWritable, FloatWritable> edge) {
-    if (edgeMap.put(edge.getTargetVertexId().get(),
-        edge.getValue().get())) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("addEdge: Vertex=" + getId() +
-            ": already added an edge value for dest vertex id " +
-            edge.getTargetVertexId());
-      }
-      return false;
-    } else {
-      return true;
-    }
-  }
-
-  @Override
-  public int removeEdges(LongWritable targetVertexId) {
-    long target = targetVertexId.get();
-    if (edgeMap.containsKey(target)) {
-      edgeMap.removeKey(target);
-      return 1;
-    } else {
-      return 0;
-    }
-  }
-
-  @Override
-  public Iterable<Edge<LongWritable, FloatWritable>> getEdges() {
-    final long[] targetVertices = edgeMap.keys().elements();
-    final int numEdges = edgeMap.size();
-
-    return new Iterable<Edge<LongWritable, FloatWritable>>() {
-      @Override
-      public Iterator<Edge<LongWritable, FloatWritable>> iterator() {
-        return new Iterator<Edge<LongWritable, FloatWritable>>() {
-          private int offset = 0;
-
-          @Override
-          public boolean hasNext() {
-            return offset < numEdges;
-          }
-
-          @Override
-          public Edge<LongWritable, FloatWritable> next() {
-            long targetVertex = targetVertices[offset++];
-            return new Edge<LongWritable, FloatWritable>(
-                new LongWritable(targetVertex),
-                new FloatWritable(targetVertex));
-          }
-
-          @Override
-          public void remove() {
-            throw new UnsupportedOperationException(
-                "Mutation disallowed for edge list via iterator");
-          }
-        };
-      }
-    };
-  }
-
-  @Override
-  public boolean hasEdge(LongWritable targetVertexId) {
-    return edgeMap.containsKey(targetVertexId.get());
-  }
-
-  @Override
-  public int getNumEdges() {
-    return edgeMap.size();
-  }
-
-  @Override
-  public final void readFields(DataInput in) throws IOException {
-    long id = in.readLong();
-    double value = in.readDouble();
-    initialize(new LongWritable(id), new DoubleWritable(value));
-    edgeMap.clear();
-    long edgeMapSize = in.readLong();
-    for (long i = 0; i < edgeMapSize; ++i) {
-      long targetVertexId = in.readLong();
-      float edgeValue = in.readFloat();
-      edgeMap.put(targetVertexId, edgeValue);
-    }
-    readHaltBoolean(in);
-  }
-
-  @Override
-  public final void write(final DataOutput out) throws IOException {
-    out.writeLong(getId().get());
-    out.writeDouble(getValue().get());
-    out.writeLong(edgeMap.size());
-    edgeMap.forEachPair(new LongFloatProcedure() {
-      @Override
-      public boolean apply(long destVertexId, float edgeValue) {
-        try {
-          out.writeLong(destVertexId);
-          out.writeFloat(edgeValue);
-        } catch (IOException e) {
-          throw new IllegalStateException(
-              "apply: IOException when not allowed", e);
-        }
-        return true;
-      }
-    });
-    out.writeBoolean(isHalted());
-  }
-
-  /**
-   * Helper iterable over the messages.
-   */
-  private static class UnmodifiableDoubleWritableIterable
-    implements Iterable<DoubleWritable> {
-    /** Backing store of messages */
-    private final DoubleArrayList elementList;
-
-    /**
-     * Constructor.
-     *
-     * @param elementList Backing store of element list.
-     */
-    public UnmodifiableDoubleWritableIterable(
-        DoubleArrayList elementList) {
-      this.elementList = elementList;
-    }
-
-    @Override
-    public Iterator<DoubleWritable> iterator() {
-      return new UnmodifiableDoubleWritableIterator(
-          elementList);
-    }
-  }
-
-  /**
-   * Iterator over the messages.
-   */
-  private static class UnmodifiableDoubleWritableIterator
-      extends UnmodifiableIterator<DoubleWritable> {
-    /** Double backing list */
-    private final DoubleArrayList elementList;
-    /** Offset into the backing list */
-    private int offset = 0;
-
-    /**
-     * Constructor.
-     *
-     * @param elementList Backing store of element list.
-     */
-    UnmodifiableDoubleWritableIterator(DoubleArrayList elementList) {
-      this.elementList = elementList;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return offset < elementList.size();
-    }
-
-    @Override
-    public DoubleWritable next() {
-      return new DoubleWritable(elementList.get(offset++));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java b/giraph/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java
deleted file mode 100644
index b35fe56..0000000
--- a/giraph/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import com.google.common.collect.Iterables;
-import org.apache.giraph.utils.UnmodifiableLongNullEdgeArrayIterable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collections;
-
-/**
- * Compact vertex representation with primitive arrays and null edges.
- */
-public abstract class LongDoubleNullDoubleVertex extends
-    Vertex<LongWritable, DoubleWritable, NullWritable, DoubleWritable> {
-  /** long represented vertex id */
-  private long id;
-  /** double represented vertex value */
-  private double value;
-  /** long array of neighbor vertex ids */
-  private long[] neighbors;
-
-  @Override
-  public void initialize(LongWritable vertexId, DoubleWritable vertexValue) {
-    id = vertexId.get();
-    value = vertexValue.get();
-    setEdges(Collections.<Edge<LongWritable, NullWritable>>emptyList());
-  }
-
-  @Override
-  public void initialize(LongWritable vertexId, DoubleWritable vertexValue,
-                         Iterable<Edge<LongWritable, NullWritable>> edges) {
-    id = vertexId.get();
-    value = vertexValue.get();
-    setEdges(edges);
-  }
-
-  @Override
-  public void setEdges(Iterable<Edge<LongWritable, NullWritable>> edges) {
-    neighbors = new long[(edges != null) ? Iterables.size(edges) : 0];
-    int n = 0;
-    if (edges != null) {
-      for (Edge<LongWritable, NullWritable> edge : edges) {
-        neighbors[n++] = edge.getTargetVertexId().get();
-      }
-    }
-  }
-
-  @Override
-  public LongWritable getId() {
-    return new LongWritable(id);
-  }
-
-  @Override
-  public DoubleWritable getValue() {
-    return new DoubleWritable(value);
-  }
-
-  @Override
-  public void setValue(DoubleWritable vertexValue) {
-    value = vertexValue.get();
-  }
-
-  @Override
-  public Iterable<Edge<LongWritable, NullWritable>> getEdges() {
-    return new UnmodifiableLongNullEdgeArrayIterable(neighbors);
-  }
-
-  @Override
-  public NullWritable getEdgeValue(LongWritable targetVertexId) {
-    return NullWritable.get();
-  }
-
-  @Override
-  public boolean hasEdge(LongWritable targetVertexId) {
-    for (long neighbor : neighbors) {
-      if (neighbor == targetVertexId.get()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public int getNumEdges() {
-    return neighbors.length;
-  }
-
-  @Override
-  public void sendMessageToAllEdges(final DoubleWritable message) {
-    for (long neighbor : neighbors) {
-      sendMessage(new LongWritable(neighbor), message);
-    }
-  }
-
-  @Override
-  public void write(final DataOutput out) throws IOException {
-    out.writeLong(id);
-    out.writeDouble(value);
-    out.writeInt(neighbors.length);
-    for (int n = 0; n < neighbors.length; n++) {
-      out.writeLong(neighbors[n]);
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    id = in.readLong();
-    value = in.readDouble();
-    int numEdges = in.readInt();
-    neighbors = new long[numEdges];
-    for (int n = 0; n < numEdges; n++) {
-      neighbors[n] = in.readLong();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java b/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java
deleted file mode 100644
index ef1e2ff..0000000
--- a/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.bsp.SuperstepState;
-import org.apache.giraph.comm.MasterClient;
-import org.apache.giraph.comm.aggregators.AggregatorUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.Progressable;
-import org.apache.log4j.Logger;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.AbstractMap;
-import java.util.Map;
-
-/** Handler for aggregators on master */
-public class MasterAggregatorHandler implements MasterAggregatorUsage,
-    Writable {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(MasterAggregatorHandler.class);
-  /**
-   * Map of aggregators.
-   * This map is used to store final aggregated values received from worker
-   * owners, and also to read and write values provided during master.compute.
-   */
-  private final Map<String, AggregatorWrapper<Writable>> aggregatorMap =
-      Maps.newHashMap();
-  /** Aggregator writer */
-  private final AggregatorWriter aggregatorWriter;
-  /** Progressable used to report progress */
-  private final Progressable progressable;
-
-  /**
-   * Constructor
-   *
-   * @param conf         Giraph configuration
-   * @param progressable Progressable used for reporting progress
-   */
-  public MasterAggregatorHandler(
-      ImmutableClassesGiraphConfiguration<?, ?, ?, ?> conf,
-      Progressable progressable) {
-    this.progressable = progressable;
-    aggregatorWriter = conf.createAggregatorWriter();
-  }
-
-  @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    AggregatorWrapper<? extends Writable> aggregator = aggregatorMap.get(name);
-    if (aggregator == null) {
-      return null;
-    } else {
-      return (A) aggregator.getPreviousAggregatedValue();
-    }
-  }
-
-  @Override
-  public <A extends Writable> void setAggregatedValue(String name, A value) {
-    AggregatorWrapper<? extends Writable> aggregator = aggregatorMap.get(name);
-    if (aggregator == null) {
-      throw new IllegalStateException(
-          "setAggregatedValue: Tried to set value of aggregator which wasn't" +
-              " registered " + name);
-    }
-    ((AggregatorWrapper<A>) aggregator).setCurrentAggregatedValue(value);
-  }
-
-  @Override
-  public <A extends Writable> boolean registerAggregator(String name,
-      Class<? extends Aggregator<A>> aggregatorClass) throws
-      InstantiationException, IllegalAccessException {
-    checkAggregatorName(name);
-    return registerAggregator(name, aggregatorClass, false) != null;
-  }
-
-  @Override
-  public <A extends Writable> boolean registerPersistentAggregator(String name,
-      Class<? extends Aggregator<A>> aggregatorClass) throws
-      InstantiationException, IllegalAccessException {
-    checkAggregatorName(name);
-    return registerAggregator(name, aggregatorClass, true) != null;
-  }
-
-  /**
-   * Make sure user doesn't use AggregatorUtils.SPECIAL_COUNT_AGGREGATOR as
-   * the name of aggregator. Throw an exception if he tries to use it.
-   *
-   * @param name Name of the aggregator to check.
-   */
-  private void checkAggregatorName(String name) {
-    if (name.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
-      throw new IllegalStateException("checkAggregatorName: " +
-          AggregatorUtils.SPECIAL_COUNT_AGGREGATOR +
-          " is not allowed for the name of aggregator");
-    }
-  }
-
-  /**
-   * Helper function for registering aggregators.
-   *
-   * @param name            Name of the aggregator
-   * @param aggregatorClass Class of the aggregator
-   * @param persistent      Whether aggregator is persistent or not
-   * @param <A>             Aggregated value type
-   * @return Newly registered aggregator or aggregator which was previously
-   *         created with selected name, if any
-   */
-  private <A extends Writable> AggregatorWrapper<A> registerAggregator
-  (String name, Class<? extends Aggregator<A>> aggregatorClass,
-      boolean persistent) throws InstantiationException,
-      IllegalAccessException {
-    AggregatorWrapper<A> aggregatorWrapper =
-        (AggregatorWrapper<A>) aggregatorMap.get(name);
-    if (aggregatorWrapper == null) {
-      aggregatorWrapper =
-          new AggregatorWrapper<A>(aggregatorClass, persistent);
-      aggregatorMap.put(name, (AggregatorWrapper<Writable>) aggregatorWrapper);
-    }
-    return aggregatorWrapper;
-  }
-
-  /**
-   * Prepare aggregators for current superstep
-   *
-   * @param masterClient IPC client on master
-   */
-  public void prepareSuperstep(MasterClient masterClient) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("prepareSuperstep: Start preapring aggregators");
-    }
-    // prepare aggregators for master compute
-    for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
-      if (aggregator.isPersistent()) {
-        aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue());
-      }
-      aggregator.setPreviousAggregatedValue(
-          aggregator.getCurrentAggregatedValue());
-      aggregator.resetCurrentAggregator();
-      progressable.progress();
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("prepareSuperstep: Aggregators prepared");
-    }
-  }
-
-  /**
-   * Finalize aggregators for current superstep and share them with workers
-   *
-   * @param masterClient IPC client on master
-   */
-  public void finishSuperstep(MasterClient masterClient) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("finishSuperstep: Start finishing aggregators");
-    }
-    for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
-      if (aggregator.isChanged()) {
-        // if master compute changed the value, use the one he chose
-        aggregator.setPreviousAggregatedValue(
-            aggregator.getCurrentAggregatedValue());
-        // reset aggregator for the next superstep
-        aggregator.resetCurrentAggregator();
-      }
-      progressable.progress();
-    }
-
-    // send aggregators to their owners
-    // TODO: if aggregator owner and it's value didn't change,
-    //       we don't need to resend it
-    try {
-      for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
-          aggregatorMap.entrySet()) {
-        masterClient.sendAggregator(entry.getKey(),
-            entry.getValue().getAggregatorClass(),
-            entry.getValue().getPreviousAggregatedValue());
-        progressable.progress();
-      }
-      masterClient.finishSendingAggregatedValues();
-    } catch (IOException e) {
-      throw new IllegalStateException("finishSuperstep: " +
-          "IOException occurred while sending aggregators", e);
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("finishSuperstep: Aggregators finished");
-    }
-  }
-
-  /**
-   * Accept aggregated values sent by worker. Every aggregator will be sent
-   * only once, by its owner.
-   * We don't need to count the number of these requests because global
-   * superstep barrier will happen after workers ensure all requests of this
-   * type have been received and processed by master.
-   *
-   * @param aggregatedValuesInput Input in which aggregated values are
-   *                              written in the following format:
-   *                              number_of_aggregators
-   *                              name_1  value_1
-   *                              name_2  value_2
-   *                              ...
-   * @throws IOException
-   */
-  public void acceptAggregatedValues(
-      DataInput aggregatedValuesInput) throws IOException {
-    int numAggregators = aggregatedValuesInput.readInt();
-    for (int i = 0; i < numAggregators; i++) {
-      String aggregatorName = aggregatedValuesInput.readUTF();
-      AggregatorWrapper<Writable> aggregator =
-          aggregatorMap.get(aggregatorName);
-      if (aggregator == null) {
-        throw new IllegalStateException(
-            "acceptAggregatedValues: " +
-                "Master received aggregator which isn't registered: " +
-                aggregatorName);
-      }
-      Writable aggregatorValue = aggregator.createInitialValue();
-      aggregatorValue.readFields(aggregatedValuesInput);
-      aggregator.setCurrentAggregatedValue(aggregatorValue);
-      progressable.progress();
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("acceptAggregatedValues: Accepted one set with " +
-          numAggregators + " aggregated values");
-    }
-  }
-
-  /**
-   * Write aggregators to {@link AggregatorWriter}
-   *
-   * @param superstep      Superstep which just finished
-   * @param superstepState State of the superstep which just finished
-   */
-  public void writeAggregators(long superstep, SuperstepState superstepState) {
-    try {
-      Iterable<Map.Entry<String, Writable>> iter =
-          Iterables.transform(
-              aggregatorMap.entrySet(),
-              new Function<Map.Entry<String, AggregatorWrapper<Writable>>,
-                  Map.Entry<String, Writable>>() {
-                @Override
-                public Map.Entry<String, Writable> apply(
-                    Map.Entry<String, AggregatorWrapper<Writable>> entry) {
-                  progressable.progress();
-                  return new AbstractMap.SimpleEntry<String,
-                      Writable>(entry.getKey(),
-                      entry.getValue().getPreviousAggregatedValue());
-                }
-              });
-      aggregatorWriter.writeAggregator(iter,
-          (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE) ?
-              AggregatorWriter.LAST_SUPERSTEP : superstep);
-    } catch (IOException e) {
-      throw new IllegalStateException(
-          "coordinateSuperstep: IOException while " +
-              "writing aggregators data", e);
-    }
-  }
-
-  /**
-   * Initialize {@link AggregatorWriter}
-   *
-   * @param service BspService
-   */
-  public void initialize(BspService service) {
-    try {
-      aggregatorWriter.initialize(service.getContext(),
-          service.getApplicationAttempt());
-    } catch (IOException e) {
-      throw new IllegalStateException("initialize: " +
-          "Couldn't initialize aggregatorWriter", e);
-    }
-  }
-
-  /**
-   * Close {@link AggregatorWriter}
-   *
-   * @throws IOException
-   */
-  public void close() throws IOException {
-    aggregatorWriter.close();
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(aggregatorMap.size());
-    for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
-        aggregatorMap.entrySet()) {
-      out.writeUTF(entry.getKey());
-      out.writeUTF(entry.getValue().getAggregatorClass().getName());
-      out.writeBoolean(entry.getValue().isPersistent());
-      entry.getValue().getPreviousAggregatedValue().write(out);
-      progressable.progress();
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    aggregatorMap.clear();
-    int numAggregators = in.readInt();
-    try {
-      for (int i = 0; i < numAggregators; i++) {
-        String aggregatorName = in.readUTF();
-        String aggregatorClassName = in.readUTF();
-        boolean isPersistent = in.readBoolean();
-        AggregatorWrapper<Writable> aggregator = registerAggregator(
-            aggregatorName,
-            AggregatorUtils.getAggregatorClass(aggregatorClassName),
-            isPersistent);
-        Writable value = aggregator.createInitialValue();
-        value.readFields(in);
-        aggregator.setPreviousAggregatedValue(value);
-        progressable.progress();
-      }
-    } catch (InstantiationException e) {
-      throw new IllegalStateException("readFields: " +
-          "InstantiationException occurred", e);
-    } catch (IllegalAccessException e) {
-      throw new IllegalStateException("readFields: " +
-          "IllegalAccessException occurred", e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorUsage.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorUsage.java b/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorUsage.java
deleted file mode 100644
index 6e6571b..0000000
--- a/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorUsage.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-
-/**
- * Master compute can access and change aggregators through this interface
- */
-public interface MasterAggregatorUsage {
-  /**
-   * Register an aggregator in preSuperstep() and/or preApplication(). This
-   * aggregator will have its value reset at the end of each super step.
-   *
-   * @param name of aggregator
-   * @param aggregatorClass Class type of the aggregator
-   * @param <A> Aggregator type
-   * @return True iff aggregator wasn't already registered
-   */
-  <A extends Writable> boolean registerAggregator(String name,
-      Class<? extends Aggregator<A>> aggregatorClass) throws
-      InstantiationException, IllegalAccessException;
-
-  /**
-   * Register persistent aggregator in preSuperstep() and/or
-   * preApplication(). This aggregator will not reset value at the end of
-   * super step.
-   *
-   * @param name of aggregator
-   * @param aggregatorClass Class type of the aggregator
-   * @param <A> Aggregator type
-   * @return True iff aggregator wasn't already registered
-   */
-  <A extends Writable> boolean registerPersistentAggregator(String name,
-      Class<? extends Aggregator<A>> aggregatorClass) throws
-      InstantiationException, IllegalAccessException;
-
-  /**
-   * Get value of an aggregator.
-   *
-   * @param name Name of aggregator
-   * @param <A> Aggregated value
-   * @return Value of the aggregator
-   */
-  <A extends Writable> A getAggregatedValue(String name);
-
-  /**
-   * Sets value of an aggregator.
-   *
-   * @param name Name of aggregator
-   * @param value Value to set
-   * @param <A> Aggregated value
-   */
-  <A extends Writable> void setAggregatedValue(String name, A value);
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/graph/MasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/graph/MasterCompute.java b/giraph/src/main/java/org/apache/giraph/graph/MasterCompute.java
deleted file mode 100644
index 4641621..0000000
--- a/giraph/src/main/java/org/apache/giraph/graph/MasterCompute.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Mapper;
-
-/**
- * Interface for defining a master vertex that can perform centralized
- * computation between supersteps. This class will be instantiated on the
- * master node and will run every superstep before the workers do.
- *
- * Communication with the workers should be performed via aggregators. The
- * values of the aggregators are broadcast to the workers before
- * vertex.compute() is called and collected by the master before
- * master.compute() is called. This means aggregator values used by the workers
- * are consistent with aggregator values from the master from the same
- * superstep and aggregator used by the master are consistent with aggregator
- * values from the workers from the previous superstep. Note that the master
- * has to register its own aggregators (it does not call {@link WorkerContext}
- * functions), but it uses all aggregators by default, so useAggregator does
- * not have to be called.
- */
-@SuppressWarnings("rawtypes")
-public abstract class MasterCompute implements MasterAggregatorUsage, Writable,
-    ImmutableClassesGiraphConfigurable {
-  /** If true, do not do anymore computation on this vertex. */
-  private boolean halt = false;
-  /** Global graph state **/
-  private GraphState graphState;
-  /** Configuration */
-  private ImmutableClassesGiraphConfiguration conf;
-
-  /**
-   * Must be defined by user to specify what the master has to do.
-   */
-  public abstract void compute();
-
-  /**
-   * Initialize the MasterCompute class, this is the place to register
-   * aggregators.
-   */
-  public abstract void initialize() throws InstantiationException,
-    IllegalAccessException;
-
-  /**
-   * Retrieves the current superstep.
-   *
-   * @return Current superstep
-   */
-  public long getSuperstep() {
-    return getGraphState().getSuperstep();
-  }
-
-  /**
-   * Get the total (all workers) number of vertices that
-   * existed in the previous superstep.
-   *
-   * @return Total number of vertices (-1 if first superstep)
-   */
-  public long getTotalNumVertices() {
-    return getGraphState().getTotalNumVertices();
-  }
-
-  /**
-   * Get the total (all workers) number of edges that
-   * existed in the previous superstep.
-   *
-   * @return Total number of edges (-1 if first superstep)
-   */
-  public long getTotalNumEdges() {
-    return getGraphState().getTotalNumEdges();
-  }
-
-  /**
-   * After this is called, the computation will stop, even if there are
-   * still messages in the system or vertices that have not voted to halt.
-   */
-  public void haltComputation() {
-    halt = true;
-  }
-
-  /**
-   * Has the master halted?
-   *
-   * @return True if halted, false otherwise.
-   */
-  public boolean isHalted() {
-    return halt;
-  }
-
-  /**
-   * Get the graph state for all workers.
-   *
-   * @return Graph state for all workers
-   */
-  GraphState getGraphState() {
-    return graphState;
-  }
-
-  /**
-   * Set the graph state for all workers
-   *
-   * @param graphState Graph state for all workers
-   */
-  void setGraphState(GraphState graphState) {
-    this.graphState = graphState;
-  }
-
-  /**
-   * Get the mapper context
-   *
-   * @return Mapper context
-   */
-  public Mapper.Context getContext() {
-    return getGraphState().getContext();
-  }
-
-  @Override
-  public final <A extends Writable> boolean registerAggregator(
-    String name, Class<? extends Aggregator<A>> aggregatorClass)
-    throws InstantiationException, IllegalAccessException {
-    return getGraphState().getGraphMapper().getMasterAggregatorUsage().
-        registerAggregator(name, aggregatorClass);
-  }
-
-  @Override
-  public final <A extends Writable> boolean registerPersistentAggregator(
-      String name,
-      Class<? extends Aggregator<A>> aggregatorClass) throws
-      InstantiationException, IllegalAccessException {
-    return getGraphState().getGraphMapper().getMasterAggregatorUsage().
-        registerPersistentAggregator(name, aggregatorClass);
-  }
-
-  @Override
-  public <A extends Writable> A getAggregatedValue(String name) {
-    return getGraphState().getGraphMapper().getMasterAggregatorUsage().
-        <A>getAggregatedValue(name);
-  }
-
-  @Override
-  public <A extends Writable> void setAggregatedValue(String name, A value) {
-    getGraphState().getGraphMapper().getMasterAggregatorUsage().
-        setAggregatedValue(name, value);
-  }
-
-  @Override
-  public ImmutableClassesGiraphConfiguration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(ImmutableClassesGiraphConfiguration conf) {
-    this.conf = conf;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/graph/MasterInfo.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/graph/MasterInfo.java b/giraph/src/main/java/org/apache/giraph/graph/MasterInfo.java
deleted file mode 100644
index 50ad6aa..0000000
--- a/giraph/src/main/java/org/apache/giraph/graph/MasterInfo.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-/**
- * Information about the master that is sent to other workers.
- */
-public class MasterInfo extends TaskInfo {
-  /**
-   * Constructor
-   */
-  public MasterInfo() {
-  }
-
-  @Override
-  public String toString() {
-    return "Master(" + super.toString() + ")";
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java b/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java
deleted file mode 100644
index dbded04..0000000
--- a/giraph/src/main/java/org/apache/giraph/graph/MasterThread.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.bsp.ApplicationState;
-import org.apache.giraph.bsp.CentralizedServiceMaster;
-import org.apache.giraph.bsp.SuperstepState;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.counters.GiraphTimers;
-import org.apache.giraph.metrics.GiraphMetrics;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.KeeperException;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-/**
- * Master thread that will coordinate the activities of the tasks.  It runs
- * on all task processes, however, will only execute its algorithm if it knows
- * it is the "leader" from ZooKeeper.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public class MasterThread<I extends WritableComparable, V extends Writable,
-    E extends Writable, M extends Writable> extends Thread {
-  /** Counter group name for the Giraph timers */
-  public static final String GIRAPH_TIMERS_COUNTER_GROUP_NAME = "Giraph Timers";
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(MasterThread.class);
-  /** Reference to shared BspService */
-  private CentralizedServiceMaster<I, V, E, M> bspServiceMaster = null;
-  /** Context (for counters) */
-  private final Context context;
-  /** Use superstep counters? */
-  private final boolean superstepCounterOn;
-  /** Setup seconds */
-  private double setupSecs = 0d;
-  /** Superstep timer (in seconds) map */
-  private final Map<Long, Double> superstepSecsMap =
-      new TreeMap<Long, Double>();
-
-  /**
-   * Constructor.
-   *
-   * @param bspServiceMaster Master that already exists and setup() has
-   *        been called.
-   * @param context Context from the Mapper.
-   */
-  MasterThread(BspServiceMaster<I, V, E, M> bspServiceMaster,
-      Context context) {
-    super(MasterThread.class.getName());
-    this.bspServiceMaster = bspServiceMaster;
-    this.context = context;
-    GiraphTimers.init(context);
-    superstepCounterOn = context.getConfiguration().getBoolean(
-        GiraphConstants.USE_SUPERSTEP_COUNTERS,
-        GiraphConstants.USE_SUPERSTEP_COUNTERS_DEFAULT);
-  }
-
-  /**
-   * The master algorithm.  The algorithm should be able to withstand
-   * failures and resume as necessary since the master may switch during a
-   * job.
-   */
-  @Override
-  public void run() {
-    // Algorithm:
-    // 1. Become the master
-    // 2. If desired, restart from a manual checkpoint
-    // 3. Run all supersteps until complete
-    try {
-      long startMillis = System.currentTimeMillis();
-      long endMillis = 0;
-      bspServiceMaster.setup();
-      if (bspServiceMaster.becomeMaster()) {
-        // Attempt to create InputSplits if necessary. Bail out if that fails.
-        if (bspServiceMaster.getRestartedSuperstep() !=
-            BspService.UNSET_SUPERSTEP ||
-            (bspServiceMaster.createVertexInputSplits() != -1 &&
-                bspServiceMaster.createEdgeInputSplits() != -1)) {
-          long setupMillis = System.currentTimeMillis() - startMillis;
-          GiraphTimers.getInstance().getSetupMs().increment(setupMillis);
-          setupSecs = setupMillis / 1000.0d;
-          SuperstepState superstepState = SuperstepState.INITIAL;
-          long cachedSuperstep = BspService.UNSET_SUPERSTEP;
-          while (superstepState != SuperstepState.ALL_SUPERSTEPS_DONE) {
-            long startSuperstepMillis = System.currentTimeMillis();
-            cachedSuperstep = bspServiceMaster.getSuperstep();
-            GiraphMetrics.get().resetSuperstepMetrics(cachedSuperstep);
-            superstepState = bspServiceMaster.coordinateSuperstep();
-            long superstepMillis = System.currentTimeMillis() -
-                startSuperstepMillis;
-            superstepSecsMap.put(Long.valueOf(cachedSuperstep),
-                superstepMillis / 1000.0d);
-            if (LOG.isInfoEnabled()) {
-              LOG.info("masterThread: Coordination of superstep " +
-                  cachedSuperstep + " took " +
-                  superstepMillis / 1000.0d +
-                  " seconds ended with state " + superstepState +
-                  " and is now on superstep " +
-                  bspServiceMaster.getSuperstep());
-            }
-            if (superstepCounterOn) {
-              GiraphTimers.getInstance().getSuperstepMs(cachedSuperstep).
-                increment(superstepMillis);
-            }
-
-            bspServiceMaster.postSuperstep();
-
-            // If a worker failed, restart from a known good superstep
-            if (superstepState == SuperstepState.WORKER_FAILURE) {
-              bspServiceMaster.restartFromCheckpoint(
-                  bspServiceMaster.getLastGoodCheckpoint());
-            }
-            endMillis = System.currentTimeMillis();
-          }
-          bspServiceMaster.setJobState(ApplicationState.FINISHED, -1, -1);
-        }
-      }
-      bspServiceMaster.cleanup();
-      if (!superstepSecsMap.isEmpty()) {
-        GiraphTimers.getInstance().getShutdownMs().
-          increment(System.currentTimeMillis() - endMillis);
-        if (LOG.isInfoEnabled()) {
-          LOG.info("setup: Took " + setupSecs + " seconds.");
-        }
-        for (Entry<Long, Double> entry : superstepSecsMap.entrySet()) {
-          if (LOG.isInfoEnabled()) {
-            if (entry.getKey().longValue() ==
-                BspService.INPUT_SUPERSTEP) {
-              LOG.info("input superstep: Took " +
-                  entry.getValue() + " seconds.");
-            } else {
-              LOG.info("superstep " + entry.getKey() + ": Took " +
-                  entry.getValue() + " seconds.");
-            }
-          }
-          context.progress();
-        }
-        if (LOG.isInfoEnabled()) {
-          LOG.info("shutdown: Took " +
-              (System.currentTimeMillis() - endMillis) /
-              1000.0d + " seconds.");
-          LOG.info("total: Took " +
-              ((System.currentTimeMillis() - startMillis) /
-              1000.0d) + " seconds.");
-        }
-        GiraphTimers.getInstance().getTotalMs().
-          increment(System.currentTimeMillis() - startMillis);
-      }
-    } catch (IOException e) {
-      LOG.error("masterThread: Master algorithm failed with " +
-          "IOException ", e);
-      throw new IllegalStateException(e);
-    } catch (InterruptedException e) {
-      LOG.error("masterThread: Master algorithm failed with " +
-          "InterruptedException", e);
-      throw new IllegalStateException(e);
-    } catch (KeeperException e) {
-      LOG.error("masterThread: Master algorithm failed with " +
-          "KeeperException", e);
-      throw new IllegalStateException(e);
-    }
-    bspServiceMaster.postApplication();
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/graph/MultiGraphEdgeListVertex.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/graph/MultiGraphEdgeListVertex.java b/giraph/src/main/java/org/apache/giraph/graph/MultiGraphEdgeListVertex.java
deleted file mode 100644
index 19d6e0c..0000000
--- a/giraph/src/main/java/org/apache/giraph/graph/MultiGraphEdgeListVertex.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.util.Iterator;
-
-/**
- * An edge-list backed vertex that allows for parallel edges.
- * This can be used not only to support mutable multigraphs,
- * but also to make mutations and edge-based input efficient without
- * resorting to a hash-map backed vertex.
- *
- * Note: removeEdge() here removes all edges pointing to the target vertex,
- * but returns only one of them (or null if there are no such edges).
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public abstract class MultiGraphEdgeListVertex<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends EdgeListVertexBase<I, V, E, M> {
-  @Override
-  public final boolean addEdge(Edge<I, E> edge) {
-    appendEdge(edge);
-    return true;
-  }
-
-  @Override
-  public int removeEdges(I targetVertexId) {
-    int removedCount = 0;
-    for (Iterator<Edge<I, E>> edges = getEdges().iterator(); edges.hasNext();) {
-      Edge<I, E> edge = edges.next();
-      if (edge.getTargetVertexId().equals(targetVertexId)) {
-        ++removedCount;
-        edges.remove();
-      }
-    }
-    return removedCount;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/graph/MultiGraphRepresentativeVertex.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/graph/MultiGraphRepresentativeVertex.java b/giraph/src/main/java/org/apache/giraph/graph/MultiGraphRepresentativeVertex.java
deleted file mode 100644
index 40e929c..0000000
--- a/giraph/src/main/java/org/apache/giraph/graph/MultiGraphRepresentativeVertex.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Similar to {@link RepresentativeVertex}, but allows for parallel edges.
- *
- * Note:  removeEdge() here removes all edges pointing to the target vertex,
- * but returns only one of them (or null if there are no such edges).
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public abstract class MultiGraphRepresentativeVertex<I extends
-    WritableComparable, V extends Writable, E extends Writable,
-    M extends Writable> extends RepresentativeVertexBase<I, V, E, M> {
-  @Override
-  public final boolean addEdge(Edge<I, E> edge) {
-    appendEdge(edge);
-    return true;
-  }
-
-  @Override
-  public final int removeEdges(I targetVertexId) {
-    return removeAllEdges(targetVertexId);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/graph/MutableVertex.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/graph/MutableVertex.java b/giraph/src/main/java/org/apache/giraph/graph/MutableVertex.java
deleted file mode 100644
index 04c17ed..0000000
--- a/giraph/src/main/java/org/apache/giraph/graph/MutableVertex.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.IOException;
-import java.util.Collections;
-
-/**
- * Interface used by VertexReader to set the properties of a new vertex
- * or mutate the graph.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public abstract class MutableVertex<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends Vertex<I, V, E, M> {
-  /**
-   * Add an edge for this vertex (happens immediately)
-   *
-   * @param edge Edge to add
-   * @return Return true if succeeded, false otherwise
-   */
-  public abstract boolean addEdge(Edge<I, E> edge);
-
-  /**
-   * Removes all edges pointing to the given vertex id.
-   *
-   * @param targetVertexId the target vertex id
-   * @return the number of removed edges
-   */
-  public abstract int removeEdges(I targetVertexId);
-
-  /**
-   * Sends a request to create a vertex that will be available during the
-   * next superstep.
-   *
-   * @param id Vertex id
-   * @param value Vertex value
-   * @param edges Initial edges
-   */
-  public void addVertexRequest(I id, V value, Iterable<Edge<I, E>> edges)
-    throws IOException {
-    Vertex<I, V, E, M> vertex = getConf().createVertex();
-    vertex.initialize(id, value, edges);
-    getGraphState().getWorkerClientRequestProcessor().addVertexRequest(vertex);
-  }
-
-  /**
-   * Sends a request to create a vertex that will be available during the
-   * next superstep.
-   *
-   * @param id Vertex id
-   * @param value Vertex value
-   */
-  public void addVertexRequest(I id, V value) throws IOException {
-    addVertexRequest(id, value, Collections.<Edge<I, E>>emptyList());
-  }
-
-  /**
-   * Request to remove a vertex from the graph
-   * (applied just prior to the next superstep).
-   *
-   * @param vertexId Id of the vertex to be removed.
-   */
-  public void removeVertexRequest(I vertexId) throws IOException {
-    getGraphState().getWorkerClientRequestProcessor().
-        removeVertexRequest(vertexId);
-  }
-
-  /**
-   * Request to add an edge of a vertex in the graph
-   * (processed just prior to the next superstep)
-   *
-   * @param sourceVertexId Source vertex id of edge
-   * @param edge Edge to add
-   */
-  public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge)
-    throws IOException {
-    getGraphState().getWorkerClientRequestProcessor().
-        addEdgeRequest(sourceVertexId, edge);
-  }
-
-  /**
-   * Request to remove all edges from a given source vertex to a given target
-   * vertex (processed just prior to the next superstep).
-   *
-   * @param sourceVertexId Source vertex id
-   * @param targetVertexId Target vertex id
-   */
-  public void removeEdgesRequest(I sourceVertexId, I targetVertexId)
-    throws IOException {
-    getGraphState().getWorkerClientRequestProcessor().
-        removeEdgesRequest(sourceVertexId, targetVertexId);
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/graph/RepresentativeVertex.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/graph/RepresentativeVertex.java b/giraph/src/main/java/org/apache/giraph/graph/RepresentativeVertex.java
deleted file mode 100644
index 673b402..0000000
--- a/giraph/src/main/java/org/apache/giraph/graph/RepresentativeVertex.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-/**
- * This vertex should only be used in conjunction with ByteArrayPartition since
- * it has special code to deserialize by reusing objects and not instantiating
- * new ones.  If used without ByteArrayPartition, it will cause a lot of
- * wasted memory.
- *
- * Also, this vertex is optimized for space and not efficient for either adding
- * or random access of edges.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public abstract class RepresentativeVertex<
-    I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends RepresentativeVertexBase<I, V, E, M> {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(RepresentativeVertex.class);
-
-  @Override
-  public final boolean addEdge(Edge<I, E> edge) {
-    // Note that this is very expensive (deserializes all edges
-    // in an addEdge() request).
-    // Hopefully the user set all the edges in setEdges().
-    for (Edge<I, E> currentEdge : getEdges()) {
-      if (currentEdge.getTargetVertexId().equals(edge.getTargetVertexId())) {
-        LOG.warn("addEdge: Vertex=" + getId() +
-            ": already added an edge value for target vertex id " +
-            edge.getTargetVertexId());
-        return false;
-      }
-    }
-    appendEdge(edge);
-    return true;
-  }
-
-  @Override
-  public final int removeEdges(I targetVertexId) {
-    return removeFirstEdge(targetVertexId) ? 1 : 0;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/graph/RepresentativeVertexBase.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/graph/RepresentativeVertexBase.java b/giraph/src/main/java/org/apache/giraph/graph/RepresentativeVertexBase.java
deleted file mode 100644
index d0e4bfb..0000000
--- a/giraph/src/main/java/org/apache/giraph/graph/RepresentativeVertexBase.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.giraph.utils.ExtendedDataInput;
-import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Common base class for representative vertices.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-public abstract class RepresentativeVertexBase<
-    I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends MutableVertex<I, V, E, M> implements Iterable<Edge<I, E>> {
-  /** Class logger */
-  private static final Logger LOG =
-      Logger.getLogger(RepresentativeVertex.class);
-  /** Representative edge */
-  private final Edge<I, E> representativeEdge = new Edge<I, E>();
-  /** Serialized edges */
-  private byte[] serializedEdges;
-  /** Byte used in serializedEdges */
-  private int serializedEdgesBytesUsed;
-  /** Number of edges */
-  private int edgeCount;
-
-  /**
-   * Append an edge to the serialized representation.
-   *
-   * @param edge Edge to append
-   */
-  protected void appendEdge(Edge<I, E> edge) {
-    ExtendedDataOutput extendedDataOutput =
-        getConf().createExtendedDataOutput(
-            serializedEdges, serializedEdgesBytesUsed);
-    try {
-      edge.getTargetVertexId().write(extendedDataOutput);
-      edge.getValue().write(extendedDataOutput);
-    } catch (IOException e) {
-      throw new IllegalStateException("addEdge: Failed to write to the " +
-          "new byte array");
-    }
-    serializedEdges = extendedDataOutput.getByteArray();
-    serializedEdgesBytesUsed = extendedDataOutput.getPos();
-    ++edgeCount;
-  }
-
-  /**
-   * Remove the first edge pointing to a target vertex.
-   *
-   * @param targetVertexId Target vertex id
-   * @return True if one such edge was found and removed.
-   */
-  protected boolean removeFirstEdge(I targetVertexId) {
-    // Note that this is very expensive (deserializes all edges
-    // in an removedge() request).
-    // Hopefully the user set all the edges correctly in setEdges().
-    RepresentativeEdgeIterator iterator = new RepresentativeEdgeIterator();
-    int foundStartOffset = 0;
-    while (iterator.hasNext()) {
-      Edge<I, E> edge = iterator.next();
-      if (edge.getTargetVertexId().equals(targetVertexId)) {
-        System.arraycopy(serializedEdges, iterator.extendedDataInput.getPos(),
-            serializedEdges, foundStartOffset,
-            serializedEdgesBytesUsed - iterator.extendedDataInput.getPos());
-        serializedEdgesBytesUsed -=
-            iterator.extendedDataInput.getPos() - foundStartOffset;
-        --edgeCount;
-        return true;
-      }
-      foundStartOffset = iterator.extendedDataInput.getPos();
-    }
-
-    return false;
-  }
-
-  /**
-   * Remove all edges pointing to a target vertex.
-   *
-   * @param targetVertexId Target vertex id
-   * @return The number of removed edges
-   */
-  protected int removeAllEdges(I targetVertexId) {
-    // Note that this is very expensive (deserializes all edges
-    // in an removedge() request).
-    // Hopefully the user set all the edges correctly in setEdges().
-    RepresentativeEdgeIterator iterator = new RepresentativeEdgeIterator();
-    int removedCount = 0;
-    List<Integer> foundStartOffsets = new LinkedList<Integer>();
-    List<Integer> foundEndOffsets = new LinkedList<Integer>();
-    int lastStartOffset = 0;
-    while (iterator.hasNext()) {
-      Edge<I, E> edge = iterator.next();
-      if (edge.getTargetVertexId().equals(targetVertexId)) {
-        foundStartOffsets.add(lastStartOffset);
-        foundEndOffsets.add(iterator.extendedDataInput.getPos());
-        ++removedCount;
-      }
-      lastStartOffset = iterator.extendedDataInput.getPos();
-    }
-    foundStartOffsets.add(serializedEdgesBytesUsed);
-
-    Iterator<Integer> foundStartOffsetIter = foundStartOffsets.iterator();
-    Integer foundStartOffset = foundStartOffsetIter.next();
-    for (Integer foundEndOffset : foundEndOffsets) {
-      Integer nextFoundStartOffset = foundStartOffsetIter.next();
-      System.arraycopy(serializedEdges, foundEndOffset,
-          serializedEdges, foundStartOffset,
-          nextFoundStartOffset - foundEndOffset);
-      serializedEdgesBytesUsed -= foundEndOffset - foundStartOffset;
-      foundStartOffset = nextFoundStartOffset;
-    }
-
-    edgeCount -= removedCount;
-    return removedCount;
-  }
-
-  @Override
-  public final void initialize(I id, V value, Iterable<Edge<I, E>> edges) {
-    // Make sure the initial values exist
-    representativeEdge.setTargetVertexId(getConf().createVertexId());
-    representativeEdge.setValue(getConf().createEdgeValue());
-    super.initialize(id, value, edges);
-  }
-
-  @Override
-  public final void initialize(I id, V value) {
-    // Make sure the initial values exist
-    representativeEdge.setTargetVertexId(getConf().createVertexId());
-    representativeEdge.setValue(getConf().createEdgeValue());
-    super.initialize(id, value);
-  }
-
-  /**
-   * Iterator that uses the representative edge (only one iterator allowed
-   * at a time)
-   */
-  private final class RepresentativeEdgeIterator implements
-      Iterator<Edge<I, E>> {
-    /** Input for processing the bytes */
-    private final ExtendedDataInput extendedDataInput;
-
-    /** Constructor. */
-    RepresentativeEdgeIterator() {
-      extendedDataInput = getConf().createExtendedDataInput(
-          serializedEdges, 0, serializedEdgesBytesUsed);
-    }
-    @Override
-    public boolean hasNext() {
-      return serializedEdges != null && extendedDataInput.available() > 0;
-    }
-
-    @Override
-    public Edge<I, E> next() {
-      try {
-        representativeEdge.getTargetVertexId().readFields(extendedDataInput);
-        representativeEdge.getValue().readFields(extendedDataInput);
-      } catch (IOException e) {
-        throw new IllegalStateException("next: Failed on pos " +
-            extendedDataInput.getPos() + " edge " + representativeEdge);
-      }
-      return representativeEdge;
-    }
-
-    @Override
-    public void remove() {
-      throw new IllegalAccessError("remove: Not supported");
-    }
-  }
-
-  @Override
-  public final Iterator<Edge<I, E>> iterator() {
-    return new RepresentativeEdgeIterator();
-  }
-
-  @Override
-  public final void setEdges(Iterable<Edge<I, E>> edges) {
-    ExtendedDataOutput extendedOutputStream =
-        getConf().createExtendedDataOutput();
-    if (edges != null) {
-      for (Edge<I, E> edge : edges) {
-        try {
-          edge.getTargetVertexId().write(extendedOutputStream);
-          edge.getValue().write(extendedOutputStream);
-        } catch (IOException e) {
-          throw new IllegalStateException("setEdges: Failed to serialize " +
-              edge);
-        }
-        ++edgeCount;
-      }
-    }
-    serializedEdges = extendedOutputStream.getByteArray();
-    serializedEdgesBytesUsed = extendedOutputStream.getPos();
-  }
-
-  @Override
-  public final Iterable<Edge<I, E>> getEdges() {
-    return this;
-  }
-
-  @Override
-  public final int getNumEdges() {
-    return edgeCount;
-  }
-
-  @Override
-  public final void readFields(DataInput in) throws IOException {
-    // Ensure these objects are present
-    if (representativeEdge.getTargetVertexId() == null) {
-      representativeEdge.setTargetVertexId(getConf().createVertexId());
-    }
-
-    if (representativeEdge.getValue() == null) {
-      representativeEdge.setValue(getConf().createEdgeValue());
-    }
-
-    I vertexId = getId();
-    if (vertexId == null) {
-      vertexId = getConf().createVertexId();
-    }
-    vertexId.readFields(in);
-
-    V vertexValue = getValue();
-    if (vertexValue == null) {
-      vertexValue = getConf().createVertexValue();
-    }
-    vertexValue.readFields(in);
-
-    initialize(vertexId, vertexValue);
-
-    serializedEdgesBytesUsed = in.readInt();
-    // Only create a new buffer if the old one isn't big enough
-    if (serializedEdges == null ||
-        serializedEdgesBytesUsed > serializedEdges.length) {
-      serializedEdges = new byte[serializedEdgesBytesUsed];
-    }
-    in.readFully(serializedEdges, 0, serializedEdgesBytesUsed);
-    edgeCount = in.readInt();
-
-    readHaltBoolean(in);
-  }
-
-  @Override
-  public final void write(DataOutput out) throws IOException {
-    getId().write(out);
-    getValue().write(out);
-
-    out.writeInt(serializedEdgesBytesUsed);
-    out.write(serializedEdges, 0, serializedEdgesBytesUsed);
-    out.writeInt(edgeCount);
-
-    out.writeBoolean(isHalted());
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java b/giraph/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java
deleted file mode 100644
index 4843dd5..0000000
--- a/giraph/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.utils.EdgeIterables;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Mutable vertex with no edge values.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <M> Message data
- */
-public abstract class SimpleMutableVertex<I extends WritableComparable,
-    V extends Writable, M extends Writable> extends MutableVertex<I, V,
-    NullWritable, M> {
-  /**
-   * Set the neighbors of this vertex.
-   *
-   * @param neighbors Iterable of destination vertex ids.
-   */
-  public abstract void setNeighbors(Iterable<I> neighbors);
-
-  @Override
-  public void setEdges(Iterable<Edge<I, NullWritable>> edges) {
-    setNeighbors(EdgeIterables.getNeighbors(edges));
-  }
-
-  /**
-   * Get a read-only view of the neighbors of this
-   * vertex, i.e. the target vertices of its out-edges.
-   *
-   * @return the neighbors (sort order determined by subclass implementation).
-   */
-  public abstract Iterable<I> getNeighbors();
-
-  @Override
-  public Iterable<Edge<I, NullWritable>> getEdges() {
-    return EdgeIterables.getEdges(getNeighbors());
-  }
-
-  @Override
-  public NullWritable getEdgeValue(I targetVertexId) {
-    return NullWritable.get();
-  }
-
-  /**
-   * Add an edge for this vertex (happens immediately)
-   *
-   * @param targetVertexId target vertex
-   * @return Return true if succeeded, false otherwise
-   */
-  public abstract boolean addEdge(I targetVertexId);
-
-  @Override
-  public boolean addEdge(Edge<I, NullWritable> edge) {
-    return addEdge(edge.getTargetVertexId());
-  }
-
-  /**
-   * Request to add an edge of a vertex in the graph
-   * (processed just prior to the next superstep)
-   *
-   * @param sourceVertexId Source vertex id of edge
-   */
-  public void addEdgeRequest(I sourceVertexId) throws IOException {
-    getGraphState().getWorkerClientRequestProcessor().
-        addEdgeRequest(sourceVertexId, new Edge<I,
-            NullWritable>(sourceVertexId, NullWritable.get()));
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    I vertexId = (I) getConf().createVertexId();
-    vertexId.readFields(in);
-    V vertexValue = (V) getConf().createVertexValue();
-    vertexValue.readFields(in);
-
-    int numEdges = in.readInt();
-    List<Edge<I, NullWritable>> edges =
-        Lists.newArrayListWithCapacity(numEdges);
-    for (int i = 0; i < numEdges; ++i) {
-      I targetVertexId = (I) getConf().createVertexId();
-      targetVertexId.readFields(in);
-      edges.add(new Edge<I, NullWritable>(targetVertexId, NullWritable.get()));
-    }
-
-    initialize(vertexId, vertexValue, edges);
-
-    readHaltBoolean(in);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    getId().write(out);
-    getValue().write(out);
-
-    out.writeInt(getNumEdges());
-    for (I neighbor : getNeighbors()) {
-      neighbor.write(out);
-    }
-
-    out.writeBoolean(isHalted());
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/graph/SimpleVertex.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/graph/SimpleVertex.java b/giraph/src/main/java/org/apache/giraph/graph/SimpleVertex.java
deleted file mode 100644
index 0d56d95..0000000
--- a/giraph/src/main/java/org/apache/giraph/graph/SimpleVertex.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.utils.EdgeIterables;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Vertex with no edge values.
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <M> Message data
- */
-public abstract class SimpleVertex<I extends WritableComparable,
-    V extends Writable, M extends Writable> extends Vertex<I, V,
-    NullWritable, M> {
-  /**
-   * Set the neighbors of this vertex.
-   *
-   * @param neighbors Iterable of destination vertex ids.
-   */
-  public abstract void setNeighbors(Iterable<I> neighbors);
-
-  @Override
-  public void setEdges(Iterable<Edge<I, NullWritable>> edges) {
-    setNeighbors(EdgeIterables.getNeighbors(edges));
-  }
-
-  /**
-   * Get a read-only view of the neighbors of this
-   * vertex, i.e. the target vertices of its out-edges.
-   *
-   * @return the neighbors (sort order determined by subclass implementation).
-   */
-  public abstract Iterable<I> getNeighbors();
-
-  @Override
-  public Iterable<Edge<I, NullWritable>> getEdges() {
-    return EdgeIterables.getEdges(getNeighbors());
-  }
-
-  @Override
-  public NullWritable getEdgeValue(I targetVertexId) {
-    return NullWritable.get();
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    I vertexId = getConf().createVertexId();
-    vertexId.readFields(in);
-    V vertexValue = getConf().createVertexValue();
-    vertexValue.readFields(in);
-
-    int numEdges = in.readInt();
-    List<I> neighbors = Lists.newArrayListWithCapacity(numEdges);
-    for (int i = 0; i < numEdges; ++i) {
-      I targetVertexId = getConf().createVertexId();
-      targetVertexId.readFields(in);
-      neighbors.add(targetVertexId);
-    }
-
-    initialize(vertexId, vertexValue);
-    setNeighbors(neighbors);
-
-    readHaltBoolean(in);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    getId().write(out);
-    getValue().write(out);
-
-    out.writeInt(getNumEdges());
-    for (I neighbor : getNeighbors()) {
-      neighbor.write(out);
-    }
-
-    out.writeBoolean(isHalted());
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/graph/TaskInfo.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/graph/TaskInfo.java b/giraph/src/main/java/org/apache/giraph/graph/TaskInfo.java
deleted file mode 100644
index 536af0f..0000000
--- a/giraph/src/main/java/org/apache/giraph/graph/TaskInfo.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import org.apache.hadoop.io.Writable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-/**
- * Abstract class for information about any task - worker or master.
- */
-public abstract class TaskInfo implements Writable {
-  /** Task hostname */
-  private String hostname;
-  /** Port that the IPC server is using */
-  private int port;
-  /** Task partition id */
-  private int taskId = -1;
-
-  /**
-   * Constructor
-   */
-  public TaskInfo() {
-  }
-
-  /**
-   * Get this task's hostname
-   *
-   * @return Hostname
-   */
-  public String getHostname() {
-    return hostname;
-  }
-
-  /**
-   * Get port that the IPC server of this task is using
-   *
-   * @return Port
-   */
-  public int getPort() {
-    return port;
-  }
-
-  /**
-   * Set address that the IPC server of this task is using
-   *
-   * @param address Address
-   */
-  public void setInetSocketAddress(InetSocketAddress address) {
-    this.port = address.getPort();
-    this.hostname = address.getHostName();
-  }
-
-  /**
-   * Get a new instance of the InetSocketAddress for this hostname and port
-   *
-   * @return InetSocketAddress of the hostname and port.
-   */
-  public InetSocketAddress getInetSocketAddress() {
-    return new InetSocketAddress(hostname, port);
-  }
-
-  /**
-   * Set task partition id of this task
-   *
-   * @param taskId partition id
-   */
-  public void setTaskId(int taskId) {
-    this.taskId = taskId;
-  }
-
-  /**
-   * Get task partition id of this task
-   *
-   * @return Task partition id of this task
-   */
-  public int getTaskId() {
-    return taskId;
-  }
-
-  /**
-   * Get hostname and task id
-   *
-   * @return Hostname and task id
-   */
-  public String getHostnameId() {
-    return getHostname() + "_" + getTaskId();
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other instanceof TaskInfo) {
-      TaskInfo taskInfo = (TaskInfo) other;
-      if (hostname.equals(taskInfo.getHostname()) &&
-          (getTaskId() == taskInfo.getTaskId()) &&
-          (port == taskInfo.getPort() &&
-          (taskId == taskInfo.getTaskId()))) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public String toString() {
-    return "hostname=" + getHostname() +
-        ", MRtaskID=" + getTaskId() +
-        ", port=" + getPort();
-  }
-
-  @Override
-  public void readFields(DataInput input) throws IOException {
-    hostname = input.readUTF();
-    port = input.readInt();
-    taskId = input.readInt();
-  }
-
-  @Override
-  public void write(DataOutput output) throws IOException {
-    output.writeUTF(hostname);
-    output.writeInt(port);
-    output.writeInt(taskId);
-  }
-
-  @Override
-  public int hashCode() {
-    int result = 17;
-    result = 37 * result + getPort();
-    result = 37 * result + hostname.hashCode();
-    result = 37 * result + getTaskId();
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
----------------------------------------------------------------------
diff --git a/giraph/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java b/giraph/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
deleted file mode 100644
index abdba44..0000000
--- a/giraph/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph;
-
-import java.io.IOException;
-import java.util.Map.Entry;
-
-import com.google.common.base.Charsets;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-
-/**
- * Default implementation of {@link AggregatorWriter}. Each line consists of
- * text and contains the aggregator name, the aggregator value and the
- * aggregator class.
- */
-public class TextAggregatorWriter implements AggregatorWriter {
-  /** The filename of the outputfile */
-  public static final String FILENAME =
-      "giraph.textAggregatorWriter.filename";
-  /** Signal for "never write" frequency */
-  public static final int NEVER = 0;
-  /** Signal for "write only the final values" frequency */
-  public static final int AT_THE_END = -1;
-  /** Signal for "write values in every superstep" frequency */
-  public static final int ALWAYS = -1;
-  /** The frequency of writing:
-   *  - NEVER: never write, files aren't created at all
-   *  - AT_THE_END: aggregators are written only when the computation is over
-   *  - int: i.e. 1 is every superstep, 2 every two supersteps and so on
-   */
-  public static final String FREQUENCY =
-      "giraph.textAggregatorWriter.frequency";
-  /** Default filename for dumping aggregator values */
-  private static final String DEFAULT_FILENAME = "aggregatorValues";
-  /** Handle to the outputfile */
-  protected FSDataOutputStream output;
-  /** Write every "frequency" supersteps */
-  private int frequency;
-
-  @Override
-  @SuppressWarnings("rawtypes")
-  public void initialize(Context context, long attempt) throws IOException {
-    Configuration conf = context.getConfiguration();
-    frequency = conf.getInt(FREQUENCY, NEVER);
-    String filename  = conf.get(FILENAME, DEFAULT_FILENAME);
-    if (frequency != NEVER) {
-      Path p = new Path(filename + "_" + attempt);
-      FileSystem fs = FileSystem.get(conf);
-      if (fs.exists(p)) {
-        throw new RuntimeException("aggregatorWriter file already" +
-            " exists: " + p.getName());
-      }
-      output = fs.create(p);
-    }
-  }
-
-  @Override
-  public void writeAggregator(
-      Iterable<Entry<String, Writable>> aggregatorMap,
-      long superstep) throws IOException {
-    if (shouldWrite(superstep)) {
-      for (Entry<String, Writable> entry : aggregatorMap) {
-        byte[] bytes = aggregatorToString(entry.getKey(), entry.getValue(),
-            superstep).getBytes(Charsets.UTF_8);
-        output.write(bytes, 0, bytes.length);
-      }
-      output.flush();
-    }
-  }
-
-  /**
-   * Implements the way an aggregator is converted into a String.
-   * Override this if you want to implement your own text format.
-   *
-   * @param aggregatorName Name of the aggregator
-   * @param value Value of aggregator
-   * @param superstep Current superstep
-   * @return The String representation for the aggregator
-   */
-  protected String aggregatorToString(String aggregatorName,
-      Writable value,
-      long superstep) {
-    return new StringBuilder("superstep=").append(superstep).append("\t")
-        .append(aggregatorName).append("=").append(value).append("\n")
-        .toString();
-  }
-
-  /**
-   * Should write this superstep?
-   *
-   * @param superstep Superstep to check
-   * @return True if should write, false otherwise
-   */
-  private boolean shouldWrite(long superstep) {
-    return (frequency == AT_THE_END && superstep == LAST_SUPERSTEP) ||
-        (frequency != NEVER && superstep % frequency == 0);
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (output != null) {
-      output.close();
-    }
-  }
-}


Mime
View raw message