giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ni...@apache.org
Subject [23/51] [partial] GIRAPH-457: update module names (nitay)
Date Thu, 20 Dec 2012 04:25:32 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
new file mode 100644
index 0000000..dea4229
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayVertexIdMessages.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Stores vertex id and message pairs in a single byte array
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public class ByteArrayVertexIdMessages<I extends WritableComparable,
+    M extends Writable> implements Writable,
+    ImmutableClassesGiraphConfigurable {
+  /** Extended data output */
+  private ExtendedDataOutput extendedDataOutput;
+  /** Configuration */
+  private ImmutableClassesGiraphConfiguration<I, ?, ?, M> configuration;
+  /** Add the message size to the stream? (Depends on the message store) */
+  private boolean useMessageSizeEncoding = false;
+
+  /**
+   * Constructor for reflection
+   */
+  public ByteArrayVertexIdMessages() { }
+
+  /**
+   * Set whether message sizes should be encoded.  This should only be a
+   * possibility when not combining.  When combining, all messages need to be
+   * deserializd right away, so this won't help.
+   */
+  private void setUseMessageSizeEncoding() {
+    if (!configuration.useCombiner()) {
+      useMessageSizeEncoding = configuration.useMessageSizeEncoding();
+    } else {
+      useMessageSizeEncoding = false;
+    }
+  }
+
+  /**
+   * Initialize the inner state. Must be called before {@code add()} is
+   * called.
+   */
+  public void initialize() {
+    extendedDataOutput = configuration.createExtendedDataOutput();
+    setUseMessageSizeEncoding();
+  }
+
+  /**
+   * Initialize the inner state, with a known size. Must be called before
+   * {@code add()} is called.
+   *
+   * @param expectedSize Number of bytes to be expected
+   */
+  public void initialize(int expectedSize) {
+    extendedDataOutput = configuration.createExtendedDataOutput(expectedSize);
+    setUseMessageSizeEncoding();
+  }
+
+  /**
+   * Add a vertex id and message pair to the collection.
+   *
+   * @param vertexId Vertex id
+   * @param message Message
+   */
+  public void add(I vertexId, M message) {
+    try {
+      vertexId.write(extendedDataOutput);
+      // Write the size if configured this way, else, just the message
+      if (useMessageSizeEncoding) {
+        int pos = extendedDataOutput.getPos();
+        extendedDataOutput.skipBytes(4);
+        message.write(extendedDataOutput);
+        extendedDataOutput.writeInt(
+            pos, extendedDataOutput.getPos() - pos - 4);
+      } else {
+        message.write(extendedDataOutput);
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException("add: IOException", e);
+    }
+  }
+
+  /**
+   * Get the number of bytes used
+   *
+   * @return Number of bytes used
+   */
+  public int getSize() {
+    return extendedDataOutput.getPos();
+  }
+
+  /**
+   * Check if the list is empty.
+   *
+   * @return True iff there are no pairs in the list
+   */
+  public boolean isEmpty() {
+    return extendedDataOutput.getPos() == 0;
+  }
+
+  /**
+   * Clear the collection.
+   */
+  public void clear() {
+    extendedDataOutput.reset();
+  }
+
+  /**
+   * Get specialized iterator that will instiantiate the vertex id and
+   * message of this object.
+   *
+   * @return Special iterator that reuses vertex ids and messages unless
+   *         specified
+   */
+  public VertexIdMessageIterator getVertexIdMessageIterator() {
+    return new VertexIdMessageIterator();
+  }
+
+  /**
+   * Get specialized iterator that will instiantiate the vertex id and
+   * message of this object.  It will only produce message bytes, not actual
+   * messages and expects a different encoding.
+   *
+   * @return Special iterator that reuses vertex ids (unless released) and
+   *         copies message bytes
+   */
+  public VertexIdMessageBytesIterator getVertexIdMessageBytesIterator() {
+    if (!useMessageSizeEncoding) {
+      return null;
+    }
+    return new VertexIdMessageBytesIterator();
+  }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration getConf() {
+    return configuration;
+  }
+
+  @Override
+  public void write(DataOutput dataOutput) throws IOException {
+    dataOutput.writeBoolean(useMessageSizeEncoding);
+    dataOutput.writeInt(extendedDataOutput.getPos());
+    dataOutput.write(extendedDataOutput.getByteArray(), 0,
+        extendedDataOutput.getPos());
+  }
+
+  @Override
+  public void readFields(DataInput dataInput) throws IOException {
+    useMessageSizeEncoding = dataInput.readBoolean();
+    int size = dataInput.readInt();
+    byte[] buf = new byte[size];
+    dataInput.readFully(buf);
+    extendedDataOutput = configuration.createExtendedDataOutput(buf, size);
+  }
+
+  /**
+   * Get the size of this object in serialized form.
+   *
+   * @return The size (in bytes) of serialized object
+   */
+  public int getSerializedSize() {
+    return 1 + 4 + getSize();
+  }
+
+  /**
+   * Common implementation for VertexIdMessageIterator
+   * and VertexIdMessageBytesIterator
+   */
+  public abstract class VertexIdIterator {
+    /** Reader of the serialized messages */
+    protected final ExtendedDataInput extendedDataInput =
+        configuration.createExtendedDataInput(
+            extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
+    /** Current vertex id */
+    protected I vertexId;
+
+    /**
+     * Returns true if the iteration has more elements.
+     *
+     * @return True if the iteration has more elements.
+     */
+    public boolean hasNext() {
+      return extendedDataInput.available() > 0;
+    }
+    /**
+     * Moves to the next element in the iteration.
+     */
+    public abstract void next();
+
+    /**
+     * Get the current vertex id.  Ihis object's contents are only guaranteed
+     * until next() is called.  To take ownership of this object call
+     * releaseCurrentVertexId() after getting a reference to this object.
+     *
+     * @return Current vertex id
+     */
+    public I getCurrentVertexId() {
+      return vertexId;
+    }
+    /**
+     * The backing store of the current vertex id is now released.
+     * Further calls to getCurrentVertexId () without calling next()
+     * will return null.
+     *
+     * @return Current vertex id that was released
+     */
+    public I releaseCurrentVertexId() {
+      I releasedVertexId = vertexId;
+      vertexId = null;
+      return releasedVertexId;
+    }
+  }
+
+  /**
+   * Special iterator that reuses vertex ids and messages so that the
+   * lifetime of the object is only until next() is called.
+   *
+   * Vertex id ownership can be released if desired through
+   * releaseCurrentVertexId().  This optimization allows us to cut down
+   * on the number of objects instantiated and garbage collected.
+   *
+   * Not thread-safe.
+   */
+  public class VertexIdMessageIterator extends VertexIdIterator {
+    /** Current message */
+    private M message;
+
+    @Override
+    public void next() {
+      if (vertexId == null) {
+        vertexId = configuration.createVertexId();
+      }
+      if (message == null) {
+        message = configuration.createMessageValue();
+      }
+      try {
+        vertexId.readFields(extendedDataInput);
+        message.readFields(extendedDataInput);
+      } catch (IOException e) {
+        throw new IllegalStateException("next: IOException", e);
+      }
+    }
+
+    /**
+     * Get the current message
+     *
+     * @return Current message
+     */
+    public M getCurrentMessage() {
+      return message;
+    }
+  }
+
+  /**
+   * Special iterator that reuses vertex ids and messages bytes so that the
+   * lifetime of the object is only until next() is called.
+   *
+   * Vertex id ownership can be released if desired through
+   * releaseCurrentVertexId().  This optimization allows us to cut down
+   * on the number of objects instantiated and garbage collected.  Messages
+   * can only be copied to an ExtendedDataOutput object
+   *
+   * Not thread-safe.
+   */
+  public class VertexIdMessageBytesIterator extends VertexIdIterator {
+    /** Last message offset */
+    private int messageOffset = -1;
+    /** Number of bytes in the last message */
+    private int messageBytes = -1;
+
+    /**
+     * Moves to the next element in the iteration.
+     */
+    @Override
+    public void next() {
+      if (vertexId == null) {
+        vertexId = configuration.createVertexId();
+      }
+
+      try {
+        vertexId.readFields(extendedDataInput);
+        messageBytes = extendedDataInput.readInt();
+        messageOffset = extendedDataInput.getPos();
+        if (extendedDataInput.skipBytes(messageBytes) != messageBytes) {
+          throw new IllegalStateException("next: Failed to skip " +
+              messageBytes);
+        }
+      } catch (IOException e) {
+        throw new IllegalStateException("next: IOException", e);
+      }
+    }
+
+    /**
+     * Write the current message to an ExtendedDataOutput object
+     *
+     * @param dataOutput Where the current message will be written to
+     */
+    public void writeCurrentMessageBytes(
+        ExtendedDataOutput dataOutput) {
+      try {
+        dataOutput.write(
+            extendedDataOutput.getByteArray(), messageOffset, messageBytes);
+      } catch (IOException e) {
+        throw new IllegalStateException("writeCurrentMessageBytes: Got " +
+            "IOException", e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/utils/CollectionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/CollectionUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/CollectionUtils.java
new file mode 100644
index 0000000..83624a3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/CollectionUtils.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.log4j.Logger;
+
+/** Helper methods for Collections */
+public class CollectionUtils {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(CollectionUtils.class);
+
+  /** Do not instantiate. */
+  private CollectionUtils() { }
+
+  /**
+   * If map already has a value associated with the key it adds values to that
+   * value, otherwise it will put values to the map.  Do not reuse values.
+   *
+   * @param key    Key under which we are adding values
+   * @param values Values we want to add
+   * @param map    Map which we are adding values to
+   * @param <K> Key
+   * @param <V> Value
+   * @param <C> Collection
+   * @return New value associated with the key
+   */
+  public static <K, V, C extends Collection<V>> C addConcurrent(K key,
+      C values, ConcurrentMap<K, C> map) {
+    C currentValues = map.get(key);
+    if (currentValues == null) {
+      currentValues = map.putIfAbsent(key, values);
+      if (currentValues == null) {
+        return values;
+      }
+    }
+    synchronized (currentValues) {
+      currentValues.addAll(values);
+    }
+    return currentValues;
+  }
+
+  /**
+   * Helper method to check if iterables are equal.  Supports the case
+   * where the iterable next() returns a reused object.  We do assume that
+   * iterator() produces the objects in the same order across repeated calls,
+   * if the object doesn't change.   This is very expensive (n^2) and should
+   * be used for testing only.
+   *
+   * @param first First iterable
+   * @param second Second iterable
+   * @param <T> Type to compare
+   * @return True if equal, false otherwise
+   */
+  public static <T> boolean isEqual(Iterable<T> first, Iterable<T> second) {
+    // Relies on elements from the iterator arriving in the same order.
+    // For every element in first, check elements on the second iterable by
+    // marking the ones seen that have been found.  Then ensure that all
+    // the elements of the second have been seen as well.
+    int firstSize = Iterables.size(first);
+    int secondSize = Iterables.size(second);
+    boolean[] usedSecondArray = new boolean[secondSize];
+    Iterator<T> firstIterator = first.iterator();
+    while (firstIterator.hasNext()) {
+      T firstValue = firstIterator.next();
+      boolean foundFirstValue = false;
+      Iterator<T> secondIterator = second.iterator();
+      for (int i = 0; i < usedSecondArray.length; ++i) {
+        T secondValue = secondIterator.next();
+        if (!usedSecondArray[i]) {
+          if (firstValue.equals(secondValue)) {
+            usedSecondArray[i] = true;
+            foundFirstValue = true;
+            break;
+          }
+        }
+      }
+
+      if (!foundFirstValue) {
+        LOG.error("isEqual: Couldn't find element from first (" + firstValue +
+            ") in second " + second + "(size=" + secondSize + ")");
+        return false;
+      }
+    }
+
+    Iterator<T> secondIterator = second.iterator();
+    for (int i = 0; i < usedSecondArray.length; ++i) {
+      T secondValue = secondIterator.next();
+      if (!usedSecondArray[i]) {
+        LOG.error("isEqual: Element " + secondValue + " (index " + i +
+            ") in second " + second + "(size=" + secondSize +
+            ") not found in " + first + " (size=" + firstSize + ")");
+        return false;
+      }
+    }
+
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/utils/ComparisonUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ComparisonUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ComparisonUtils.java
new file mode 100644
index 0000000..bb940ea
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ComparisonUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.util.Iterator;
+
+/** Simple helper class for comparisons and equality checking */
+public class ComparisonUtils {
+
+  /** Do not construct this object */
+  private ComparisonUtils() { }
+
+  /**
+   * Compare elements, sort order and length
+   *
+   * @param <T> Type of iterable to compare.
+   * @param first First iterable to compare.
+   * @param second Second iterable to compare.
+   * @return True if equal, false otherwise.
+   */
+  public static <T> boolean equal(Iterable<T> first, Iterable<T> second) {
+    return equal(first.iterator(), second.iterator());
+  }
+
+  /**
+   * Compare elements, sort order and length
+   *
+   * @param <T> Type of iterable to compare.
+   * @param first First iterable to compare.
+   * @param second Second iterable to compare.
+   * @return True if equal, false otherwise.
+   */
+  public static <T> boolean equal(Iterator<T> first, Iterator<T> second) {
+    while (first.hasNext() && second.hasNext()) {
+      T message = first.next();
+      T otherMessage = second.next();
+      /* element-wise equality */
+      if (!(message == null ? otherMessage == null :
+        message.equals(otherMessage))) {
+        return false;
+      }
+    }
+    /* length must also be equal */
+    return !(first.hasNext() || second.hasNext());
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java b/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java
new file mode 100644
index 0000000..96e3fad
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import org.jboss.netty.buffer.DynamicChannelBuffer;
+
+/**
+ * Special input that reads from a DynamicChannelBuffer.
+ */
+public class DynamicChannelBufferInputStream implements DataInput {
+  /** Internal dynamic channel buffer */
+  private DynamicChannelBuffer buffer;
+
+  /**
+   * Constructor.
+   *
+   * @param buffer Buffer to read from
+   */
+  public DynamicChannelBufferInputStream(DynamicChannelBuffer buffer) {
+    this.buffer = buffer;
+  }
+
+  @Override
+  public void readFully(byte[] b) throws IOException {
+    buffer.readBytes(b);
+  }
+
+  @Override
+  public void readFully(byte[] b, int off, int len) throws IOException {
+    buffer.readBytes(b, off, len);
+  }
+
+  @Override
+  public int skipBytes(int n) throws IOException {
+    buffer.skipBytes(n);
+    return n;
+  }
+
+  @Override
+  public boolean readBoolean() throws IOException {
+    int ch = buffer.readByte();
+    if (ch < 0) {
+      throw new IllegalStateException("readBoolean: Got " + ch);
+    }
+    return ch != 0;
+  }
+
+  @Override
+  public byte readByte() throws IOException {
+    return buffer.readByte();
+  }
+
+  @Override
+  public int readUnsignedByte() throws IOException {
+    return buffer.readUnsignedByte();
+  }
+
+  @Override
+  public short readShort() throws IOException {
+    return buffer.readShort();
+  }
+
+  @Override
+  public int readUnsignedShort() throws IOException {
+    return buffer.readUnsignedShort();
+  }
+
+  @Override
+  public char readChar() throws IOException {
+    return buffer.readChar();
+  }
+
+  @Override
+  public int readInt() throws IOException {
+    return buffer.readInt();
+  }
+
+  @Override
+  public long readLong() throws IOException {
+    return buffer.readLong();
+  }
+
+  @Override
+  public float readFloat() throws IOException {
+    return buffer.readFloat();
+  }
+
+  @Override
+  public double readDouble() throws IOException {
+    return buffer.readDouble();
+  }
+
+  @Override
+  public String readLine() throws IOException {
+    // Note that this code is mostly copied from DataInputStream
+    char[] buf = new char[128];
+
+    int room = buf.length;
+    int offset = 0;
+    int c;
+
+  loop:
+    while (true) {
+      c = buffer.readByte();
+      switch (c) {
+      case -1:
+      case '\n':
+        break loop;
+      case '\r':
+        int c2 = buffer.readByte();
+        if ((c2 != '\n') && (c2 != -1)) {
+          buffer.readerIndex(buffer.readerIndex() - 1);
+        }
+        break loop;
+      default:
+        if (--room < 0) {
+          char[] replacebuf = new char[offset + 128];
+          room = replacebuf.length - offset - 1;
+          System.arraycopy(buf, 0, replacebuf, 0, offset);
+          buf = replacebuf;
+        }
+        buf[offset++] = (char) c;
+        break;
+      }
+    }
+    if ((c == -1) && (offset == 0)) {
+      return null;
+    }
+    return String.copyValueOf(buf, 0, offset);
+  }
+
+  @Override
+  public String readUTF() throws IOException {
+    // Note that this code is mostly copied from DataInputStream
+    int utflen = buffer.readUnsignedShort();
+
+    byte[] bytearr = new byte[utflen];
+    char[] chararr = new char[utflen];
+
+    int c;
+    int char2;
+    int char3;
+    int count = 0;
+    int chararrCount = 0;
+
+    buffer.readBytes(bytearr, 0, utflen);
+
+    while (count < utflen) {
+      c = (int) bytearr[count] & 0xff;
+      if (c > 127) {
+        break;
+      }
+      count++;
+      chararr[chararrCount++] = (char) c;
+    }
+
+    while (count < utflen) {
+      c = (int) bytearr[count] & 0xff;
+      switch (c >> 4) {
+      case 0:
+      case 1:
+      case 2:
+      case 3:
+      case 4:
+      case 5:
+      case 6:
+      case 7:
+        /* 0xxxxxxx */
+        count++;
+        chararr[chararrCount++] = (char) c;
+        break;
+      case 12:
+      case 13:
+        /* 110x xxxx   10xx xxxx*/
+        count += 2;
+        if (count > utflen) {
+          throw new UTFDataFormatException(
+              "malformed input: partial character at end");
+        }
+        char2 = (int) bytearr[count - 1];
+        if ((char2 & 0xC0) != 0x80) {
+          throw new UTFDataFormatException(
+                "malformed input around byte " + count);
+        }
+        chararr[chararrCount++] = (char) (((c & 0x1F) << 6) |
+            (char2 & 0x3F));
+        break;
+      case 14:
+        /* 1110 xxxx  10xx xxxx  10xx xxxx */
+        count += 3;
+        if (count > utflen) {
+          throw new UTFDataFormatException(
+              "malformed input: partial character at end");
+        }
+        char2 = (int) bytearr[count - 2];
+        char3 = (int) bytearr[count - 1];
+        if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+          throw new UTFDataFormatException(
+              "malformed input around byte " + (count - 1));
+        }
+        chararr[chararrCount++] = (char) (((c & 0x0F) << 12) |
+            ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+        break;
+      default:
+        /* 10xx xxxx,  1111 xxxx */
+        throw new UTFDataFormatException(
+            "malformed input around byte " + count);
+      }
+    }
+    // The number of chars produced may be less than utflen
+    return new String(chararr, 0, chararrCount);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java b/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java
new file mode 100644
index 0000000..ca4a7d7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.buffer.DirectChannelBufferFactory;
+import org.jboss.netty.buffer.DynamicChannelBuffer;
+
+/**
+ * Special output stream that can grow as needed and dumps to a
+ * DynamicChannelBuffer.  Multi-byte values (including the length prefix of
+ * writeUTF and the chars of writeChars) are written in the buffer's own byte
+ * order, so they round-trip through the matching read methods on a buffer
+ * with the same order.
+ */
+public class DynamicChannelBufferOutputStream implements DataOutput {
+  /** Internal dynamic channel buffer */
+  private DynamicChannelBuffer buffer;
+
+  /**
+   * Constructor
+   *
+   * @param estimatedLength Estimated length of the buffer
+   */
+  public DynamicChannelBufferOutputStream(int estimatedLength) {
+    buffer = (DynamicChannelBuffer)
+        ChannelBuffers.dynamicBuffer(ByteOrder.LITTLE_ENDIAN,
+            estimatedLength, DirectChannelBufferFactory.getInstance());
+  }
+
+  /**
+   * Constructor with the buffer to use
+   *
+   * @param buffer Buffer to be written to (cleared before use)
+   */
+  public DynamicChannelBufferOutputStream(DynamicChannelBuffer buffer) {
+    this.buffer = buffer;
+    buffer.clear();
+  }
+
+  /**
+   * Get the dynamic channel buffer
+   *
+   * @return dynamic channel buffer (not a copy)
+   */
+  public DynamicChannelBuffer getDynamicChannelBuffer() {
+    return buffer;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    buffer.writeByte(b);
+  }
+
+  @Override
+  public void write(byte[] b) throws IOException {
+    buffer.writeBytes(b);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    buffer.writeBytes(b, off, len);
+  }
+
+  @Override
+  public void writeBoolean(boolean v) throws IOException {
+    buffer.writeByte(v ? 1 : 0);
+  }
+
+  @Override
+  public void writeByte(int v) throws IOException {
+    buffer.writeByte(v);
+  }
+
+  @Override
+  public void writeShort(int v) throws IOException {
+    buffer.writeShort(v);
+  }
+
+  @Override
+  public void writeChar(int v) throws IOException {
+    buffer.writeChar(v);
+  }
+
+  @Override
+  public void writeInt(int v) throws IOException {
+    buffer.writeInt(v);
+  }
+
+  @Override
+  public void writeLong(long v) throws IOException {
+    buffer.writeLong(v);
+  }
+
+  @Override
+  public void writeFloat(float v) throws IOException {
+    buffer.writeFloat(v);
+  }
+
+  @Override
+  public void writeDouble(double v) throws IOException {
+    buffer.writeDouble(v);
+  }
+
+  @Override
+  public void writeBytes(String s) throws IOException {
+    // Note that this code is mostly copied from DataOutputStream
+    int len = s.length();
+    for (int i = 0; i < len; i++) {
+      buffer.writeByte((byte) s.charAt(i));
+    }
+  }
+
+  /**
+   * Write every char of the string as if by {@link #writeChar(int)}.
+   * Hand-coded big-endian bytes would not match writeChar()/readChar() on a
+   * little-endian buffer such as the one the int constructor creates.
+   *
+   * @param s String whose chars are written
+   * @throws IOException declared by DataOutput
+   */
+  @Override
+  public void writeChars(String s) throws IOException {
+    int len = s.length();
+    for (int i = 0; i < len; i++) {
+      buffer.writeChar(s.charAt(i));
+    }
+  }
+
+  /**
+   * Write a modified UTF-8 string prefixed by its encoded length.  The prefix
+   * is written with the buffer's writeShort() so that it matches a reader
+   * calling readUnsignedShort() on a buffer of the same byte order.
+   *
+   * @param s String to encode
+   * @throws java.io.UTFDataFormatException if the encoding exceeds 65535
+   *         bytes
+   * @throws IOException declared by DataOutput
+   */
+  @Override
+  public void writeUTF(String s) throws IOException {
+    // Note that this code is mostly copied from DataOutputStream
+    int strlen = s.length();
+    int utflen = 0;
+    int c;
+
+    /* use charAt instead of copying String to char array */
+    for (int i = 0; i < strlen; i++) {
+      c = s.charAt(i);
+      if ((c >= 0x0001) && (c <= 0x007F)) {
+        utflen++;
+      } else if (c > 0x07FF) {
+        utflen += 3;
+      } else {
+        utflen += 2;
+      }
+    }
+
+    // The length prefix is an unsigned 16-bit value; a longer encoding would
+    // silently wrap and corrupt the stream.
+    if (utflen > 65535) {
+      throw new java.io.UTFDataFormatException(
+          "encoded string too long: " + utflen + " bytes");
+    }
+
+    buffer.writeShort(utflen);
+
+    int i;
+    // Fast path for a leading run of plain ASCII chars
+    for (i = 0; i < strlen; i++) {
+      c = s.charAt(i);
+      if (!((c >= 0x0001) && (c <= 0x007F))) {
+        break;
+      }
+      buffer.writeByte((byte) c);
+    }
+
+    for (; i < strlen; i++) {
+      c = s.charAt(i);
+      if ((c >= 0x0001) && (c <= 0x007F)) {
+        buffer.writeByte((byte) c);
+
+      } else if (c > 0x07FF) {
+        buffer.writeByte((byte) (0xE0 | ((c >> 12) & 0x0F)));
+        buffer.writeByte((byte) (0x80 | ((c >>  6) & 0x3F)));
+        buffer.writeByte((byte) (0x80 | ((c >>  0) & 0x3F)));
+      } else {
+        buffer.writeByte((byte) (0xC0 | ((c >>  6) & 0x1F)));
+        buffer.writeByte((byte) (0x80 | ((c >>  0) & 0x3F)));
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/utils/EdgeIterables.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/EdgeIterables.java b/giraph-core/src/main/java/org/apache/giraph/utils/EdgeIterables.java
new file mode 100644
index 0000000..cf1ea99
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/EdgeIterables.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Utilities for converting between edge iterables and neighbor iterables.
+ */
+public class EdgeIterables {
+  /** Utility classes shouldn't be instantiated. */
+  private EdgeIterables() { }
+
+  /**
+   * Convert an edge iterable into a neighbor iterable.
+   *
+   * @param edges Edge iterable.
+   * @param <I> Vertex id type.
+   * @param <E> Edge value type.
+   * @return Neighbor iterable.
+   */
+  public static
+  <I extends WritableComparable, E extends Writable>
+  Iterable<I> getNeighbors(Iterable<Edge<I, E>> edges) {
+    return Iterables.transform(edges,
+        new Function<Edge<I, E>, I>() {
+          @Override
+          public I apply(Edge<I, E> edge) {
+            return edge == null ? null : edge.getTargetVertexId();
+          }
+        });
+  }
+
+  /**
+   * Convert a neighbor iterable into an edge iterable.
+   *
+   * @param neighbors Neighbor iterable.
+   * @param <I> Vertex id type.
+   * @return Edge iterable.
+   */
+  public static
+  <I extends WritableComparable>
+  Iterable<Edge<I, NullWritable>> getEdges(Iterable<I> neighbors) {
+    return Iterables.transform(neighbors,
+        new Function<I, Edge<I, NullWritable>>() {
+          @Override
+          public Edge<I, NullWritable> apply(I neighbor) {
+            return new Edge<I, NullWritable>(neighbor, NullWritable.get());
+          }
+        });
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/utils/EmptyIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/EmptyIterable.java b/giraph-core/src/main/java/org/apache/giraph/utils/EmptyIterable.java
new file mode 100644
index 0000000..382c39c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/EmptyIterable.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Helper empty iterable when there are no messages.
+ * <p>
+ * The same object acts as both the {@link Iterable} and its
+ * {@link Iterator}. It holds no state, so a single shared instance can be
+ * handed out for any element type.
+ *
+ * @param <M> Message data
+ */
+public class EmptyIterable<M> implements Iterable<M>, Iterator<M> {
+  /** Singleton empty iterable */
+  private static final EmptyIterable<Object> EMPTY_ITERABLE =
+      new EmptyIterable<Object>();
+
+  /**
+   * Get the singleton empty iterable
+   *
+   * @param <T> Type of the empty iterable
+   * @return Empty singleton iterable
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> Iterable<T> emptyIterable() {
+    // Safe: the instance never yields or stores an element, so viewing it
+    // as an Iterable of any type cannot cause heap pollution. The warning is
+    // suppressed on this method only.
+    return (Iterable<T>) EMPTY_ITERABLE;
+  }
+
+  /**
+   * @return this object, which is also its own, always-exhausted, iterator
+   */
+  @Override
+  public Iterator<M> iterator() {
+    return this;
+  }
+
+  /**
+   * @return always false; there are no elements
+   */
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  /**
+   * @return never returns
+   * @throws NoSuchElementException always
+   */
+  @Override
+  public M next() {
+    throw new NoSuchElementException();
+  }
+
+  /**
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java
new file mode 100644
index 0000000..ccd137c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+/**
+ * Barrier that waits until permits have been required a desired number of
+ * times and every required permit has been released again.
+ * <p>
+ * User must follow this protocol for concurrent access:
+ *
+ * (1) an object instance is constructed
+ * (2) arbitrarily many times
+ *     (2a) concurrent calls to requirePermits(), releasePermits() and
+ *          waitForRequiredPermits() are issued
+ *     (2b) waitForRequiredPermits() returns
+ *
+ * Note that the next cycle of calls to requirePermits() or releasePermits()
+ * cannot start until the previous call to waitForRequiredPermits()
+ * has returned.
+ *
+ * Methods of this class are thread-safe (all state is guarded by the
+ * instance monitor).
+ */
+public class ExpectedBarrier {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(ExpectedBarrier.class);
+  /** Msecs to refresh the progress meter */
+  private static final int MSEC_PERIOD = 10000;
+  /** Progressable for reporting progress */
+  private final Progressable progressable;
+  /** Number of times permits were added */
+  private long timesRequired = 0;
+  /**
+   * Number of permits we are currently waiting for. May go negative when
+   * permits are released before the matching requirePermits() call.
+   */
+  private long waitingOnPermits = 0;
+  /** Logger */
+  private final TimedLogger logger;
+
+  /**
+   * Constructor
+   *
+   * @param progressable Progressable for reporting progress
+   */
+  public ExpectedBarrier(Progressable progressable) {
+    this.progressable = progressable;
+    logger = new TimedLogger(MSEC_PERIOD, LOG);
+  }
+
+  /**
+   * Wait until permits have been required desired number of times,
+   * and all required permits are available. On return both counters are
+   * reset to zero so the barrier can be reused for the next cycle.
+   *
+   * @param desiredTimesRequired How many times should permits have been
+   *                             required
+   * @throws IllegalStateException if the waiting thread is interrupted; the
+   *         thread's interrupt status is restored before throwing
+   */
+  public synchronized void waitForRequiredPermits(
+      long desiredTimesRequired) {
+    while (timesRequired < desiredTimesRequired || waitingOnPermits > 0) {
+      try {
+        // Bounded wait so progress is reported at least every MSEC_PERIOD
+        // even if no notifyAll() arrives.
+        wait(MSEC_PERIOD);
+      } catch (InterruptedException e) {
+        // Restore the interrupt so callers up the stack can observe it, and
+        // keep the original exception as the cause.
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException("waitForRequiredPermits: " +
+            "InterruptedException occurred", e);
+      }
+      progressable.progress();
+      if (LOG.isInfoEnabled()) {
+        if (timesRequired < desiredTimesRequired) {
+          logger.info("waitForRequiredPermits: " +
+              "Waiting for times required to be " + desiredTimesRequired +
+              " (currently " + timesRequired + ") ");
+        } else {
+          logger.info("waitForRequiredPermits: " +
+              "Waiting for " + waitingOnPermits + " more permits.");
+        }
+      }
+    }
+
+    // Reset for the next time to use
+    timesRequired = 0;
+    waitingOnPermits = 0;
+  }
+
+  /**
+   * Require more permits. This will increase the number of times permits
+   * were required. Doesn't wait for permits to become available.
+   *
+   * @param permits Number of permits to require
+   */
+  public synchronized void requirePermits(long permits) {
+    timesRequired++;
+    waitingOnPermits += permits;
+    notifyAll();
+  }
+
+  /**
+   * Release one permit.
+   */
+  public synchronized void releaseOnePermit() {
+    releasePermits(1);
+  }
+
+  /**
+   * Release some permits and wake any thread waiting in
+   * waitForRequiredPermits().
+   *
+   * @param permits Number of permits to release
+   */
+  public synchronized void releasePermits(long permits) {
+    waitingOnPermits -= permits;
+    notifyAll();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
new file mode 100644
index 0000000..0ecea77
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataInput.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+/**
+ * A {@link ByteArrayInputStream} that also implements
+ * {@link ExtendedDataInput}. Primitive reads are delegated to a
+ * {@link DataInputStream} layered over this stream, and the internal read
+ * position is exposed through {@link #getPos()}. {@code available()} is
+ * inherited from {@link ByteArrayInputStream}.
+ */
+public class ExtendedByteArrayDataInput extends ByteArrayInputStream
+    implements ExtendedDataInput {
+  /** DataInputStream reading from this stream; performs all decoding */
+  private final DataInput delegate;
+
+  /**
+   * Read the whole of a byte array.
+   *
+   * @param buf Buffer to read
+   */
+  public ExtendedByteArrayDataInput(byte[] buf) {
+    this(buf, 0, buf.length);
+  }
+
+  /**
+   * Read {@code length} bytes of a byte array starting at {@code offset}.
+   *
+   * @param buf Byte array to access
+   * @param offset Offset into the byte array
+   * @param length Length to read
+   */
+  public ExtendedByteArrayDataInput(byte[] buf, int offset, int length) {
+    super(buf, offset, length);
+    delegate = new DataInputStream(this);
+  }
+
+  @Override
+  public int getPos() {
+    return this.pos;
+  }
+
+  // ---- raw bytes ----
+
+  @Override
+  public void readFully(byte[] b) throws IOException {
+    delegate.readFully(b);
+  }
+
+  @Override
+  public void readFully(byte[] b, int off, int len) throws IOException {
+    delegate.readFully(b, off, len);
+  }
+
+  @Override
+  public int skipBytes(int n) throws IOException {
+    return delegate.skipBytes(n);
+  }
+
+  // ---- single-byte values ----
+
+  @Override
+  public boolean readBoolean() throws IOException {
+    return delegate.readBoolean();
+  }
+
+  @Override
+  public byte readByte() throws IOException {
+    return delegate.readByte();
+  }
+
+  @Override
+  public int readUnsignedByte() throws IOException {
+    return delegate.readUnsignedByte();
+  }
+
+  // ---- two-byte values ----
+
+  @Override
+  public short readShort() throws IOException {
+    return delegate.readShort();
+  }
+
+  @Override
+  public int readUnsignedShort() throws IOException {
+    return delegate.readUnsignedShort();
+  }
+
+  @Override
+  public char readChar() throws IOException {
+    return delegate.readChar();
+  }
+
+  // ---- wider numeric values ----
+
+  @Override
+  public int readInt() throws IOException {
+    return delegate.readInt();
+  }
+
+  @Override
+  public long readLong() throws IOException {
+    return delegate.readLong();
+  }
+
+  @Override
+  public float readFloat() throws IOException {
+    return delegate.readFloat();
+  }
+
+  @Override
+  public double readDouble() throws IOException {
+    return delegate.readDouble();
+  }
+
+  // ---- text ----
+
+  @Override
+  public String readLine() throws IOException {
+    return delegate.readLine();
+  }
+
+  @Override
+  public String readUTF() throws IOException {
+    return delegate.readUTF();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
new file mode 100644
index 0000000..247130b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayDataOutput.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Adds some functionality to ByteArrayOutputStream,
+ * such as an option to write int value over previously written data
+ * and directly get the byte array.
+ */
+public class ExtendedByteArrayDataOutput extends ByteArrayOutputStream
+    implements ExtendedDataOutput {
+  /** Default number of bytes */
+  private static final int DEFAULT_BYTES = 32;
+  /** Internal data output */
+  private final DataOutput dataOutput;
+
+  /**
+   * Uses the byte array provided or if null, use a default size
+   *
+   * @param buf Buffer to use
+   */
+  public ExtendedByteArrayDataOutput(byte[] buf) {
+    // Start the superclass with an empty array; the implicit no-arg
+    // super() would allocate a default-sized buffer that is discarded here.
+    super(0);
+    if (buf == null) {
+      this.buf = new byte[DEFAULT_BYTES];
+    } else {
+      this.buf = buf;
+    }
+    dataOutput = new DataOutputStream(this);
+  }
+
+  /**
+   * Uses the byte array provided at the given pos
+   *
+   * @param buf Buffer to use
+   * @param pos Position in the buffer to start writing from
+   */
+  public ExtendedByteArrayDataOutput(byte[] buf, int pos) {
+    super(0);
+    this.buf = buf;
+    this.count = pos;
+    dataOutput = new DataOutputStream(this);
+  }
+
+  /**
+   * Creates a new byte array output stream. The buffer capacity is
+   * initially 32 bytes, though its size increases if necessary.
+   */
+  public ExtendedByteArrayDataOutput() {
+    this(DEFAULT_BYTES);
+  }
+
+  /**
+   * Creates a new byte array output stream, with a buffer capacity of
+   * the specified size, in bytes.
+   *
+   * @param size the initial size.
+   * @exception  IllegalArgumentException if size is negative.
+   */
+  public ExtendedByteArrayDataOutput(int size) {
+    // ByteArrayOutputStream(int) allocates exactly size bytes and rejects a
+    // negative size with IllegalArgumentException.
+    super(size);
+    dataOutput = new DataOutputStream(this);
+  }
+
+  @Override
+  public void writeBoolean(boolean v) throws IOException {
+    dataOutput.writeBoolean(v);
+  }
+
+  @Override
+  public void writeByte(int v) throws IOException {
+    dataOutput.writeByte(v);
+  }
+
+  @Override
+  public void writeShort(int v) throws IOException {
+    dataOutput.writeShort(v);
+  }
+
+  @Override
+  public void writeChar(int v) throws IOException {
+    dataOutput.writeChar(v);
+  }
+
+  @Override
+  public void writeInt(int v) throws IOException {
+    dataOutput.writeInt(v);
+  }
+
+  @Override
+  public void writeLong(long v) throws IOException {
+    dataOutput.writeLong(v);
+  }
+
+  @Override
+  public void writeFloat(float v) throws IOException {
+    dataOutput.writeFloat(v);
+  }
+
+  @Override
+  public void writeDouble(double v) throws IOException {
+    dataOutput.writeDouble(v);
+  }
+
+  @Override
+  public void writeBytes(String s) throws IOException {
+    dataOutput.writeBytes(s);
+  }
+
+  @Override
+  public void writeChars(String s) throws IOException {
+    dataOutput.writeChars(s);
+  }
+
+  @Override
+  public void writeUTF(String s) throws IOException {
+    dataOutput.writeUTF(s);
+  }
+
+  /**
+   * Advance the write position without writing, growing the buffer to at
+   * least double its size (or exactly what is needed, if larger) when the
+   * skipped region does not fit.
+   *
+   * @param bytesToSkip Number of bytes to skip
+   */
+  @Override
+  public void skipBytes(int bytesToSkip) {
+    if ((count + bytesToSkip) > buf.length) {
+      buf = Arrays.copyOf(buf, Math.max(buf.length << 1, count + bytesToSkip));
+    }
+    count += bytesToSkip;
+  }
+
+  /**
+   * Overwrite four already-written bytes at {@code position} with
+   * {@code value} in big-endian order.
+   *
+   * @param position Byte position of the first of the four bytes
+   * @param value Value to write
+   * @throws IndexOutOfBoundsException if the four bytes do not lie entirely
+   *         within the data written so far
+   */
+  @Override
+  public void writeInt(int position, int value) {
+    // Compare against count - 4 (count >= 0, so no underflow) instead of
+    // position + 4, which overflows for positions near Integer.MAX_VALUE.
+    if (position < 0 || position > count - 4) {
+      throw new IndexOutOfBoundsException(
+          "writeInt: Tried to write int to position " + position +
+              " but current length is " + count);
+    }
+    buf[position] = (byte) ((value >>> 24) & 0xFF);
+    buf[position + 1] = (byte) ((value >>> 16) & 0xFF);
+    buf[position + 2] = (byte) ((value >>> 8) & 0xFF);
+    buf[position + 3] = (byte) ((value >>> 0) & 0xFF);
+  }
+
+  /**
+   * Get the internal buffer without copying. Its length may exceed
+   * {@link #getPos()}; only the first getPos() bytes are written data.
+   *
+   * @return Internal byte array (do not modify)
+   */
+  @Override
+  public byte[] getByteArray() {
+    return buf;
+  }
+
+  @Override
+  public int getPos() {
+    return count;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java
new file mode 100644
index 0000000..f1c6809
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataInput.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.DataInput;
+
+/**
+ * A {@link DataInput} that can also report its current read position and
+ * how much input remains.
+ */
+public interface ExtendedDataInput extends DataInput {
+  /**
+   * Get the number of bytes that can still be read.
+   *
+   * @return Bytes available
+   */
+  int available();
+
+  /**
+   * Get the current read position.
+   *
+   * @return Current read position in the underlying input
+   */
+  int getPos();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
new file mode 100644
index 0000000..54ef514
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.DataOutput;
+
+/**
+ * A {@link DataOutput} backed by a growable byte buffer, adding position
+ * control, in-place int patching and direct access to the written bytes.
+ */
+public interface ExtendedDataOutput extends DataOutput {
+  /**
+   * Get the current write position, i.e. how much has been written.
+   *
+   * @return Position in the output stream
+   */
+  int getPos();
+
+  /**
+   * Move the write position forward without writing any data.
+   *
+   * @param  bytesToSkip Number of bytes to skip
+   */
+  void skipBytes(int bytesToSkip);
+
+  /**
+   * Overwrite an int at an earlier position in the stream. This lets a
+   * writer reserve space for a size field and fill it in once the size is
+   * known.
+   *
+   * @param pos Byte position in the output stream
+   * @param value Value to write
+   */
+  void writeInt(int pos, int value);
+
+  /**
+   * Get the backing byte array without copying, where the implementation
+   * allows it. Callers must treat it as read-only.
+   *
+   * @return Internal byte array (do not modify)
+   */
+  byte[] getByteArray();
+
+  /**
+   * Get a copy of the written bytes.
+   *
+   * @return Copied byte array
+   */
+  byte[] toByteArray();
+
+  /**
+   * Discard everything written so far.
+   */
+  void reset();
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/utils/FakeTime.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/FakeTime.java b/giraph-core/src/main/java/org/apache/giraph/utils/FakeTime.java
new file mode 100644
index 0000000..f20c10b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/FakeTime.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Thread-safe implementation of Time for testing that can help get time based
+ * ordering of events when desired.
+ */
+public class FakeTime implements Time {
+  /** Nanoseconds from the fake epoch */
+  private final AtomicLong nanosecondsSinceEpoch = new AtomicLong();
+
+  @Override
+  public long getMilliseconds() {
+    return nanosecondsSinceEpoch.get() / NS_PER_MS;
+  }
+
+  @Override
+  public long getMicroseconds() {
+    return nanosecondsSinceEpoch.get() / NS_PER_US;
+  }
+
+  @Override
+  public long getNanoseconds() {
+    return nanosecondsSinceEpoch.get();
+  }
+
+  @Override
+  public int getSeconds() {
+    return (int) (nanosecondsSinceEpoch.get() / NS_PER_SECOND);
+  }
+
+  @Override
+  public Date getCurrentDate() {
+    return new Date(getMilliseconds());
+  }
+
+  @Override
+  public void sleep(long milliseconds) throws InterruptedException {
+    nanosecondsSinceEpoch.getAndAdd(milliseconds * NS_PER_MS);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java
new file mode 100644
index 0000000..442fc9f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.Writer;
+
+/**
+ * Helper class for filesystem operations during testing
+ */
+public class FileUtils {
+
+  /**
+   * Utility class should not be instantiatable
+   */
+  private FileUtils() {
+  }
+
+  /**
+   * Create a uniquely named folder under {@code java.io.tmpdir} and
+   * register it with {@link File#deleteOnExit()}. That registration only
+   * succeeds if the folder is empty at JVM exit.
+   *
+   * @param vertexClass Used for generating the folder name.
+   * @return File object for the directory.
+   * @throws IOException if the directory could not be created
+   */
+  public static File createTestDir(Class<?> vertexClass)
+    throws IOException {
+    String systemTmpDir = System.getProperty("java.io.tmpdir");
+    long simpleRandomLong = (long) (Long.MAX_VALUE * Math.random());
+    File testTempDir = new File(systemTmpDir, "giraph-" +
+        vertexClass.getSimpleName() + '-' + simpleRandomLong);
+    if (!testTempDir.mkdir()) {
+      throw new IOException("Could not create " + testTempDir);
+    }
+    testTempDir.deleteOnExit();
+    return testTempDir;
+  }
+
+  /**
+   * Get a File for a temporary file and register it for deletion on exit.
+   * Nothing is created on disk.
+   *
+   * @param parent Parent directory.
+   * @param name File name.
+   * @return File object to temporary file.
+   * @throws IOException declared for symmetry with createTempDir; not
+   *         thrown for files
+   */
+  public static File createTempFile(File parent, String name)
+    throws IOException {
+    return createTestTempFileOrDir(parent, name, false);
+  }
+
+  /**
+   * Reserve a path for a temporary directory. The directory, along with any
+   * missing parents, is created to make sure the name is free. The leaf
+   * directory itself is then deleted again, so it does NOT exist when this
+   * method returns. The path stays registered for deletion on exit.
+   * NOTE(review): presumably this is so that a consumer, e.g. a job writing
+   * output, can create it itself. Confirm against callers.
+   *
+   * @param parent Parent directory.
+   * @param name Directory name.
+   * @return File object for the (not yet existing) directory.
+   * @throws IOException if the directory already exists or could not be
+   *         created
+   */
+  public static File createTempDir(File parent, String name)
+    throws IOException {
+    File dir = createTestTempFileOrDir(parent, name, true);
+    dir.delete();
+    return dir;
+  }
+
+  /**
+   * Create a test temp file or directory. The returned File is registered
+   * with deleteOnExit. When {@code dir} is true the directory, with any
+   * missing parents, is created. When false nothing is created on disk.
+   *
+   * @param parent Parent directory
+   * @param name Name of file
+   * @param dir Is directory?
+   * @return File object
+   * @throws IOException if {@code dir} is true and the directory could not
+   *         be created (including when it already exists)
+   */
+  public static File createTestTempFileOrDir(File parent, String name,
+    boolean dir) throws IOException {
+    File f = new File(parent, name);
+    f.deleteOnExit();
+    if (dir && !f.mkdirs()) {
+      throw new IOException("Could not make directory " + f);
+    }
+    return f;
+  }
+
+  /**
+   * Write lines to a file as UTF-8, each terminated by '\n'.
+   *
+   * @param file File to write lines to
+   * @param lines Strings written to the file
+   * @throws IOException if writing, or flushing the buffered data on close,
+   *         fails
+   */
+  public static void writeLines(File file, String[] lines)
+    throws IOException {
+    Writer writer = Files.newWriter(file, Charsets.UTF_8);
+    // The writer is buffered, so close() may still fail while flushing.
+    // Only swallow a close() failure when the body already threw; otherwise
+    // report it instead of silently losing data.
+    boolean threw = true;
+    try {
+      for (String line : lines) {
+        writer.write(line);
+        writer.write('\n');
+      }
+      threw = false;
+    } finally {
+      Closeables.close(writer, threw);
+    }
+  }
+
+  /**
+   * Recursively delete a directory, best effort (failures are ignored).
+   *
+   * @param dir Directory to delete; null is ignored
+   */
+  public static void delete(File dir) {
+    if (dir != null) {
+      new DeletingVisitor().accept(dir);
+    }
+  }
+
+  /**
+   * Deletes files. Used as a FileFilter so listFiles() drives the recursion.
+   * accept() always returns false, so nothing is collected.
+   * NOTE(review): isFile() follows symlinks, so a symlink to a directory is
+   * descended into. Confirm that is acceptable for the paths used here.
+   */
+  private static class DeletingVisitor implements FileFilter {
+    @Override
+    public boolean accept(File f) {
+      if (!f.isFile()) {
+        f.listFiles(this);
+      }
+      f.delete();
+      return false;
+    }
+  }
+
+  /**
+   * Helper method to remove a path if it exists.
+   *
+   * @param conf Configuration to load FileSystem from
+   * @param path Path to remove
+   * @throws IOException
+   */
+  public static void deletePath(Configuration conf, String path)
+    throws IOException {
+    deletePath(conf, new Path(path));
+  }
+
+  /**
+   * Helper method to remove a path (recursively) if it exists.
+   *
+   * @param conf Configuration to load FileSystem from
+   * @param path Path to remove
+   * @throws IOException
+   */
+  public static void deletePath(Configuration conf, Path path)
+    throws IOException {
+    // Resolve the file system from the path itself so fully-qualified paths
+    // (e.g. file:// while the default FS is HDFS) are deleted from the
+    // right place. Scheme-less paths still resolve to the default FS.
+    FileSystem fs = path.getFileSystem(conf);
+    fs.delete(path, true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/utils/IncreasingBitSet.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/IncreasingBitSet.java b/giraph-core/src/main/java/org/apache/giraph/utils/IncreasingBitSet.java
new file mode 100644
index 0000000..67d69a0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/IncreasingBitSet.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.util.BitSet;
+
+/**
+ * Bit set optimized for increasing longs to save storage space.
+ * The general idea is that even though some keys will be added out-of-order,
+ * there is a base key that keeps increasing so that the bit set doesn't get
+ * very big.  When there are enough set bits, the bit set gets compacted.
+ * Every key below the current base key is treated as already present.
+ * Thread-safe.
+ */
+public class IncreasingBitSet {
+  /** Minimum number of bits to shift */
+  public static final int MIN_BITS_TO_SHIFT = 64 * 1024;
+  /** Bit set used; bit i represents key (lastBaseKey + i) */
+  private BitSet bitSet = new BitSet();
+  /** Last base key (all keys < this have been accepted) */
+  private long lastBaseKey = 0;
+
+  /**
+   * Add a key if it is possible.
+   *
+   * @param key Key to add
+   * @return True if the key was added, false if it was already present
+   *         (which includes every key below the current base key)
+   * @throws IllegalArgumentException if the key lies more than
+   *         {@link Integer#MAX_VALUE} above the current base key
+   */
+  public synchronized boolean add(long key) {
+    long remainder = key - lastBaseKey;
+    checkLegalKey(remainder);
+
+    if (remainder < 0) {
+      return false;
+    }
+    if (bitSet.get((int) remainder)) {
+      return false;
+    }
+    bitSet.set((int) remainder);
+    // Once a long enough run of consecutive keys starting at the base is set,
+    // drop that prefix and advance the base so the bit set stays small.
+    int nextClearBit = bitSet.nextClearBit(0);
+    if (nextClearBit >= MIN_BITS_TO_SHIFT) {
+      bitSet = bitSet.get(nextClearBit,
+          Math.max(nextClearBit, bitSet.length()));
+      lastBaseKey += nextClearBit;
+    }
+    return true;
+  }
+
+  /**
+   * Get the number of set bits, counting every key below the base key as set.
+   *
+   * @return Number of set bits
+   */
+  public synchronized long cardinality() {
+    long size = bitSet.cardinality();
+    return size + lastBaseKey;
+  }
+
+  /**
+   * Get the size of the underlying bit set (bits of space in use, as
+   * reported by {@link BitSet#size()}).
+   *
+   * @return Size of the bit set
+   */
+  public synchronized int size() {
+    return bitSet.size();
+  }
+
+  /**
+   * Check for existence of a key
+   *
+   * @param key Key to check for
+   * @return True if the key exists (always true below the base key),
+   *         false otherwise
+   * @throws IllegalArgumentException if the key lies more than
+   *         {@link Integer#MAX_VALUE} above the current base key
+   */
+  public synchronized boolean has(long key) {
+    long remainder = key - lastBaseKey;
+    checkLegalKey(remainder);
+
+    if (remainder < 0) {
+      return true;
+    }
+    return bitSet.get((int) remainder);
+  }
+
+  /**
+   * Get the last base key (mainly for debugging).
+   *
+   * @return Last base key
+   */
+  public synchronized long getLastBaseKey() {
+    return lastBaseKey;
+  }
+
+  /**
+   * Check that the remainder (key minus base key) fits in a bit index.
+   *
+   * @param remainder Remainder to check
+   * @throws IllegalArgumentException if the remainder exceeds
+   *         {@link Integer#MAX_VALUE}
+   */
+  private void checkLegalKey(long remainder) {
+    if (remainder > Integer.MAX_VALUE) {
+      throw new IllegalArgumentException(
+          "checkLegalKey: Impossible to use key " +
+          (remainder + lastBaseKey) + " with base " +
+          lastBaseKey + " since the " +
+          "spread is too large (> " + Integer.MAX_VALUE + ")");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/utils/IntPair.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/IntPair.java b/giraph-core/src/main/java/org/apache/giraph/utils/IntPair.java
new file mode 100644
index 0000000..f73f44e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/IntPair.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+/**
+ * Mutable holder for two {@code int} values.
+ */
+public class IntPair {
+  /** Value held in the first slot. */
+  private int first;
+  /** Value held in the second slot. */
+  private int second;
+
+  /**
+   * Creates a pair holding the two given values.
+   *
+   * @param fst Value for the first slot
+   * @param snd Value for the second slot
+   */
+  public IntPair(int fst, int snd) {
+    this.first = fst;
+    this.second = snd;
+  }
+
+  /**
+   * Returns the value held in the first slot.
+   *
+   * @return The first element
+   */
+  public int getFirst() {
+    return this.first;
+  }
+
+  /**
+   * Returns the value held in the second slot.
+   *
+   * @return The second element
+   */
+  public int getSecond() {
+    return this.second;
+  }
+
+  /**
+   * Replaces the value held in the first slot.
+   *
+   * @param first The new first element
+   */
+  public void setFirst(int first) {
+    this.first = first;
+  }
+
+  /**
+   * Replaces the value held in the second slot.
+   *
+   * @param second The new second element
+   */
+  public void setSecond(int second) {
+    this.second = second;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
new file mode 100644
index 0000000..bccb827
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.io.GiraphFileInputFormat;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Utility for running internal tests on a vertex.
+ *
+ * Tests only have to invoke one of the run() methods to exercise their
+ * vertex. All data is written to a local tmp directory that is removed
+ * afterwards. A local zookeeper instance is started in an extra thread and
+ * shut down at the end.
+ *
+ * Heavily inspired from Apache Mahout's MahoutTestCase
+ */
+public class InternalVertexRunner {
+  /** ZooKeeper port to use for tests */
+  public static final int LOCAL_ZOOKEEPER_PORT = 22182;
+
+  /** Don't construct */
+  private InternalVertexRunner() { }
+
+  /**
+   * Attempts to run the vertex internally in the current JVM, reading from and
+   * writing to a temporary folder on local disk. Will start its own zookeeper
+   * instance. Equivalent to the four-argument overload with no edge input
+   * data.
+   *
+   * @param classes GiraphClasses specifying which types to use
+   * @param params a map of parameters to add to the hadoop configuration
+   * @param vertexInputData linewise vertex input data
+   * @return linewise output data
+   * @throws Exception if anything goes wrong
+   */
+  public static Iterable<String> run(
+      GiraphClasses classes,
+      Map<String, String> params,
+      String[] vertexInputData) throws Exception {
+    return run(classes, params, vertexInputData, null);
+  }
+
+  /**
+   * Attempts to run the vertex internally in the current JVM, reading from and
+   * writing to a temporary folder on local disk. Will start its own zookeeper
+   * instance.
+   *
+   * @param classes GiraphClasses specifying which types to use
+   * @param params a map of parameters to add to the hadoop configuration
+   *               (may be null, meaning no extra parameters)
+   * @param vertexInputData linewise vertex input data; only written (and then
+   *                        required to be non-null) when a vertex input
+   *                        format is configured
+   * @param edgeInputData linewise edge input data; only written (and then
+   *                      required to be non-null) when an edge input format
+   *                      is configured
+   * @return linewise output data, or an empty list when no vertex output
+   *         format is configured
+   * @throws IllegalStateException if the Giraph job reports failure
+   * @throws Exception if anything else goes wrong
+   */
+  public static Iterable<String> run(
+      GiraphClasses classes,
+      Map<String, String> params,
+      String[] vertexInputData,
+      String[] edgeInputData) throws Exception {
+    File tmpDir = null;
+    try {
+      // Prepare input file, output folder and temporary folders
+      tmpDir = FileUtils.createTestDir(classes.getVertexClass());
+
+      File vertexInputFile = null;
+      File edgeInputFile = null;
+      if (classes.hasVertexInputFormat()) {
+        vertexInputFile = FileUtils.createTempFile(tmpDir, "vertices.txt");
+      }
+      if (classes.hasEdgeInputFormat()) {
+        edgeInputFile = FileUtils.createTempFile(tmpDir, "edges.txt");
+      }
+
+      File outputDir = FileUtils.createTempDir(tmpDir, "output");
+      File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
+      File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
+      File checkpointsDir = FileUtils.createTempDir(tmpDir, "_checkpoints");
+
+      // Write input data to disk
+      if (classes.hasVertexInputFormat()) {
+        FileUtils.writeLines(vertexInputFile, vertexInputData);
+      }
+      if (classes.hasEdgeInputFormat()) {
+        FileUtils.writeLines(edgeInputFile, edgeInputData);
+      }
+
+      // Create and configure the job to run the vertex
+      GiraphJob job = new GiraphJob(classes.getVertexClass().getName());
+      GiraphConfiguration conf = job.getConfiguration();
+      conf.setVertexClass(classes.getVertexClass());
+      if (classes.hasVertexInputFormat()) {
+        conf.setVertexInputFormatClass(classes.getVertexInputFormatClass());
+      }
+      if (classes.hasEdgeInputFormat()) {
+        conf.setEdgeInputFormatClass(classes.getEdgeInputFormatClass());
+      }
+      if (classes.hasVertexOutputFormat()) {
+        conf.setVertexOutputFormatClass(classes.getVertexOutputFormatClass());
+      }
+      if (classes.hasWorkerContextClass()) {
+        conf.setWorkerContextClass(classes.getWorkerContextClass());
+      }
+      if (classes.hasCombinerClass()) {
+        conf.setVertexCombinerClass(classes.getCombinerClass());
+      }
+      if (classes.hasMasterComputeClass()) {
+        conf.setMasterComputeClass(classes.getMasterComputeClass());
+      }
+
+      // Single worker, master and worker in the same task, local test mode
+      conf.setWorkerConfiguration(1, 1, 100.0f);
+      conf.setBoolean(GiraphConstants.SPLIT_MASTER_WORKER, false);
+      conf.setBoolean(GiraphConstants.LOCAL_TEST_MODE, true);
+      conf.set(GiraphConstants.ZOOKEEPER_LIST, "localhost:" +
+          String.valueOf(LOCAL_ZOOKEEPER_PORT));
+
+      conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
+      conf.set(GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY,
+          zkMgrDir.toString());
+      conf.set(GiraphConstants.CHECKPOINT_DIRECTORY, checkpointsDir.toString());
+
+      // Caller-supplied parameters are applied last so they override the
+      // settings above; a null map simply means "no extra parameters".
+      if (params != null) {
+        for (Map.Entry<String, String> param : params.entrySet()) {
+          conf.set(param.getKey(), param.getValue());
+        }
+      }
+
+      Job internalJob = job.getInternalJob();
+      if (classes.hasVertexInputFormat()) {
+        GiraphFileInputFormat.addVertexInputPath(internalJob,
+            new Path(vertexInputFile.toString()));
+      }
+      if (classes.hasEdgeInputFormat()) {
+        GiraphFileInputFormat.addEdgeInputPath(internalJob,
+            new Path(edgeInputFile.toString()));
+      }
+      FileOutputFormat.setOutputPath(internalJob,
+                                     new Path(outputDir.toString()));
+
+      // Configure a local zookeeper instance
+      Properties zkProperties = configLocalZooKeeper(zkDir);
+
+      QuorumPeerConfig qpConfig = new QuorumPeerConfig();
+      qpConfig.parseProperties(zkProperties);
+
+      // Create and run the zookeeper instance
+      final InternalZooKeeper zookeeper = new InternalZooKeeper();
+      final ServerConfig zkConfig = new ServerConfig();
+      zkConfig.readFrom(qpConfig);
+
+      ExecutorService executorService = Executors.newSingleThreadExecutor();
+      executorService.execute(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            zookeeper.runFromConfig(zkConfig);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+
+      // Run the job; ZooKeeper is torn down whether or not it succeeds.
+      boolean success;
+      try {
+        success = job.run(true);
+      } finally {
+        executorService.shutdown();
+        zookeeper.end();
+      }
+      // Surface a failed job instead of returning empty or partial output.
+      if (!success) {
+        throw new IllegalStateException("run: Giraph job for " +
+            classes.getVertexClass().getName() + " failed");
+      }
+
+      if (classes.hasVertexOutputFormat()) {
+        return Files.readLines(new File(outputDir, "part-m-00000"),
+            Charsets.UTF_8);
+      } else {
+        return ImmutableList.of();
+      }
+    } finally {
+      FileUtils.delete(tmpDir);
+    }
+  }
+
+  /**
+   * Configuration options for running local ZK.
+   *
+   * @param zkDir directory for ZK to hold files in.
+   * @return Properties configured for local ZK on
+   *         {@link #LOCAL_ZOOKEEPER_PORT}.
+   */
+  private static Properties configLocalZooKeeper(File zkDir) {
+    Properties zkProperties = new Properties();
+    zkProperties.setProperty("tickTime", "2000");
+    zkProperties.setProperty("dataDir", zkDir.getAbsolutePath());
+    zkProperties.setProperty("clientPort",
+        String.valueOf(LOCAL_ZOOKEEPER_PORT));
+    zkProperties.setProperty("maxClientCnxns", "10000");
+    zkProperties.setProperty("minSessionTimeout", "10000");
+    zkProperties.setProperty("maxSessionTimeout", "100000");
+    zkProperties.setProperty("initLimit", "10");
+    zkProperties.setProperty("syncLimit", "5");
+    zkProperties.setProperty("snapCount", "50000");
+    return zkProperties;
+  }
+
+  /**
+   * Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown
+   */
+  private static class InternalZooKeeper extends ZooKeeperServerMain {
+    /**
+     * Shutdown the ZooKeeper instance.
+     */
+    void end() {
+      shutdown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/utils/LoggerUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/LoggerUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/LoggerUtils.java
new file mode 100644
index 0000000..81dfd1d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/LoggerUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * Helpers that combine Hadoop task status reporting with log4j logging.
+ */
+public class LoggerUtils {
+  /** Static helpers only; not meant to be instantiated. */
+  private LoggerUtils() { }
+
+  /**
+   * Publishes {@code message} as the task status. It then also logs the
+   * message when {@code level} is enabled on {@code logger}.
+   *
+   * @param context Context to set the status with
+   * @param logger Logger to write to
+   * @param level Level of logging
+   * @param message Message to set status with
+   * @throws IllegalStateException if setting the status raises an
+   *         IOException (attached as the cause); nothing is logged then
+   */
+  public static void setStatusAndLog(
+      TaskAttemptContext context, Logger logger, Level level,
+      String message) {
+    try {
+      setStatus(context, message);
+    } catch (IOException e) {
+      throw new IllegalStateException("setStatusAndLog: Got IOException", e);
+    }
+    if (!logger.isEnabledFor(level)) {
+      return;
+    }
+    logger.log(level, message);
+  }
+
+  /**
+   * Sets the Hadoop status message.
+   *
+   * NOTE: This thin wrapper exists for source compatibility across Hadoop
+   * versions. Some Hadoop jars (e.g. 0.23 and 2.0.0) declare setStatus
+   * without IOException while others declare it. Catching IOException
+   * directly around context.setStatus() therefore fails to compile on the
+   * former. Because this method always declares IOException, every caller
+   * can catch it. Where the underlying call does not throw, declaring an
+   * exception that is never thrown is at most a warning.
+   *
+   * @param context Context to set the status with
+   * @param message Message to set status with
+   * @throws IOException If something goes wrong with setting status message
+   */
+  private static void setStatus(TaskAttemptContext context, String message)
+    throws IOException {
+    context.setStatus(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/utils/MathUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/MathUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/MathUtils.java
new file mode 100644
index 0000000..db7ea5b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/MathUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.DoubleWritable;
+
+import java.util.Iterator;
+
+/**
+ * A helper class for math related operations with writables
+ */
+public class MathUtils {
+
+  /**
+   * Utility classes cannot be instantiated
+   */
+  private MathUtils() {
+  }
+
+  /**
+   * Sums up a sequence of double values
+   * @param values double values
+   * @return sum of double values
+   */
+  public static double sum(Iterable<DoubleWritable> values) {
+    return sum(values.iterator());
+  }
+
+  /**
+   * Sums up a sequence of double values
+   * @param values double values
+   * @return sum of double values
+   */
+  public static double sum(Iterator<DoubleWritable> values) {
+    double sum = 0;
+    while (values.hasNext()) {
+      sum += values.next().get();
+    }
+    return sum;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java
new file mode 100644
index 0000000..eec8388
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/MemoryUtils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import com.yammer.metrics.util.PercentGauge;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.GiraphMetricsRegistry;
+
+/**
+ * Helper static methods for tracking memory usage.
+ */
+public class MemoryUtils {
+  /** Do not instantiate. */
+  private MemoryUtils() { }
+
+  /**
+   * Helper to compute megabytes
+   * @param bytes integer number of bytes
+   * @return megabytes
+   */
+  private static double megaBytes(long bytes) {
+    return bytes / 1024.0 / 1024.0;
+  }
+
+  /**
+   * Get total memory in megabytes
+   * @return total memory in megabytes
+   */
+  public static double totalMemoryMB() {
+    return megaBytes(Runtime.getRuntime().totalMemory());
+  }
+
+  /**
+   * Get maximum memory in megabytes
+   * @return maximum memory in megabytes
+   */
+  public static double maxMemoryMB() {
+    return megaBytes(Runtime.getRuntime().maxMemory());
+  }
+
+  /**
+   * Get free memory in megabytes
+   * @return free memory in megabytes
+   */
+  public static double freeMemoryMB() {
+    return megaBytes(Runtime.getRuntime().freeMemory());
+  }
+
+  /**
+   * Initialize metrics tracked by this helper.
+   */
+  public static void initMetrics() {
+    GiraphMetricsRegistry metrics = GiraphMetrics.get().perJob();
+    metrics.getGauge("memory-free-pct", new PercentGauge() {
+        @Override
+        protected double getNumerator() {
+          return freeMemoryMB();
+        }
+
+        @Override
+        protected double getDenominator() {
+          return totalMemoryMB();
+        }
+      }
+    );
+  }
+
+  /**
+   * Get stringified runtime memory stats
+   *
+   * @return String of all Runtime stats.
+   */
+  public static String getRuntimeMemoryStats() {
+    return "totalMem = " + totalMemoryMB() +
+      "M, maxMem = "  + maxMemoryMB() +
+      "M, freeMem = " + freeMemoryMB() + "M";
+  }
+}


Mime
View raw message