Return-Path: X-Original-To: apmail-giraph-commits-archive@www.apache.org Delivered-To: apmail-giraph-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id ED439101F9 for ; Mon, 18 Nov 2013 15:00:28 +0000 (UTC) Received: (qmail 41989 invoked by uid 500); 18 Nov 2013 15:00:27 -0000 Delivered-To: apmail-giraph-commits-archive@giraph.apache.org Received: (qmail 41963 invoked by uid 500); 18 Nov 2013 15:00:26 -0000 Mailing-List: contact commits-help@giraph.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@giraph.apache.org Delivered-To: mailing list commits@giraph.apache.org Received: (qmail 41866 invoked by uid 99); 18 Nov 2013 15:00:25 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 18 Nov 2013 15:00:25 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 9390835405; Mon, 18 Nov 2013 15:00:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: claudio@apache.org To: commits@giraph.apache.org Date: Mon, 18 Nov 2013 15:00:25 -0000 Message-Id: <1512d07f435543f2ab947c2cbf028714@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] git commit: updated refs/heads/trunk to f7c3025 Updated Branches: refs/heads/trunk e987492ee -> f7c302587 GIRAPH-760 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/b151d7a9 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/b151d7a9 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/b151d7a9 Branch: refs/heads/trunk Commit: b151d7a97f8f06451a915668e01f393ad0445b36 Parents: e987492 Author: Claudio Martella Authored: Mon Nov 18 15:59:15 2013 +0100 Committer: Claudio Martella Committed: Mon Nov 18 15:59:15 2013 +0100 
---------------------------------------------------------------------- .../giraph/io/gora/GoraEdgeOutputFormat.java | 281 +++++++++++++++++ .../io/gora/GoraGEdgeEdgeOutputFormat.java | 76 +++++ .../giraph/io/gora/generated/GEdgeResult.java | 314 +++++++++++++++++++ .../io/gora/GoraTestEdgeOutputFormat.java | 119 +++++++ .../io/gora/TestGoraEdgeOutputFormat.java | 93 ++++++ 5 files changed, 883 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/b151d7a9/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeOutputFormat.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeOutputFormat.java new file mode 100644 index 0000000..be9f472 --- /dev/null +++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraEdgeOutputFormat.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.io.gora; + +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_DATASTORE_CLASS; +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_KEY_CLASS; +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS; + +import java.io.IOException; + +import org.apache.giraph.edge.Edge; +import org.apache.giraph.io.EdgeOutputFormat; +import org.apache.giraph.io.EdgeWriter; +import org.apache.giraph.io.gora.utils.GoraUtils; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.store.DataStore; +import org.apache.gora.util.GoraException; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.log4j.Logger; + +/** + * Class which wraps the GoraInputFormat. It's designed + * as an extension point to EdgeOutputFormat subclasses who wish + * to write to Gora data sources. + * + * Works with + * {@link GoraEdgeInputFormat} + * + * @param edge id type + * @param vertex type + * @param edge type + */ +public abstract class GoraEdgeOutputFormat + extends EdgeOutputFormat { + + /** Logger for Gora's vertex input format. */ + private static final Logger LOG = + Logger.getLogger(GoraEdgeOutputFormat.class); + + /** KeyClass used for getting data. */ + private static Class KEY_CLASS; + + /** The vertex itself will be used as a value inside Gora. */ + private static Class PERSISTENT_CLASS; + + /** Data store class to be used as backend. */ + private static Class DATASTORE_CLASS; + + /** Data store used for querying data. 
*/ + private static DataStore DATA_STORE; + + /** + * checkOutputSpecs + * + * @param context information about the job + * @throws IOException + * @throws InterruptedException + */ + @Override + public void checkOutputSpecs(JobContext context) + throws IOException, InterruptedException { + } + + /** + * Gets the data store object initialized. + * @return DataStore created + */ + public DataStore createDataStore() { + DataStore dsCreated = null; + try { + dsCreated = GoraUtils.createSpecificDataStore(getDatastoreClass(), + getKeyClass(), getPersistentClass()); + } catch (GoraException e) { + getLogger().error("Error creating data store."); + e.printStackTrace(); + } + return dsCreated; + } + + @Override + public abstract GoraEdgeWriter + createEdgeWriter(TaskAttemptContext context) + throws IOException, InterruptedException; + + /** + * getOutputCommitter + * + * @param context the task context + * @return OutputCommitter + * @throws IOException + * @throws InterruptedException + */ + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException, InterruptedException { + return new NullOutputCommitter(); + } + + /** + * Empty output committer for hadoop. + */ + private static class NullOutputCommitter extends OutputCommitter { + @Override + public void abortTask(TaskAttemptContext arg0) throws IOException { } + + @Override + public void commitTask(TaskAttemptContext arg0) throws IOException { } + + @Override + public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException { + return false; + } + + @Override + public void setupJob(JobContext arg0) throws IOException { } + + @Override + public void setupTask(TaskAttemptContext arg0) throws IOException { } + } + + /** + * Abstract class to be implemented by the user based on their specific + * vertex/edges output. 
+ */ + protected abstract class GoraEdgeWriter extends EdgeWriter { + @Override + public void initialize(TaskAttemptContext context) throws IOException, + InterruptedException { + String sDataStoreType = + GIRAPH_GORA_OUTPUT_DATASTORE_CLASS.get(getConf()); + String sKeyType = + GIRAPH_GORA_OUTPUT_KEY_CLASS.get(getConf()); + String sPersistentType = + GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS.get(getConf()); + try { + Class keyClass = Class.forName(sKeyType); + Class persistentClass = Class.forName(sPersistentType); + Class dataStoreClass = Class.forName(sDataStoreType); + setKeyClass(keyClass); + setPersistentClass((Class) persistentClass); + setDatastoreClass((Class) dataStoreClass); + setDataStore(createDataStore()); + if (getDataStore() != null) { + getLogger().debug("The data store has been created."); + } + } catch (ClassNotFoundException e) { + getLogger().error("Error while reading Gora Output parameters"); + e.printStackTrace(); + } + } + + @Override + public void close(TaskAttemptContext context) + throws IOException, InterruptedException { + getDataStore().flush(); + getDataStore().close(); + } + + @Override + public void writeEdge(I srcId, V srcValue, Edge edge) + throws IOException, InterruptedException { + Persistent goraEdge = null; + Object goraKey = getGoraKey(srcId, srcValue, edge); + goraEdge = getGoraEdge(srcId, srcValue, edge); + getDataStore().put(goraKey, goraEdge); + } + + /** + * Each edge needs to be transformed into a Gora object to be sent to + * a specific data store. + * + * @param edge edge to be transformed into a Gora object + * @param srcId source vertex id + * @param srcValue source vertex value + * @return Gora representation of the vertex + */ + protected abstract Persistent getGoraEdge + (I srcId, V srcValue, Edge edge); + + /** + * Gets the correct key from a computed vertex. + * @param edge edge to extract the key from. 
+ * @param srcId source vertex id + * @param srcValue source vertex value + * @return The key representing such edge. + */ + protected abstract Object getGoraKey(I srcId, V srcValue, Edge edge); + } + + /** + * Gets the data store. + * @return DataStore + */ + public static DataStore getDataStore() { + return DATA_STORE; + } + + /** + * Sets the data store + * @param dStore the dATA_STORE to set + */ + public static void setDataStore(DataStore dStore) { + DATA_STORE = dStore; + } + + /** + * Gets the persistent Class + * @return persistentClass used + */ + static Class getPersistentClass() { + return PERSISTENT_CLASS; + } + + /** + * Sets the persistent Class + * @param persistentClassUsed to be set + */ + static void setPersistentClass + (Class persistentClassUsed) { + PERSISTENT_CLASS = persistentClassUsed; + } + + /** + * Gets the key class used. + * @return the key class used. + */ + static Class getKeyClass() { + return KEY_CLASS; + } + + /** + * Sets the key class used. + * @param keyClassUsed key class used. + */ + static void setKeyClass(Class keyClassUsed) { + KEY_CLASS = keyClassUsed; + } + + /** + * @return Class the DATASTORE_CLASS + */ + public static Class getDatastoreClass() { + return DATASTORE_CLASS; + } + + /** + * @param dataStoreClass the dataStore class to set + */ + public static void setDatastoreClass( + Class dataStoreClass) { + DATASTORE_CLASS = dataStoreClass; + } + + /** + * Gets the logger for the class. + * @return the log of the class. 
+ */ + public static Logger getLogger() { + return LOG; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/b151d7a9/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeOutputFormat.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeOutputFormat.java new file mode 100644 index 0000000..d350d37 --- /dev/null +++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/GoraGEdgeEdgeOutputFormat.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.io.gora; + +import java.io.IOException; + +import org.apache.avro.util.Utf8; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.io.gora.generated.GEdgeResult; +import org.apache.gora.persistency.Persistent; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Implementation of a specific writer for a generated data bean. 
+ */ +public class GoraGEdgeEdgeOutputFormat + extends GoraEdgeOutputFormat { + + /** + * Default constructor + */ + public GoraGEdgeEdgeOutputFormat() { + } + + @Override + public GoraEdgeWriter createEdgeWriter( + TaskAttemptContext context) throws IOException, InterruptedException { + return new GoraGEdgeEdgeWriter(); + } + + /** + * Gora edge writer. + */ + protected class GoraGEdgeEdgeWriter + extends GoraEdgeWriter { + + @Override + protected Persistent getGoraEdge(LongWritable srcId, + DoubleWritable srcValue, Edge edge) { + GEdgeResult tmpGEdge = new GEdgeResult(); + tmpGEdge.setEdgeId(new Utf8(srcId.toString())); + tmpGEdge.setEdgeWeight(edge.getValue().get()); + tmpGEdge.setVertexOutId(new Utf8(edge.getTargetVertexId().toString())); + getLogger().debug("GoraObject created: " + tmpGEdge.toString()); + return tmpGEdge; + } + + @Override + protected Object getGoraKey(LongWritable srcId, + DoubleWritable srcValue, Edge edge) { + String goraKey = String.valueOf( + edge.getTargetVertexId().get() + edge.getValue().get()); + return goraKey; + } + + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/b151d7a9/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdgeResult.java ---------------------------------------------------------------------- diff --git a/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdgeResult.java b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdgeResult.java new file mode 100644 index 0000000..0c3501c --- /dev/null +++ b/giraph-gora/src/main/java/org/apache/giraph/io/gora/generated/GEdgeResult.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.io.gora.generated; + +import org.apache.avro.Schema; +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.util.Utf8; +import org.apache.gora.persistency.StateManager; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.persistency.impl.StateManagerImpl; + +/** + * Example Class for defining a Giraph-Edge. + */ +@SuppressWarnings("all") +public class GEdgeResult extends PersistentBase { + /** + * Schema used for the class. + */ + public static final Schema OBJ_SCHEMA = Schema.parse("{\"type\":\"record\"," + + "\"name\":\"GEdge\",\"namespace\":\"org.apache.giraph.gora.generated\"," + + "\"fields\":[{\"name\":\"edgeId\",\"type\":\"string\"}," + + "{\"name\":\"edgeWeight\",\"type\":\"float\"}," + + "{\"name\":\"vertexInId\",\"type\":\"string\"}," + + "{\"name\":\"vertexOutId\",\"type\":\"string\"}," + + "{\"name\":\"label\",\"type\":\"string\"}]}"); + + /** + * Field enum + */ + public static enum Field { + /** + * Edge id. + */ + EDGE_ID(0, "edgeId"), + + /** + * Edge weight. + */ + EDGE_WEIGHT(1, "edgeWeight"), + + /** + * Edge vertex source id. + */ + VERTEX_IN_ID(2, "vertexInId"), + + /** + * Edge vertex end id. + */ + VERTEX_OUT_ID(3, "vertexOutId"), + + /** + * Edge label. 
+ */ + LABEL(4, "label"); + + /** + * Field index + */ + private int index; + + /** + * Field name + */ + private String name; + + /** + * Field constructor + * @param index of attribute + * @param name of attribute + */ + Field(int index, String name) { + this.index = index; + this.name = name; + } + + /** + * Gets index + * @return int of attribute. + */ + public int getIndex() { + return index; + } + + /** + * Gets name + * @return String of name. + */ + public String getName() { + return name; + } + + /** + * Gets name + * @return String of name. + */ + public String toString() { + return name; + } + }; + + /** + * Array containing all fields. + */ + private static final String[] ALL_FIELDS = { + "edgeId", "edgeWeight", "vertexInId", "vertexOutId", "label" + }; + + static { + PersistentBase.registerFields(GEdgeResult.class, ALL_FIELDS); + } + + /** + * edgeId + */ + private Utf8 edgeId; + + /** + * edgeWeight + */ + private float edgeWeight; + + /** + * vertexInId + */ + private Utf8 vertexInId; + + /** + * vertexOutId + */ + private Utf8 vertexOutId; + + /** + * label + */ + private Utf8 label; + + /** + * Default constructor. + */ + public GEdgeResult() { + this(new StateManagerImpl()); + } + + /** + * Constructor + * @param stateManager from which the object will be created. + */ + public GEdgeResult(StateManager stateManager) { + super(stateManager); + } + + /** + * Creates a new instance + * @param stateManager from which the object will be created. + * @return GEdge created + */ + public GEdgeResult newInstance(StateManager stateManager) { + return new GEdgeResult(stateManager); + } + + /** + * Gets the object schema + * @return Schema of the object. + */ + public Schema getSchema() { + return OBJ_SCHEMA; + } + + /** + * Gets field + * @param fieldIndex index field. + * @return Object from an index. 
+ */ + public Object get(int fieldIndex) { + switch (fieldIndex) { + case 0: + return edgeId; + case 1: + return edgeWeight; + case 2: + return vertexInId; + case 3: + return vertexOutId; + case 4: + return label; + default: + throw new AvroRuntimeException("Bad index"); + } + } + + /** + * Puts a value into a field. + * @param fieldIndex index of field used. + * @param fieldValue value of field used. + */ + @SuppressWarnings(value = "unchecked") + public void put(int fieldIndex, Object fieldValue) { + if (isFieldEqual(fieldIndex, fieldValue)) { + return; + } + getStateManager().setDirty(this, fieldIndex); + switch (fieldIndex) { + case 0: + edgeId = (Utf8) fieldValue; break; + case 1: + edgeWeight = (Float) fieldValue; break; + case 2: + vertexInId = (Utf8) fieldValue; break; + case 3: + vertexOutId = (Utf8) fieldValue; break; + case 4: + label = (Utf8) fieldValue; break; + default: + throw new AvroRuntimeException("Bad index"); + } + } + + /** + * Gets edgeId + * @return Utf8 edgeId + */ + public Utf8 getEdgeId() { + return (Utf8) get(0); + } + + /** + * Sets edgeId + * @param value edgeId + */ + public void setEdgeId(Utf8 value) { + put(0, value); + } + + /** + * Gets edgeWeight + * @return float edgeWeight + */ + public float getEdgeWeight() { + return (Float) get(1); + } + + /** + * Sets edgeWeight + * @param value edgeWeight + */ + public void setEdgeWeight(float value) { + put(1, value); + } + + /** + * Gets edgeVertexInId + * @return Utf8 edgeVertexInId + */ + public Utf8 getVertexInId() { + return (Utf8) get(2); + } + + /** + * Sets edgeVertexInId + * @param value edgeVertexInId + */ + public void setVertexInId(Utf8 value) { + put(2, value); + } + + /** + * Gets edgeVertexOutId + * @return Utf8 edgeVertexOutId + */ + public Utf8 getVertexOutId() { + return (Utf8) get(3); + } + + /** + * Sets edgeVertexOutId + * @param value edgeVertexOutId + */ + public void setVertexOutId(Utf8 value) { + put(3, value); + } + + /** + * Gets edgeLabel + * @return Utf8 
edgeLabel + */ + public Utf8 getLabel() { + return (Utf8) get(4); + } + + /** + * Sets edgeLabel + * @param value edgeLabel + */ + public void setLabel(Utf8 value) { + put(4, value); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/b151d7a9/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeOutputFormat.java b/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeOutputFormat.java new file mode 100644 index 0000000..0254498 --- /dev/null +++ b/giraph-gora/src/test/java/org/apache/giraph/io/gora/GoraTestEdgeOutputFormat.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.io.gora; + +import java.io.IOException; + +import junit.framework.Assert; + +import org.apache.avro.util.Utf8; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.io.gora.GoraEdgeOutputFormat; +import org.apache.giraph.io.gora.generated.GEdge; +import org.apache.giraph.io.gora.generated.GEdgeResult; +import org.apache.gora.persistency.Persistent; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Implementation of a specific writer for a generated data bean. + */ +public class GoraTestEdgeOutputFormat + extends GoraEdgeOutputFormat { + + /** + * Default constructor + */ + public GoraTestEdgeOutputFormat() { + } + + @Override + public GoraEdgeWriter createEdgeWriter( + TaskAttemptContext context) throws IOException, InterruptedException { + return new GoraGEdgeEdgeWriter(); + } + + /** + * Gora edge writer. 
+ */ + protected class GoraGEdgeEdgeWriter + extends GoraEdgeWriter { + + @Override + protected Persistent getGoraEdge(LongWritable srcId, + DoubleWritable srcValue, Edge edge) { + GEdgeResult tmpGEdge = new GEdgeResult(); + Utf8 keyLabel = new Utf8(srcId.toString() + "-" + + edge.getTargetVertexId().toString()); + tmpGEdge.setEdgeId(keyLabel); + tmpGEdge.setEdgeWeight(edge.getValue().get()); + tmpGEdge.setVertexInId(new Utf8(srcId.toString())); + tmpGEdge.setVertexOutId(new Utf8(edge.getTargetVertexId().toString())); + tmpGEdge.setLabel(keyLabel); + getLogger().debug("GoraObject created: " + tmpGEdge.toString()); + return tmpGEdge; + } + + @Override + protected Object getGoraKey(LongWritable srcId, + DoubleWritable srcValue, Edge edge) { + String goraKey = String.valueOf( + edge.getTargetVertexId().get() + edge.getValue().get()); + return goraKey; + } + + @Override + public void writeEdge(LongWritable srcId, DoubleWritable srcValue, + Edge edge) + throws IOException, InterruptedException { + super.writeEdge(srcId, srcValue, edge); + Object goraKey = getGoraKey(srcId, srcValue, edge); + String keyLabel = String.valueOf(srcId) + "-" + + String.valueOf(edge.getTargetVertexId()); + float weight = Float.valueOf(srcId.toString()) + + Float.valueOf(edge.getTargetVertexId().toString()); + // Asserting + Assert.assertEquals(createEdge(keyLabel, String.valueOf(srcId), + String.valueOf(edge.getTargetVertexId()),keyLabel, weight), + getDataStore().get(goraKey)); + } + + /** + * Creates an edge using an id and a set of edges. + * @param id Vertex id. + * @param vertexInId Vertex source Id. + * @param vertexOutId Vertex destination Id. + * @param edgeLabel Edge label. + * @param edgeWeight Edge weight. + * @return GEdge created. 
+ */ + private GEdge createEdge(String id, String vertexInId, + String vertexOutId, String edgeLabel, float edgeWeight) { + GEdge newEdge = new GEdge(); + newEdge.setEdgeId(new Utf8(id)); + newEdge.setVertexInId(new Utf8(vertexInId)); + newEdge.setVertexOutId(new Utf8(vertexOutId)); + newEdge.setLabel(new Utf8(edgeLabel)); + newEdge.setEdgeWeight(edgeWeight); + return newEdge; + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/b151d7a9/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeOutputFormat.java b/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeOutputFormat.java new file mode 100644 index 0000000..c9ac38a --- /dev/null +++ b/giraph-gora/src/test/java/org/apache/giraph/io/gora/TestGoraEdgeOutputFormat.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.io.gora; + +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_DATASTORE_CLASS; +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_END_KEY; +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEYS_FACTORY_CLASS; +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_KEY_CLASS; +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_PERSISTENT_CLASS; +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_START_KEY; +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_DATASTORE_CLASS; +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_KEY_CLASS; +import static org.apache.giraph.io.gora.constants.GiraphGoraConstants.GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS; + +import java.io.IOException; + +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.graph.BasicComputation; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.utils.InternalVertexRunner; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.LongWritable; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test class for Gora edge output formats. + */ +public class TestGoraEdgeOutputFormat { + + @Test + public void getWritingDb() throws Exception { + Iterable results; + GiraphConfiguration conf = new GiraphConfiguration(); + // Parameters for input + GIRAPH_GORA_DATASTORE_CLASS. + set(conf, "org.apache.gora.memory.store.MemStore"); + GIRAPH_GORA_KEYS_FACTORY_CLASS. + set(conf,"org.apache.giraph.io.gora.utils.DefaultKeyFactory"); + GIRAPH_GORA_KEY_CLASS.set(conf,"java.lang.String"); + GIRAPH_GORA_PERSISTENT_CLASS. 
+ set(conf,"org.apache.giraph.io.gora.generated.GEdge"); + GIRAPH_GORA_START_KEY.set(conf,"1"); + GIRAPH_GORA_END_KEY.set(conf,"4"); + conf.set("io.serializations", + "org.apache.hadoop.io.serializer.WritableSerialization," + + "org.apache.hadoop.io.serializer.JavaSerialization"); + conf.setComputationClass(EmptyComputation.class); + conf.setEdgeInputFormatClass(GoraTestEdgeInputFormat.class); + // Parameters for output + GIRAPH_GORA_OUTPUT_DATASTORE_CLASS. + set(conf, "org.apache.gora.memory.store.MemStore"); + GIRAPH_GORA_OUTPUT_KEY_CLASS.set(conf, "java.lang.String"); + GIRAPH_GORA_OUTPUT_PERSISTENT_CLASS. + set(conf,"org.apache.giraph.io.gora.generated.GEdge"); + conf.setEdgeOutputFormatClass(GoraTestEdgeOutputFormat.class); + results = InternalVertexRunner.run(conf, new String[0], new String[0]); + Assert.assertNotNull(results); + } + + /* + Test compute method that sends each edge a notification of its parents. + The test set only has a 1-1 parent-to-child ratio for this unit test. + */ + public static class EmptyComputation + extends BasicComputation { + + @Override + public void compute( + Vertex vertex, + Iterable messages) throws IOException { + Assert.assertNotNull(vertex); + vertex.voteToHalt(); + } + } +}