giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ikabi...@apache.org
Subject [1/5] git commit: updated refs/heads/trunk to 819d6d3
Date Thu, 11 Jun 2015 02:50:40 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk 06a1084af -> 819d6d38d


http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-core/src/main/java/org/apache/giraph/edge/LongDiffNullArrayEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongDiffNullArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongDiffNullArrayEdges.java
new file mode 100644
index 0000000..e82c3a8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongDiffNullArrayEdges.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.edge;
+
+import it.unimi.dsi.fastutil.bytes.ByteArrays;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import org.apache.giraph.utils.EdgeIterables;
+import org.apache.giraph.utils.Trimmable;
+import org.apache.giraph.utils.Varint;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Implementation of {@link org.apache.giraph.edge.OutEdges} with long ids
+ * and null edge values, backed by a dynamic primitive array.
+ * Parallel edges are allowed.
+ * Note: this implementation is optimized for space usage,
+ * but random access and edge removals are expensive.
+ * Users of this class should explicitly call {@link #trim()} function
+ * to compact in-memory representation after all updates are done.
+ * Compacting object is expensive so should only be done once after bulk update.
+ * Compaction can also be caused by serialization attempt or
+ * by calling {@link #iterator()}
+ */
+@NotThreadSafe
+public class LongDiffNullArrayEdges
+    extends ConfigurableOutEdges<LongWritable, NullWritable>
+    implements ReuseObjectsOutEdges<LongWritable, NullWritable>,
+    MutableOutEdges<LongWritable, NullWritable>, Trimmable {
+
+  /**
+   * Array of target vertex ids.
+   */
+  private byte[] compressedData;
+  /**
+   * Number of edges stored in compressed array.
+   * There may be some extra edges in transientData or there may be some edges
+   * removed. These will not count here. To get real number of elements stored
+   * in this object @see {@link #size()}
+   */
+  private int size;
+
+  /**
+   * Last updates are stored here. We clear them out after object is compacted.
+   */
+  private TransientChanges transientData;
+
+  @Override
+  public void initialize(Iterable<Edge<LongWritable, NullWritable>> edges) {
+    reset();
+    EdgeIterables.initialize(this, edges);
+    trim();
+  }
+
+  @Override
+  public void initialize(int capacity) {
+    reset();
+    if (capacity > 0) {
+      transientData = new TransientChanges(capacity);
+    }
+  }
+
+  @Override
+  public void initialize() {
+    reset();
+  }
+
+  @Override
+  public void add(Edge<LongWritable, NullWritable> edge) {
+    checkTransientData();
+    transientData.add(edge.getTargetVertexId().get());
+  }
+
+
+  @Override
+  public void remove(LongWritable targetVertexId) {
+    checkTransientData();
+    long target = targetVertexId.get();
+
+    if (size > 0) {
+      LongsDiffReader reader = new LongsDiffReader(compressedData);
+      for (int i = 0; i < size; i++) {
+        long cur = reader.readNext();
+        if (cur == target) {
+          transientData.markRemoved(i);
+        } else if (cur > target) {
+          break;
+        }
+      }
+    }
+    transientData.removeAdded(target);
+  }
+
+  @Override
+  public int size() {
+    int result = size;
+    if (transientData != null) {
+      result += transientData.size();
+    }
+    return result;
+  }
+
+  @Override
+  public Iterator<Edge<LongWritable, NullWritable>> iterator() {
+    // Returns an iterator that reuses objects.
+    // The downcast is fine because all concrete Edge implementations are
+    // mutable, but we only expose the mutation functionality when appropriate.
+    return (Iterator) mutableIterator();
+  }
+
+  @Override
+  public Iterator<MutableEdge<LongWritable, NullWritable>> mutableIterator()
{
+    trim();
+    return new Iterator<MutableEdge<LongWritable, NullWritable>>() {
+      /** Current position in the array. */
+      private int position;
+      private final LongsDiffReader reader =
+          new LongsDiffReader(compressedData);
+
+      /** Representative edge object. */
+      private final MutableEdge<LongWritable, NullWritable> representativeEdge =
+          EdgeFactory.createReusable(new LongWritable());
+
+      @Override
+      public boolean hasNext() {
+        return position < size;
+      }
+
+      @Override
+      public MutableEdge<LongWritable, NullWritable> next() {
+        position++;
+        representativeEdge.getTargetVertexId().set(reader.readNext());
+        return representativeEdge;
+      }
+
+      @Override
+      public void remove() {
+        removeAt(position - 1);
+      }
+    };
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    trim();
+    Varint.writeUnsignedVarInt(compressedData.length, out);
+    Varint.writeUnsignedVarInt(size, out);
+    out.write(compressedData);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    reset();
+    compressedData = new byte[Varint.readUnsignedVarInt(in)];
+    // We can actually calculate size after data array is read,
+    // the trade-off is memory vs speed
+    size = Varint.readUnsignedVarInt(in);
+    in.readFully(compressedData);
+  }
+
+  /**
+   * This function takes all recent updates and stores them efficiently.
+   * It is safe to call this function multiple times.
+   */
+  @Override
+  public void trim() {
+    if (transientData == null) {
+      // We don't have any updates to this object. Return quickly.
+      return;
+    }
+
+    // Beware this array is longer than the number of elements we interested in
+    long[] transientValues = transientData.sortedValues();
+    int pCompressed = 0;
+    int pTransient = 0;
+
+    LongsDiffReader reader = new LongsDiffReader(compressedData);
+    LongsDiffWriter writer = new LongsDiffWriter();
+
+    long curValue = size > 0 ? reader.readNext() : Long.MAX_VALUE;
+
+    // Here we merge freshly added elements and old elements, we also want
+    // to prune removed elements. Both arrays are sorted so in order to merge
+    // them, we move to pointers and store result in the new array
+    while (pTransient < transientData.numberOfAddedElements() ||
+        pCompressed < size) {
+      if (pTransient < transientData.numberOfAddedElements() &&
+          curValue >= transientValues[pTransient]) {
+        writer.writeNext(transientValues[pTransient]);
+        pTransient++;
+      } else {
+        if (!transientData.isRemoved(pCompressed)) {
+          writer.writeNext(curValue);
+        }
+        pCompressed++;
+        if (pCompressed < size) {
+          curValue = reader.readNext();
+        } else {
+          curValue = Long.MAX_VALUE;
+        }
+      }
+    }
+
+    compressedData = writer.toByteArray();
+    size += transientData.size();
+    transientData = null;
+  }
+
+
+  /**
+   * Remove edge at position i.
+   *
+   * @param i Position of edge to be removed
+   */
+  private void removeAt(int i) {
+    checkTransientData();
+    if (i < size) {
+      transientData.markRemoved(i);
+    } else {
+      transientData.removeAddedAt(i - size);
+    }
+  }
+
+  /**
+   * Check if transient data needs to be created.
+   */
+  private void checkTransientData() {
+    if (transientData == null) {
+      transientData = new TransientChanges();
+    }
+  }
+
+  /**
+   * Reset object to completely empty state.
+   */
+  private void reset() {
+    compressedData = ByteArrays.EMPTY_ARRAY;
+    size = 0;
+    transientData = null;
+  }
+
+  /**
+   * Reading array of longs diff encoded from byte array.
+   */
+  private static class LongsDiffReader {
+    /** Input stream */
+    private final DataInput input;
+    /** last read value */
+    private long current;
+    /** True if we haven't read any numbers yet */
+    private boolean first = true;
+
+    /**
+     * Construct LongsDiffReader
+     * @param compressedData Input byte array
+     */
+    LongsDiffReader(byte[] compressedData) {
+      input = new DataInputStream(new ByteArrayInputStream(compressedData));
+    }
+
+    /**
+     * Read next value from reader
+     * @return next value
+     */
+    long readNext() {
+      try {
+        if (first) {
+          current = input.readLong();
+          first = false;
+        } else {
+          current += Varint.readUnsignedVarLong(input);
+        }
+        return current;
+      } catch (IOException e) {
+        throw new IllegalStateException(e);
+      }
+    }
+  }
+
+  /**
+   * Writing array of longs diff encoded into the byte array.
+   */
+  private static class LongsDiffWriter {
+    /** Byte array stream containing result */
+    private final ByteArrayOutputStream resultStream =
+        new ByteArrayOutputStream();
+    /** Wrapping resultStream into DataOutputStream */
+    private final DataOutputStream out = new DataOutputStream(resultStream);
+    /** last value written */
+    private long lastWritten;
+    /** True if we haven't written any numbers yet */
+    private boolean first = true;
+
+    /**
+     * Write next value to writer
+     * @param value Value to be written
+     */
+    void writeNext(long value) {
+      try {
+        if (first) {
+          out.writeLong(value);
+          first = false;
+        } else {
+          Preconditions.checkState(value >= lastWritten,
+              "Values need to be in order");
+          Preconditions.checkState((value - lastWritten) >= 0,
+              "In order to use this class, difference of consecutive IDs " +
+              "cannot overflow longs");
+          Varint.writeUnsignedVarLong(value - lastWritten, out);
+        }
+        lastWritten = value;
+      } catch (IOException e) {
+        throw new IllegalStateException(e);
+      }
+    }
+
+    /**
+     * Get resulting byte array
+     * @return resulting byte array
+     */
+    byte[] toByteArray() {
+      return resultStream.toByteArray();
+    }
+  }
+
+  /**
+   * Temporary storage for all updates.
+   * We don't want to update compressed array frequently so we only update it
+   * on request at the same time we allow temporary updates to persist in this
+   * class.
+   */
+  private static class TransientChanges {
+    /** Neighbors that were added since last flush */
+    private final LongArrayList neighborsAdded;
+    /** Removed indices in original array */
+    private final BitSet removed = new BitSet();
+    /** Number of values removed */
+    private int removedCount;
+
+    /**
+     * Construct transient changes with given capacity
+     * @param capacity capacity
+     */
+    private TransientChanges(int capacity) {
+      neighborsAdded = new LongArrayList(capacity);
+    }
+
+    /**
+     * Construct transient changes
+     */
+    private TransientChanges() {
+      neighborsAdded = new LongArrayList();
+    }
+
+    /**
+     * Add new value
+     * @param value value to add
+     */
+    private void add(long value) {
+      neighborsAdded.add(value);
+    }
+
+    /**
+     * Mark given index to remove
+     * @param index Index to remove
+     */
+    private void markRemoved(int index) {
+      if (!removed.get(index)) {
+        removedCount++;
+        removed.set(index);
+      }
+    }
+
+    /**
+     * Remove value from neighborsAdded
+     * @param index Position to remove from
+     */
+    private void removeAddedAt(int index) {
+      // The order of the edges is irrelevant, so we can simply replace
+      // the deleted edge with the rightmost element, thus achieving constant
+      // time.
+      if (index == neighborsAdded.size() - 1) {
+        neighborsAdded.popLong();
+      } else {
+        neighborsAdded.set(index, neighborsAdded.popLong());
+      }
+    }
+
+    /**
+     * Number of added elements
+     * @return number of added elements
+     */
+    private int numberOfAddedElements() {
+      return neighborsAdded.size();
+    }
+
+    /**
+     * Remove added value
+     * @param target value to remove
+     */
+    private void removeAdded(long target) {
+      neighborsAdded.rem(target);
+    }
+
+    /**
+     * Additional size in transient changes
+     * @return additional size
+     */
+    private int size() {
+      return neighborsAdded.size() - removedCount;
+    }
+
+    /**
+     * Sorted added values
+     * @return sorted added values
+     */
+    private long[] sortedValues() {
+      long[] ret = neighborsAdded.elements();
+      Arrays.sort(ret, 0, neighborsAdded.size());
+      return ret;
+    }
+
+    /**
+     * Check if index was removed
+     * @param i Index to check
+     * @return Whether it was removed
+     */
+    private boolean isRemoved(int i) {
+      return removed.get(i);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationObjectUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationObjectUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationObjectUtils.java
new file mode 100644
index 0000000..ae31bb2
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationObjectUtils.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import org.apache.giraph.writable.kryo.KryoWritableWrapper;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Utility methods for storing byte arrays and arbitrary objects in a
+ * Hadoop configuration.
+ */
+public class ConfigurationObjectUtils {
+  /** Hide constructor */
+  private ConfigurationObjectUtils() {
+  }
+
+  /**
+   * Encode bytes to a String that is safe to store in a configuration.
+   * Each byte becomes two characters in the range 'a'..'p': first the high
+   * nibble, then the low nibble (like hex, but using the letters 'a'..'p'
+   * instead of 0-9a-f).
+   *
+   * @param bytes byte[]
+   * @return encoded String, twice as long as the input array
+   */
+  public static String encodeBytes(byte[] bytes) {
+    StringBuilder strBuf = new StringBuilder(bytes.length * 2);
+    for (byte b : bytes) {
+      strBuf.append((char) (((b >> 4) & 0xF) + 'a'));
+      strBuf.append((char) ((b & 0xF) + 'a'));
+    }
+    return strBuf.toString();
+  }
+
+  /**
+   * Decode bytes from a String produced by {@link #encodeBytes(byte[])}.
+   *
+   * @param str String to decode
+   * @return decoded byte[]
+   * @throws IllegalArgumentException if str has odd length or contains a
+   *         character outside 'a'..'p'
+   */
+  public static byte[] decodeBytes(String str) {
+    if (str.length() % 2 != 0) {
+      throw new IllegalArgumentException(
+          "Encoded string must have even length, got " + str.length());
+    }
+    byte[] bytes = new byte[str.length() / 2];
+    for (int i = 0; i < bytes.length; i++) {
+      int high = decodeNibble(str.charAt(2 * i));
+      int low = decodeNibble(str.charAt(2 * i + 1));
+      bytes[i] = (byte) ((high << 4) | low);
+    }
+    return bytes;
+  }
+
+  /**
+   * Decode a single character produced by {@link #encodeBytes(byte[])}.
+   *
+   * @param c Character in range 'a'..'p'
+   * @return Nibble value in range 0..15
+   * @throws IllegalArgumentException if c is outside 'a'..'p'
+   */
+  private static int decodeNibble(char c) {
+    int value = c - 'a';
+    if (value < 0 || value > 0xF) {
+      throw new IllegalArgumentException(
+          "Invalid character '" + c + "' in encoded string");
+    }
+    return value;
+  }
+
+  /**
+   * Set byte array to a conf option
+   *
+   * @param data Byte array
+   * @param confOption Conf option
+   * @param conf Configuration
+   */
+  public static void setByteArray(byte[] data, String confOption,
+      Configuration conf) {
+    conf.set(confOption, encodeBytes(data));
+  }
+
+  /**
+   * Get byte array from a conf option
+   *
+   * @param confOption Conf option
+   * @param conf Configuration
+   * @return Byte array
+   * @throws IllegalArgumentException if the option is not set or does not
+   *         hold a value written by {@link #setByteArray}
+   */
+  public static byte[] getByteArray(String confOption,
+      Configuration conf) {
+    String encoded = conf.get(confOption);
+    if (encoded == null) {
+      throw new IllegalArgumentException(
+          "Configuration option " + confOption + " is not set");
+    }
+    return decodeBytes(encoded);
+  }
+
+  /**
+   * Set object in a conf option using kryo
+   *
+   * @param object Object to set
+   * @param confOption Conf option
+   * @param conf Configuration
+   * @param <T> Type of the object
+   */
+  public static <T> void setObjectKryo(T object, String confOption,
+      Configuration conf) {
+    setByteArray(WritableUtils.toByteArrayUnsafe(
+            new KryoWritableWrapper<>(object)),
+        confOption, conf);
+  }
+
+  /**
+   * Get object from a conf option using kryo
+   *
+   * @param confOption Conf option
+   * @param conf Configuration
+   * @return Object from conf
+   * @param <T> Type of the object
+   * @throws IllegalArgumentException if the option is not set
+   */
+  public static <T> T getObjectKryo(String confOption,
+      Configuration conf) {
+    KryoWritableWrapper<T> wrapper = new KryoWritableWrapper<>();
+    WritableUtils.fromByteArrayUnsafe(
+        getByteArray(confOption, conf), wrapper,
+        new UnsafeReusableByteArrayInput());
+    return wrapper.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-core/src/main/java/org/apache/giraph/utils/DefaultOutputCommitter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/DefaultOutputCommitter.java b/giraph-core/src/main/java/org/apache/giraph/utils/DefaultOutputCommitter.java
new file mode 100644
index 0000000..ef273b0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/DefaultOutputCommitter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Base {@link OutputCommitter} in which every setup and task-level hook is a
+ * fixed no-op. Subclasses only implement {@link #commit(JobContext)}, which
+ * runs once from {@link #commitJob(JobContext)}.
+ */
+public abstract class DefaultOutputCommitter extends OutputCommitter {
+  /**
+   * Commit the output of the whole job. Invoked from
+   * {@link #commitJob(JobContext)}, i.e. for jobs whose final run state is
+   * {@link org.apache.hadoop.mapreduce.JobStatus.State#SUCCEEDED}.
+   *
+   * @param jobContext Context of the job whose output is being written.
+   */
+  public abstract void commit(JobContext jobContext) throws IOException;
+
+  @Override
+  public final void commitJob(JobContext jobContext)
+      throws IOException {
+    // Keep whatever the base class does on job commit, then run the
+    // subclass-provided hook.
+    super.commitJob(jobContext);
+    commit(jobContext);
+  }
+
+  /** Nothing to prepare at job level. */
+  @Override
+  public final void setupJob(JobContext jobContext) throws IOException {
+  }
+
+  /** Nothing to prepare at task level. */
+  @Override
+  public final void setupTask(TaskAttemptContext taskContext)
+      throws IOException {
+  }
+
+  /**
+   * Tasks never commit: output is committed once per job in
+   * {@link #commit(JobContext)}. Skipping the task commit also works around
+   * a task-commit bug in Corona (internal reference t5688706).
+   *
+   * @param taskContext Context of the task
+   * @return always false
+   */
+  @Override
+  public final boolean needsTaskCommit(TaskAttemptContext taskContext)
+      throws IOException {
+    return false;
+  }
+
+  /** No-op; task commits are disabled by needsTaskCommit. */
+  @Override
+  public final void commitTask(TaskAttemptContext context) throws IOException {
+  }
+
+  /** No-op; tasks have nothing to roll back. */
+  @Override
+  public final void abortTask(TaskAttemptContext taskContext)
+      throws IOException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index 68ed89a..2229c2f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -34,6 +34,7 @@ import java.util.List;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.factories.ValueFactory;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.zk.ZooKeeperExt;
 import org.apache.giraph.zk.ZooKeeperExt.PathStat;
@@ -950,4 +951,190 @@ public class WritableUtils {
     }
     return copy;
   }
+
+  /**
+   * Create a copy of a Writable object by serializing and deserializing it.
+   * The copy is instantiated from the runtime class of the original, with
+   * no configuration.
+   *
+   * @param original Original value of which to make a copy
+   * @return Copy of the original value
+   * @param <T> Type of the object
+   */
+  public static final <T extends Writable> T createCopy(T original) {
+    // getClass() of a T instance is always a subclass of T.
+    @SuppressWarnings("unchecked")
+    Class<? extends T> copyClass =
+        (Class<? extends T>) original.getClass();
+    return createCopy(original, copyClass, null);
+  }
+
+  /**
+   * Create a copy of a Writable object by serializing and deserializing it
+   * into a fresh instance of the requested class.
+   *
+   * @param original Original value of which to make a copy
+   * @param outputClass Class to instantiate for the copy; must match the
+   *                    original's serialized form
+   * @param conf Configuration used when instantiating the copy
+   * @return Copy of the original value
+   * @param <T> Type of the object
+   */
+  public static final <T extends Writable>
+  T createCopy(T original, Class<? extends T> outputClass,
+      ImmutableClassesGiraphConfiguration conf) {
+    T copy = createWritable(outputClass, conf);
+    copyInto(original, copy);
+    return copy;
+  }
+
+  /**
+   * Create a copy of a Writable object by serializing and deserializing it
+   * into an empty instance obtained from the given factory.
+   *
+   * @param original Original value of which to make a copy
+   * @param classFactory Factory producing the empty target object
+   * @param conf Configuration (currently not used by this overload)
+   * @return Copy of the original value
+   * @param <T> Type of the object
+   */
+  public static final <T extends Writable>
+  T createCopy(T original, ValueFactory<T> classFactory,
+      ImmutableClassesGiraphConfiguration conf) {
+    T target = classFactory.newInstance();
+    copyInto(original, target);
+    return target;
+  }
+
+  /**
+   * Compute the serialized size of a Writable by actually writing it into
+   * a temporary buffer.
+   *
+   * @param w Writable object
+   * @return number of bytes written by {@code w.write}
+   * @throws RuntimeException wrapping any IOException from serialization
+   */
+  public static int size(Writable w) {
+    int written;
+    try {
+      ExtendedByteArrayDataOutput buffer = new ExtendedByteArrayDataOutput();
+      w.write(buffer);
+      written = buffer.getPos();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return written;
+  }
+
+  /**
+   * Serialize a Writable into a new byte array using a fresh
+   * ExtendedByteArrayDataOutput.
+   *
+   * @param w Writable object
+   * @return serialized bytes
+   * @param <T> Type of the object
+   * @throws RuntimeException wrapping any IOException from serialization
+   */
+  public static <T extends Writable> byte[] toByteArray(T w) {
+    byte[] serialized;
+    try {
+      ExtendedByteArrayDataOutput buffer = new ExtendedByteArrayDataOutput();
+      w.write(buffer);
+      serialized = buffer.toByteArray();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return serialized;
+  }
+
+  /**
+   * Deserialize from given byte array into given writable,
+   * using new instance of ExtendedByteArrayDataInput.
+   *
+   * @param data Byte array representing writable
+   * @param to Object to fill
+   * @param <T> Type of the object
+   */
+  public static <T extends Writable> void fromByteArray(byte[] data, T to) {
+    try {
+      ExtendedByteArrayDataInput in =
+          new ExtendedByteArrayDataInput(data, 0, data.length);
+      to.readFields(in);
+
+      if (in.available() != 0) {
+        throw new RuntimeException(
+            "Serialization encountered issues, " + in.available() +
+            " bytes left to be read");
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Serialize a Writable into a new byte array using a fresh
+   * UnsafeByteArrayOutputStream.
+   *
+   * @param w Writable object
+   * @return serialized bytes
+   * @param <T> Type of the object
+   * @throws RuntimeException wrapping any IOException from serialization
+   */
+  public static <T extends Writable> byte[] toByteArrayUnsafe(T w) {
+    byte[] serialized;
+    try {
+      UnsafeByteArrayOutputStream buffer = new UnsafeByteArrayOutputStream();
+      w.write(buffer);
+      serialized = buffer.toByteArray();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return serialized;
+  }
+
+  /**
+   * Fill a Writable from a byte array, re-initializing the given reusable
+   * UnsafeReusableByteArrayInput over it. The whole array must be consumed.
+   *
+   * @param data Byte array representing writable
+   * @param to Object to fill
+   * @param reusableInput Reusable input to use
+   * @param <T> Type of the object
+   * @throws RuntimeException if bytes remain unread after readFields, or
+   *         wrapping any IOException from deserialization
+   */
+  public static <T extends Writable> void fromByteArrayUnsafe(
+      byte[] data, T to, UnsafeReusableByteArrayInput reusableInput) {
+    try {
+      reusableInput.initialize(data, 0, data.length);
+      to.readFields(reusableInput);
+
+      int leftover = reusableInput.available();
+      if (leftover != 0) {
+        throw new RuntimeException(
+            "Serialization encountered issues, " + leftover +
+            " bytes left to be read");
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * First write a boolean saying whether an object is not null,
+   * and if it's not write the object
+   *
+   * @param object Object to write
+   * @param out DataOutput to write to
+   * @param <T> Object type
+   */
+  public static <T extends Writable> void writeIfNotNullAndObject(T object,
+      DataOutput out) throws IOException {
+    out.writeBoolean(object != null);
+    if (object != null) {
+      object.write(out);
+    }
+  }
+
+  /**
+   * Read a presence flag and, if set, the object that follows it.
+   * Counterpart of writeIfNotNullAndObject.
+   *
+   * @param reusableObject Instance to fill, or null to create a new one
+   * @param objectClass Class of the object, to create if reusableObject is null
+   * @param in DataInput to read from
+   * @param <T> Object type
+   * @return The filled object, or null if the flag was false
+   * @throws IOException if reading fails
+   */
+  public static <T extends Writable> T readIfNotNullAndObject(T reusableObject,
+      Class<T> objectClass, DataInput in) throws IOException {
+    if (!in.readBoolean()) {
+      return null;
+    }
+    T target = reusableObject != null ?
+        reusableObject : ReflectionUtils.newInstance(objectClass);
+    target.readFields(in);
+    return target;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-core/src/test/java/org/apache/giraph/edge/LongDiffNullArrayEdgesTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/edge/LongDiffNullArrayEdgesTest.java b/giraph-core/src/test/java/org/apache/giraph/edge/LongDiffNullArrayEdgesTest.java
new file mode 100644
index 0000000..e23f592
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/edge/LongDiffNullArrayEdgesTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.edge;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.UnsafeByteArrayInputStream;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+
+/**
+ * Tests for {@link LongDiffNullArrayEdges}: sorted iteration of target ids,
+ * add/remove, removal through the mutable iterator, serialization
+ * round-trips, parallel edges, and rejection of id spreads whose differences
+ * would not fit in a long.
+ */
+public class LongDiffNullArrayEdgesTest {
+  /** Build an edge targeting {@code id} (the edge value type is NullWritable). */
+  private static Edge<LongWritable, NullWritable> createEdge(long id) {
+    return EdgeFactory.create(new LongWritable(id));
+  }
+
+  /**
+   * Assert that iterating {@code edges} yields exactly the {@code expected}
+   * target ids, in the given order.
+   */
+  private static void assertEdges(LongDiffNullArrayEdges edges, long... expected) {
+    int index = 0;
+    for (Edge<LongWritable, NullWritable> edge : edges) {
+      // Fail with a clear message instead of an ArrayIndexOutOfBoundsException
+      // when the edges contain more entries than expected.
+      Assert.assertTrue("More edges than the " + expected.length + " expected",
+          index < expected.length);
+      Assert.assertEquals(expected[index], edge.getTargetVertexId().get());
+      index++;
+    }
+    Assert.assertEquals(expected.length, index);
+  }
+
+  @Test
+  public void testEdges() {
+    LongDiffNullArrayEdges edges = getEdges();
+
+    List<Edge<LongWritable, NullWritable>> initialEdges = Lists.newArrayList(
+        createEdge(1), createEdge(2), createEdge(4));
+
+    edges.initialize(initialEdges);
+    assertEdges(edges, 1, 2, 4);
+
+    edges.add(EdgeFactory.createReusable(new LongWritable(3)));
+    assertEdges(edges, 1, 2, 3, 4);
+
+    edges.remove(new LongWritable(2));
+    assertEdges(edges, 1, 3, 4);
+  }
+
+  @Test
+  public void testPositiveAndNegativeEdges() {
+    LongDiffNullArrayEdges edges = getEdges();
+
+    List<Edge<LongWritable, NullWritable>> initialEdges = Lists.newArrayList(
+        createEdge(1), createEdge(-2), createEdge(3), createEdge(-4));
+
+    edges.initialize(initialEdges);
+    assertEdges(edges, -4, -2, 1, 3);
+
+    edges.add(EdgeFactory.createReusable(new LongWritable(5)));
+    assertEdges(edges, -4, -2, 1, 3, 5);
+
+    edges.remove(new LongWritable(-2));
+    assertEdges(edges, -4, 1, 3, 5);
+  }
+
+  @Test
+  public void testMutateEdges() {
+    LongDiffNullArrayEdges edges = getEdges();
+
+    edges.initialize();
+
+    // Add 10 edges with id i, for i = 0..9
+    for (int i = 0; i < 10; ++i) {
+      edges.add(createEdge(i));
+    }
+
+    // Use the mutable iterator to remove edges with even id
+    Iterator<MutableEdge<LongWritable, NullWritable>> edgeIt =
+        edges.mutableIterator();
+    while (edgeIt.hasNext()) {
+      if (edgeIt.next().getTargetVertexId().get() % 2 == 0) {
+        edgeIt.remove();
+      }
+    }
+
+    // We should now have 5 edges
+    assertEquals(5, edges.size());
+    // The edge ids should be all odd
+    for (Edge<LongWritable, NullWritable> edge : edges) {
+      assertEquals(1, edge.getTargetVertexId().get() % 2);
+    }
+  }
+
+  @Test
+  public void testSerialization() throws IOException {
+    LongDiffNullArrayEdges edges = getEdges();
+
+    edges.initialize();
+
+    // Add 10 edges with id i, for i = 0..9
+    for (int i = 0; i < 10; ++i) {
+      edges.add(createEdge(i));
+    }
+
+    edges.trim();
+
+    // Use the mutable iterator to remove edges with even id
+    Iterator<MutableEdge<LongWritable, NullWritable>> edgeIt =
+        edges.mutableIterator();
+    while (edgeIt.hasNext()) {
+      if (edgeIt.next().getTargetVertexId().get() % 2 == 0) {
+        edgeIt.remove();
+      }
+    }
+
+    // We should now have 5 edges
+    assertEdges(edges, 1, 3, 5, 7, 9);
+
+    ByteArrayOutputStream arrayStream = new ByteArrayOutputStream();
+    DataOutputStream tempBuffer = new DataOutputStream(arrayStream);
+
+    edges.write(tempBuffer);
+
+    byte[] binary = arrayStream.toByteArray();
+
+    assertTrue("Serialized version should not be empty ", binary.length > 0);
+
+    edges = getEdges();
+    edges.readFields(new UnsafeByteArrayInputStream(binary));
+
+    // The round-trip must reproduce exactly the same ids, not just odd ones.
+    assertEquals(5, edges.size());
+    assertEdges(edges, 1, 3, 5, 7, 9);
+  }
+
+  @Test
+  public void testParallelEdges() {
+    LongDiffNullArrayEdges edges = getEdges();
+
+    List<Edge<LongWritable, NullWritable>> initialEdges = Lists.newArrayList(
+        createEdge(2), createEdge(2), createEdge(2));
+
+    edges.initialize(initialEdges);
+    assertEquals(3, edges.size());
+
+    edges.remove(new LongWritable(2));
+    assertEquals(0, edges.size());
+
+    edges.add(EdgeFactory.create(new LongWritable(2)));
+    assertEquals(1, edges.size());
+
+    edges.trim();
+    assertEquals(1, edges.size());
+  }
+
+  @Test
+  public void testEdgeValues() {
+    LongDiffNullArrayEdges edges = getEdges();
+    Set<Long> testValues = new HashSet<Long>();
+    testValues.add(0L);
+    testValues.add((long) Integer.MAX_VALUE);
+    testValues.add(Long.MAX_VALUE);
+
+    // Negative ids are deliberately excluded: combined with Long.MAX_VALUE the
+    // gap between sorted ids would not fit in a long
+    // (see testFailSafeOnPotentialOverflow).
+
+    List<Edge<LongWritable, NullWritable>> initialEdges =
+        new ArrayList<Edge<LongWritable, NullWritable>>();
+    for (Long id : testValues) {
+      initialEdges.add(createEdge(id));
+    }
+
+    edges.initialize(initialEdges);
+    edges.trim();
+
+    Iterator<MutableEdge<LongWritable, NullWritable>> edgeIt =
+        edges.mutableIterator();
+    while (edgeIt.hasNext()) {
+      long value = edgeIt.next().getTargetVertexId().get();
+      assertTrue("Unknown edge found " + value, testValues.remove(value));
+    }
+    // Every inserted id must have been seen; otherwise an edge was lost.
+    assertTrue("Missing edges " + testValues, testValues.isEmpty());
+  }
+
+  /** Create an empty edges instance configured with a default Giraph config. */
+  private LongDiffNullArrayEdges getEdges() {
+    GiraphConfiguration gc = new GiraphConfiguration();
+    ImmutableClassesGiraphConfiguration<LongWritable, Writable, NullWritable> conf =
+        new ImmutableClassesGiraphConfiguration<LongWritable, Writable, NullWritable>(gc);
+    LongDiffNullArrayEdges ret = new LongDiffNullArrayEdges();
+    ret.setConf(conf);
+    return ret;
+  }
+
+  @Test
+  public void testAddedSmallerValues() {
+    LongDiffNullArrayEdges edges = getEdges();
+
+    List<Edge<LongWritable, NullWritable>> initialEdges = Lists.newArrayList(
+        createEdge(100));
+
+    edges.initialize(initialEdges);
+
+    edges.trim();
+
+    // Insert ids smaller than the existing one after the storage was trimmed.
+    for (int i = 0; i < 16; i++) {
+      edges.add(createEdge(i));
+    }
+
+    edges.trim();
+
+    assertEquals(17, edges.size());
+    assertEdges(edges,
+        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 100);
+  }
+
+  @Test(expected=IllegalStateException.class)
+  public void testFailSafeOnPotentialOverflow() {
+    LongDiffNullArrayEdges edges = getEdges();
+
+    // The difference between these two ids exceeds Long.MAX_VALUE.
+    List<Edge<LongWritable, NullWritable>> initialEdges = Lists.newArrayList(
+        createEdge(5223372036854775807L), createEdge(-4223372036854775807L));
+    edges.initialize(initialEdges);
+  }
+
+  @Test
+  public void testAvoidOverflowWithZero() {
+    LongDiffNullArrayEdges edges = getEdges();
+
+    // With 0 in between, each consecutive difference fits in a long.
+    List<Edge<LongWritable, NullWritable>> initialEdges = Lists.newArrayList(
+        createEdge(5223372036854775807L), createEdge(-4223372036854775807L),
+        createEdge(0));
+    edges.initialize(initialEdges);
+    assertEdges(edges, -4223372036854775807L, 0, 5223372036854775807L);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 63a9bae..4198f13 100644
--- a/pom.xml
+++ b/pom.xml
@@ -275,6 +275,8 @@ under the License.
     <!-- This lets modules skip unit tests. More details: GIRAPH-957 --> 
     <surefire.skip>false</surefire.skip>
 
+    <checkstyle.config.path>${top.dir}/checkstyle.xml</checkstyle.config.path>
+
     <dep.avro.version>1.7.6</dep.avro.version>
     <dep.accumulo.version>1.4.0</dep.accumulo.version>
     <dep.asm.version>3.2</dep.asm.version>
@@ -541,7 +543,7 @@ under the License.
           <artifactId>maven-checkstyle-plugin</artifactId>
           <version>2.15</version>
           <configuration>
-            <configLocation>${top.dir}/checkstyle.xml</configLocation>
+            <configLocation>${checkstyle.config.path}</configLocation>
             <consoleOutput>true</consoleOutput>
             <enableRulesSummary>false</enableRulesSummary>
             <headerLocation>${top.dir}/license-header.txt</headerLocation>
@@ -837,7 +839,7 @@ under the License.
               <artifactId>maven-checkstyle-plugin</artifactId>
               <version>2.10</version>
               <configuration>
-                <configLocation>${top.dir}/checkstyle.xml</configLocation>
+                <configLocation>${checkstyle.config.path}</configLocation>
                 <consoleOutput>true</consoleOutput>
                 <enableRulesSummary>false</enableRulesSummary>
                 <headerLocation>${top.dir}/license-header.txt</headerLocation>
@@ -1713,6 +1715,11 @@ under the License.
       </dependency>
       <dependency>
         <groupId>org.apache.giraph</groupId>
+        <artifactId>giraph-block-app</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.giraph</groupId>
         <artifactId>giraph-examples</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -2276,6 +2283,7 @@ under the License.
 
   <modules>
     <module>giraph-core</module>
+    <module>giraph-block-app</module>
     <module>giraph-examples</module>
   </modules>
 


Mime
View raw message