giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clau...@apache.org
Subject [1/2] git commit: updated refs/heads/trunk to f7c3025
Date Mon, 18 Nov 2013 15:00:25 GMT
Updated Branches:
  refs/heads/trunk e987492ee -> f7c302587


GIRAPH-760


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/b151d7a9
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/b151d7a9
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/b151d7a9

Branch: refs/heads/trunk
Commit: b151d7a97f8f06451a915668e01f393ad0445b36
Parents: e987492
Author: Claudio Martella <claudio.martella@gmail.com>
Authored: Mon Nov 18 15:59:15 2013 +0100
Committer: Claudio Martella <claudio.martella@gmail.com>
Committed: Mon Nov 18 15:59:15 2013 +0100

----------------------------------------------------------------------
 .../giraph/io/gora/GoraEdgeOutputFormat.java    | 281 +++++++++++++++++
 .../io/gora/GoraGEdgeEdgeOutputFormat.java      |  76 +++++
 .../giraph/io/gora/generated/GEdgeResult.java   | 314 +++++++++++++++++++
 .../io/gora/GoraTestEdgeOutputFormat.java       | 119 +++++++
 .../io/gora/TestGoraEdgeOutputFormat.java       |  93 ++++++
 5 files changed, 883 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/b151d7a9/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeOutputFormat.java
b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeOutputFormat.java
new file mode 100644
index 0000000..be9f472
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeOutputFormat.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_DATASTORE_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_KEY_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS;
+
+import java.io.IOException;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.io.EdgeOutputFormat;
+import org.apache.giraph.io.EdgeWriter;
+import org.apache.giraph.io.gora.utils.GoraUtils;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.util.GoraException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.log4j.Logger;
+
+/**
+ *  Class which writes edges to a Gora data store. It's designed
+ *  as an extension point for EdgeOutputFormat subclasses that wish
+ *  to write to Gora data sources.
+ *
+ *  Works with
+ *  {@link GoraEdgeInputFormat}
+ *
+ * @param <I> vertex id type
+ * @param <V> vertex value type
+ * @param <E> edge value type
+ */
+public abstract class GoraEdgeOutputFormat<I extends WritableComparable,
+  V extends Writable, E extends Writable>
+  extends EdgeOutputFormat<I, V, E> {
+
+  /** Logger for Gora's edge output format. */
+  private static final Logger LOG =
+          Logger.getLogger(GoraEdgeOutputFormat.class);
+
+  /** KeyClass used for getting data. */
+  private static Class<?> KEY_CLASS;
+
+  /** The edge itself will be used as a value inside Gora. */
+  private static Class<? extends Persistent> PERSISTENT_CLASS;
+
+  /** Data store class to be used as backend. */
+  private static Class<? extends DataStore> DATASTORE_CLASS;
+
+  /** Data store used for writing data. */
+  private static DataStore DATA_STORE;
+
+  /**
+   * checkOutputSpecs
+   *
+   * @param context information about the job
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Override
+  public void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException {
+  }
+
+  /**
+   * Gets the data store object initialized.
+   * @return DataStore created
+   */
+  public DataStore createDataStore() {
+    DataStore dsCreated = null;
+    try {
+      dsCreated = GoraUtils.createSpecificDataStore(getDatastoreClass(),
+          getKeyClass(), getPersistentClass());
+    } catch (GoraException e) {
+      getLogger().error("Error creating data store.");
+      e.printStackTrace();
+    }
+    return dsCreated;
+  }
+
+  @Override
+  public abstract GoraEdgeWriter
+  createEdgeWriter(TaskAttemptContext context)
+    throws IOException, InterruptedException;
+
+  /**
+   * getOutputCommitter
+   *
+   * @param context the task context
+   * @return OutputCommitter
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return new NullOutputCommitter();
+  }
+
+  /**
+   * Empty output committer for hadoop.
+   */
+  private static class NullOutputCommitter extends OutputCommitter {
+    @Override
+    public void abortTask(TaskAttemptContext arg0) throws IOException {    }
+
+    @Override
+    public void commitTask(TaskAttemptContext arg0) throws IOException {    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
+      return false;
+    }
+
+    @Override
+    public void setupJob(JobContext arg0) throws IOException {    }
+
+    @Override
+    public void setupTask(TaskAttemptContext arg0) throws IOException {    }
+  }
+
+  /**
+   * Abstract class to be implemented by the user based on their specific
+   * vertex/edges output.
+   */
+  protected abstract class GoraEdgeWriter extends EdgeWriter<I, V, E> {
+    @Override
+    public void initialize(TaskAttemptContext context) throws IOException,
+      InterruptedException {
+      String sDataStoreType =
+          GIRAPH_GORA_OUTPUT_DATASTORE_CLASS.get(getConf());
+      String sKeyType =
+          GIRAPH_GORA_OUTPUT_KEY_CLASS.get(getConf());
+      String sPersistentType =
+          GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS.get(getConf());
+      try {
+        Class<?> keyClass = Class.forName(sKeyType);
+        Class<?> persistentClass = Class.forName(sPersistentType);
+        Class<?> dataStoreClass = Class.forName(sDataStoreType);
+        setKeyClass(keyClass);
+        setPersistentClass((Class<? extends Persistent>) persistentClass);
+        setDatastoreClass((Class<? extends DataStore>) dataStoreClass);
+        setDataStore(createDataStore());
+        if (getDataStore() != null) {
+          getLogger().debug("The data store has been created.");
+        }
+      } catch (ClassNotFoundException e) {
+        getLogger().error("Error while reading Gora Output parameters");
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void close(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      getDataStore().flush();
+      getDataStore().close();
+    }
+
+    @Override
+    public void writeEdge(I srcId, V srcValue, Edge<I, E> edge)
+      throws IOException, InterruptedException {
+      Persistent goraEdge = null;
+      Object goraKey = getGoraKey(srcId, srcValue, edge);
+      goraEdge = getGoraEdge(srcId, srcValue, edge);
+      getDataStore().put(goraKey, goraEdge);
+    }
+
+    /**
+     * Each edge needs to be transformed into a Gora object to be sent to
+     * a specific data store.
+     *
+     * @param  edge   edge to be transformed into a Gora object
+     * @param  srcId  source vertex id
+     * @param  srcValue  source vertex value
+     * @return          Gora representation of the edge
+     */
+    protected abstract Persistent getGoraEdge
+      (I srcId, V srcValue, Edge<I, E> edge);
+
+    /**
+     * Gets the correct key from a computed edge.
+     * @param edge  edge to extract the key from.
+     * @param  srcId  source vertex id
+     * @param  srcValue  source vertex value
+     * @return      The key representing such edge.
+     */
+    protected abstract Object getGoraKey(I srcId, V srcValue, Edge<I, E> edge);
+  }
+
+  /**
+   * Gets the data store.
+   * @return DataStore
+   */
+  public static DataStore getDataStore() {
+    return DATA_STORE;
+  }
+
+  /**
+   * Sets the data store
+   * @param dStore the data store to set
+   */
+  public static void setDataStore(DataStore dStore) {
+    DATA_STORE = dStore;
+  }
+
+  /**
+   * Gets the persistent Class
+   * @return persistentClass used
+   */
+  static Class<? extends Persistent> getPersistentClass() {
+    return PERSISTENT_CLASS;
+  }
+
+  /**
+   * Sets the persistent Class
+   * @param persistentClassUsed to be set
+   */
+  static void setPersistentClass
+  (Class<? extends Persistent> persistentClassUsed) {
+    PERSISTENT_CLASS = persistentClassUsed;
+  }
+
+  /**
+   * Gets the key class used.
+   * @return the key class used.
+   */
+  static Class<?> getKeyClass() {
+    return KEY_CLASS;
+  }
+
+  /**
+   * Sets the key class used.
+   * @param keyClassUsed key class used.
+   */
+  static void setKeyClass(Class<?> keyClassUsed) {
+    KEY_CLASS = keyClassUsed;
+  }
+
+  /**
+   * @return Class the DATASTORE_CLASS
+   */
+  public static Class<? extends DataStore> getDatastoreClass() {
+    return DATASTORE_CLASS;
+  }
+
+  /**
+   * @param dataStoreClass the dataStore class to set
+   */
+  public static void setDatastoreClass(
+      Class<? extends DataStore> dataStoreClass) {
+    DATASTORE_CLASS = dataStoreClass;
+  }
+
+  /**
+   * Gets the logger for the class.
+   * @return the log of the class.
+   */
+  public static Logger getLogger() {
+    return LOG;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/b151d7a9/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeOutputFormat.java
b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeOutputFormat.java
new file mode 100644
index 0000000..d350d37
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeOutputFormat.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import java.io.IOException;
+
+import org.apache.avro.util.Utf8;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.io.gora.generated.GEdgeResult;
+import org.apache.gora.persistency.Persistent;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Implementation of a specific writer for a generated data bean.
+ */
+public class GoraGEdgeEdgeOutputFormat
+  extends GoraEdgeOutputFormat<LongWritable, DoubleWritable,
+  FloatWritable> {
+
+  /**
+   * Default constructor
+   */
+  public GoraGEdgeEdgeOutputFormat() {
+  }
+
+  @Override
+  public GoraEdgeWriter createEdgeWriter(
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    return new GoraGEdgeEdgeWriter();
+  }
+
+  /**
+   * Gora edge writer.
+   */
+  protected class GoraGEdgeEdgeWriter
+    extends GoraEdgeWriter {
+
+    @Override
+    protected Persistent getGoraEdge(LongWritable srcId,
+        DoubleWritable srcValue, Edge<LongWritable, FloatWritable> edge) {
+      GEdgeResult tmpGEdge = new GEdgeResult();
+      tmpGEdge.setEdgeId(new Utf8(srcId.toString()));
+      tmpGEdge.setEdgeWeight(edge.getValue().get());
+      tmpGEdge.setVertexOutId(new Utf8(edge.getTargetVertexId().toString()));
+      getLogger().debug("GoraObject created: " + tmpGEdge.toString());
+      return tmpGEdge;
+    }
+
+    @Override
+    protected Object getGoraKey(LongWritable srcId,
+        DoubleWritable srcValue, Edge<LongWritable, FloatWritable> edge) {
+      String goraKey = String.valueOf(
+          edge.getTargetVertexId().get() + edge.getValue().get());
+      return goraKey;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/b151d7a9/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdgeResult.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdgeResult.java
b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdgeResult.java
new file mode 100644
index 0000000..0c3501c
--- /dev/null
+++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdgeResult.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.gora.generated;
+
+import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.StateManager;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.persistency.impl.StateManagerImpl;
+
+/**
+ * Example Class for defining a Giraph-Edge.
+ */
+@SuppressWarnings("all")
+public class GEdgeResult extends PersistentBase {
+  /**
+   * Schema used for the class.
+   */
+  public static final Schema OBJ_SCHEMA = Schema.parse("{\"type\":\"record\"," +
+    "\"name\":\"GEdge\",\"namespace\":\"org.apache.giraph.gora.generated\"," +
+    "\"fields\":[{\"name\":\"edgeId\",\"type\":\"string\"}," +
+    "{\"name\":\"edgeWeight\",\"type\":\"float\"}," +
+    "{\"name\":\"vertexInId\",\"type\":\"string\"}," +
+    "{\"name\":\"vertexOutId\",\"type\":\"string\"}," +
+    "{\"name\":\"label\",\"type\":\"string\"}]}");
+
+  /**
+   * Field enum
+   */
+  public static enum Field {
+    /**
+     * Edge id.
+     */
+    EDGE_ID(0, "edgeId"),
+
+    /**
+     * Edge weight.
+     */
+    EDGE_WEIGHT(1, "edgeWeight"),
+
+    /**
+     * Edge vertex source id.
+     */
+    VERTEX_IN_ID(2, "vertexInId"),
+
+    /**
+     * Edge vertex end id.
+     */
+    VERTEX_OUT_ID(3, "vertexOutId"),
+
+    /**
+     * Edge label.
+     */
+    LABEL(4, "label");
+
+    /**
+     * Field index
+     */
+    private int index;
+
+    /**
+     * Field name
+     */
+    private String name;
+
+    /**
+     * Field constructor
+     * @param index of attribute
+     * @param name of attribute
+     */
+    Field(int index, String name) {
+      this.index = index;
+      this.name = name;
+    }
+
+    /**
+     * Gets index
+     * @return int of attribute.
+     */
+    public int getIndex() {
+      return index;
+    }
+
+    /**
+     * Gets name
+     * @return String of name.
+     */
+    public String getName() {
+      return name;
+    }
+
+    /**
+     * Gets name
+     * @return String of name.
+     */
+    public String toString() {
+      return name;
+    }
+  };
+
+  /**
+   * Array containing all fields.
+   */
+  private static final String[] ALL_FIELDS = {
+    "edgeId", "edgeWeight", "vertexInId", "vertexOutId", "label"
+  };
+
+  static {
+    PersistentBase.registerFields(GEdgeResult.class, ALL_FIELDS);
+  }
+
+  /**
+   * edgeId
+   */
+  private Utf8 edgeId;
+
+  /**
+   * edgeWeight
+   */
+  private float edgeWeight;
+
+  /**
+   * vertexInId
+   */
+  private Utf8 vertexInId;
+
+  /**
+   * vertexOutId
+   */
+  private Utf8 vertexOutId;
+
+  /**
+   * label
+   */
+  private Utf8 label;
+
+  /**
+   * Default constructor.
+   */
+  public GEdgeResult() {
+    this(new StateManagerImpl());
+  }
+
+  /**
+   * Constructor
+   * @param stateManager from which the object will be created.
+   */
+  public GEdgeResult(StateManager stateManager) {
+    super(stateManager);
+  }
+
+  /**
+   * Creates a new instance
+   * @param stateManager from which the object will be created.
+   * @return GEdgeResult created
+   */
+  public GEdgeResult newInstance(StateManager stateManager) {
+    return new GEdgeResult(stateManager);
+  }
+
+  /**
+   * Gets the object schema
+   * @return Schema of the object.
+   */
+  public Schema getSchema() {
+    return OBJ_SCHEMA;
+  }
+
+  /**
+   * Gets field
+   * @param fieldIndex index field.
+   * @return Object from an index.
+   */
+  public Object get(int fieldIndex) {
+    switch (fieldIndex) {
+    case 0:
+      return edgeId;
+    case 1:
+      return edgeWeight;
+    case 2:
+      return vertexInId;
+    case 3:
+      return vertexOutId;
+    case 4:
+      return label;
+    default:
+      throw new AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Puts a value into a field.
+   * @param fieldIndex index of field used.
+   * @param fieldValue value of field used.
+   */
+  @SuppressWarnings(value = "unchecked")
+  public void put(int fieldIndex, Object fieldValue) {
+    if (isFieldEqual(fieldIndex, fieldValue)) {
+      return;
+    }
+    getStateManager().setDirty(this, fieldIndex);
+    switch (fieldIndex) {
+    case 0:
+      edgeId = (Utf8) fieldValue; break;
+    case 1:
+      edgeWeight = (Float) fieldValue; break;
+    case 2:
+      vertexInId = (Utf8) fieldValue; break;
+    case 3:
+      vertexOutId = (Utf8) fieldValue; break;
+    case 4:
+      label = (Utf8) fieldValue; break;
+    default:
+      throw new AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Gets edgeId
+   * @return Utf8 edgeId
+   */
+  public Utf8 getEdgeId() {
+    return (Utf8) get(0);
+  }
+
+  /**
+   * Sets edgeId
+   * @param value edgeId
+   */
+  public void setEdgeId(Utf8 value) {
+    put(0, value);
+  }
+
+  /**
+   * Gets edgeWeight
+   * @return float edgeWeight
+   */
+  public float getEdgeWeight() {
+    return (Float) get(1);
+  }
+
+  /**
+   * Sets edgeWeight
+   * @param value edgeWeight
+   */
+  public void setEdgeWeight(float value) {
+    put(1, value);
+  }
+
+  /**
+   * Gets edgeVertexInId
+   * @return Utf8 edgeVertexInId
+   */
+  public Utf8 getVertexInId() {
+    return (Utf8) get(2);
+  }
+
+  /**
+   * Sets edgeVertexInId
+   * @param value edgeVertexInId
+   */
+  public void setVertexInId(Utf8 value) {
+    put(2, value);
+  }
+
+  /**
+   * Gets edgeVertexOutId
+   * @return Utf8 edgeVertexOutId
+   */
+  public Utf8 getVertexOutId() {
+    return (Utf8) get(3);
+  }
+
+  /**
+   * Sets edgeVertexOutId
+   * @param value edgeVertexOutId
+   */
+  public void setVertexOutId(Utf8 value) {
+    put(3, value);
+  }
+
+  /**
+   * Gets edgeLabel
+   * @return Utf8 edgeLabel
+   */
+  public Utf8 getLabel() {
+    return (Utf8) get(4);
+  }
+
+  /**
+   * Sets edgeLabel
+   * @param value edgeLabel
+   */
+  public void setLabel(Utf8 value) {
+    put(4, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/b151d7a9/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeOutputFormat.java
b/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeOutputFormat.java
new file mode 100644
index 0000000..0254498
--- /dev/null
+++ b/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeOutputFormat.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.avro.util.Utf8;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.io.gora.GoraEdgeOutputFormat;
+import org.apache.giraph.io.gora.generated.GEdge;
+import org.apache.giraph.io.gora.generated.GEdgeResult;
+import org.apache.gora.persistency.Persistent;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Implementation of a specific writer for a generated data bean.
+ */
+public class GoraTestEdgeOutputFormat
+  extends GoraEdgeOutputFormat<LongWritable, DoubleWritable,
+  FloatWritable> {
+
+  /**
+   * Default constructor
+   */
+  public GoraTestEdgeOutputFormat() {
+  }
+
+  @Override
+  public GoraEdgeWriter createEdgeWriter(
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    return new GoraGEdgeEdgeWriter();
+  }
+
+  /**
+   * Gora edge writer.
+   */
+  protected class GoraGEdgeEdgeWriter
+    extends GoraEdgeWriter {
+
+    @Override
+    protected Persistent getGoraEdge(LongWritable srcId,
+        DoubleWritable srcValue, Edge<LongWritable, FloatWritable> edge) {
+      GEdgeResult tmpGEdge = new GEdgeResult();
+      Utf8 keyLabel = new Utf8(srcId.toString() + "-" +
+      edge.getTargetVertexId().toString());
+      tmpGEdge.setEdgeId(keyLabel);
+      tmpGEdge.setEdgeWeight(edge.getValue().get());
+      tmpGEdge.setVertexInId(new Utf8(srcId.toString()));
+      tmpGEdge.setVertexOutId(new Utf8(edge.getTargetVertexId().toString()));
+      tmpGEdge.setLabel(keyLabel);
+      getLogger().debug("GoraObject created: " + tmpGEdge.toString());
+      return tmpGEdge;
+    }
+
+    @Override
+    protected Object getGoraKey(LongWritable srcId,
+        DoubleWritable srcValue, Edge<LongWritable, FloatWritable> edge) {
+      String goraKey = String.valueOf(
+          edge.getTargetVertexId().get() + edge.getValue().get());
+      return goraKey;
+    }
+
+    @Override
+    public void writeEdge(LongWritable srcId, DoubleWritable srcValue,
+        Edge<LongWritable, FloatWritable> edge)
+        throws IOException, InterruptedException {
+      super.writeEdge(srcId, srcValue, edge);
+      Object goraKey = getGoraKey(srcId, srcValue, edge);
+      String keyLabel = String.valueOf(srcId) + "-" +
+          String.valueOf(edge.getTargetVertexId());
+      float weight = Float.valueOf(srcId.toString()) +
+          Float.valueOf(edge.getTargetVertexId().toString());
+      // Asserting
+      Assert.assertEquals(createEdge(keyLabel, String.valueOf(srcId),
+              String.valueOf(edge.getTargetVertexId()),keyLabel, weight),
+              getDataStore().get(goraKey));
+    }
+
+    /**
+     * Creates an edge from its id, endpoint ids, label and weight.
+     * @param id Edge id.
+     * @param vertexInId Vertex source Id.
+     * @param vertexOutId Vertex destination Id.
+     * @param edgeLabel Edge label.
+     * @param edgeWeight Edge weight.
+     * @return GEdge created.
+     */
+    private GEdge createEdge(String id, String vertexInId,
+        String vertexOutId, String edgeLabel, float edgeWeight) {
+      GEdge newEdge = new GEdge();
+      newEdge.setEdgeId(new Utf8(id));
+      newEdge.setVertexInId(new Utf8(vertexInId));
+      newEdge.setVertexOutId(new Utf8(vertexOutId));
+      newEdge.setLabel(new Utf8(edgeLabel));
+      newEdge.setEdgeWeight(edgeWeight);
+      return newEdge;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/b151d7a9/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeOutputFormat.java
b/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeOutputFormat.java
new file mode 100644
index 0000000..c9ac38a
--- /dev/null
+++ b/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeOutputFormat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.gora;
+
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_DATASTORE_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_END_KEY;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEYS_FACTORY_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEY_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_PERSISTENT_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_START_KEY;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_DATASTORE_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_KEY_CLASS;
+import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS;
+
+import java.io.IOException;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test class for Gora edge output formats.
+ */
+public class TestGoraEdgeOutputFormat {
+
+  @Test
+  public void getWritingDb() throws Exception {
+    Iterable<String>    results;
+    GiraphConfiguration conf    = new GiraphConfiguration();
+    // Parameters for input
+    GIRAPH_GORA_DATASTORE_CLASS.
+    set(conf, "org.apache.gora.memory.store.MemStore");
+    GIRAPH_GORA_KEYS_FACTORY_CLASS.
+    set(conf,"org.apache.giraph.io.gora.utils.DefaultKeyFactory");
+    GIRAPH_GORA_KEY_CLASS.set(conf,"java.lang.String");
+    GIRAPH_GORA_PERSISTENT_CLASS.
+    set(conf,"org.apache.giraph.io.gora.generated.GEdge");
+    GIRAPH_GORA_START_KEY.set(conf,"1");
+    GIRAPH_GORA_END_KEY.set(conf,"4");
+    conf.set("io.serializations",
+        "org.apache.hadoop.io.serializer.WritableSerialization," +
+        "org.apache.hadoop.io.serializer.JavaSerialization");
+    conf.setComputationClass(EmptyComputation.class);
+    conf.setEdgeInputFormatClass(GoraTestEdgeInputFormat.class);
+    // Parameters for output
+    GIRAPH_GORA_OUTPUT_DATASTORE_CLASS.
+    set(conf, "org.apache.gora.memory.store.MemStore");
+    GIRAPH_GORA_OUTPUT_KEY_CLASS.set(conf, "java.lang.String");
+    GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS.
+    set(conf,"org.apache.giraph.io.gora.generated.GEdge");
+    conf.setEdgeOutputFormatClass(GoraTestEdgeOutputFormat.class);
+    results = InternalVertexRunner.run(conf, new String[0], new String[0]);
+    Assert.assertNotNull(results);
+  }
+
+  /*
+  Test compute method that only checks that the vertex is not null and
+  votes to halt. It sends no messages.
+   */
+  public static class EmptyComputation
+    extends BasicComputation<LongWritable, DoubleWritable,
+    FloatWritable, LongWritable> {
+
+    @Override
+    public void compute(
+        Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
+        Iterable<LongWritable> messages) throws IOException {
+      Assert.assertNotNull(vertex);
+      vertex.voteToHalt();
+    }
+  }
+}


Mime
View raw message