giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clau...@apache.org
Subject [3/4] GIRAPH-803
Date Mon, 20 Jan 2014 18:53:41 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/5d1bc2de/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/RexsterVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/RexsterVertexOutputFormat.java b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/RexsterVertexOutputFormat.java
new file mode 100644
index 0000000..9af00e0
--- /dev/null
+++ b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/RexsterVertexOutputFormat.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.rexster.io;
+
+import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_OUTPUT_V_TXSIZE;
+import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_OUTPUT_WAIT_TIMEOUT;
+import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_VLABEL;
+import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_HOSTNAME;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.HttpURLConnection;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.apache.giraph.bsp.BspService;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.giraph.io.VertexWriter;
+import org.apache.giraph.rexster.utils.RexsterUtils;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.giraph.zk.ZooKeeperManager;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Base class that users can subclass to use their own Rexster based
+ * vertex output format.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public class RexsterVertexOutputFormat<I extends WritableComparable,
+  V extends Writable, E extends Writable>
+  extends VertexOutputFormat<I, V, E> {
+
+  /** Class logger. */
+  private static final Logger LOG =
+      Logger.getLogger(RexsterVertexOutputFormat.class);
+
+  @Override
+  public RexsterVertexWriter
+  createVertexWriter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+
+    return new RexsterVertexWriter();
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException {
+    GiraphConfiguration gconf =
+      new GiraphConfiguration(context.getConfiguration());
+    String msg = "Rexster OutputFormat usage requires both Edge and Vertex " +
+                 "OutputFormat's.";
+
+    if (!gconf.hasEdgeOutputFormat()) {
+      LOG.error(msg);
+      throw new InterruptedException(msg);
+    }
+
+    String endpoint = GIRAPH_REXSTER_HOSTNAME.get(gconf);
+    if (endpoint == null) {
+      throw new InterruptedException(GIRAPH_REXSTER_HOSTNAME.getKey() +
+                                     " is a mandatory parameter.");
+    }
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+
+    return new NullOutputCommitter();
+  }
+
+  /**
+   * Empty output committer for Hadoop.
+   */
+  private static class NullOutputCommitter extends OutputCommitter {
+    @Override
+    public void abortTask(TaskAttemptContext taskContext) { }
+
+    @Override
+    public void cleanupJob(JobContext jobContext) { }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskContext) { }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+      return false;
+    }
+
+    @Override
+    public void setupJob(JobContext jobContext) { }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskContext) { }
+  }
+
+  /**
+   * Abstract class to be implemented by the user based on their specific
+   * vertex/edges output. Easiest to ignore the key value separator and only
+   * use key instead.
+   */
+  protected class RexsterVertexWriter extends VertexWriter<I, V, E>
+    implements Watcher {
+    /** barrier path */
+    private static final String BARRIER_PATH = "/_rexsterBarrier";
+    /** array key that points to the edges and vertices */
+    private static final String JSON_ARRAY_KEY = "tx";
+    /** Connection to the HTTP REST endpoint */
+    private HttpURLConnection rexsterConn;
+    /** Output stream from the HTTP connection to the REST endpoint */
+    private BufferedWriter rexsterBufferedStream;
+    /** attribute used to keep the state of the element array status */
+    private boolean isFirstElement = true;
+    /** ZooKeeper client object */
+    private ZooKeeperExt zk = null;
+    /** lock for management of the barrier */
+    private final Object lock = new Object();
+    /** number of vertices before starting a new connection */
+    private int txsize;
+    /** number of vertices sent over the current connection */
+    private int txcounter = 0;
+    /** label of the vertex id field */
+    private String vlabel;
+    /** vertex id */
+    private I vertexId;
+
+    @Override
+    public void initialize(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      ImmutableClassesGiraphConfiguration conf = getConf();
+
+      vlabel = GIRAPH_REXSTER_VLABEL.get(conf);
+      txsize = GIRAPH_REXSTER_OUTPUT_V_TXSIZE.get(conf);
+      startConnection();
+
+      /* set the barrier */
+      zk = new ZooKeeperExt(conf.getZookeeperList(),
+          conf.getZooKeeperSessionTimeout(), conf.getZookeeperOpsMaxAttempts(),
+          conf.getZookeeperOpsRetryWaitMsecs(), this, context);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+      /* close connection */
+      stopConnection();
+
+      /* deal with the barrier */
+      String id = context.getTaskAttemptID().toString();
+      String zkBasePath = ZooKeeperManager.getBasePath(getConf()) +
+        BspService.BASE_DIR + "/" +
+        getConf().get("mapred.job.id", "Unknown Job");
+      prepareBarrier(zkBasePath);
+      enterBarrier(zkBasePath, id);
+      checkBarrier(zkBasePath, context);
+    }
+
+    @Override
+    public void writeVertex(Vertex<I, V, E> vertex)
+      throws IOException, InterruptedException {
+
+      if (txcounter == txsize) {
+        txcounter = 0;
+        isFirstElement = true;
+        stopConnection();
+        startConnection();
+      }
+
+      try {
+        /* extract the JSON object of the vertex */
+        JSONObject jsonVertex = getVertex(vertex);
+        jsonVertex.accumulate("_type", "vertex");
+        jsonVertex.accumulate(vlabel, getVertexId().toString());
+        String suffix = ",";
+        if (isFirstElement) {
+          isFirstElement = false;
+          suffix = "";
+        }
+        rexsterBufferedStream.write(suffix + jsonVertex);
+        txcounter += 1;
+
+      } catch (JSONException e) {
+        throw new InterruptedException("Error writing the vertex: " +
+                                       e.getMessage());
+      }
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      EventType type = event.getType();
+
+      if (type == EventType.NodeChildrenChanged) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("signal: number of children changed.");
+        }
+        synchronized (lock) {
+          lock.notify();
+        }
+      }
+    }
+
+    /**
+     * Create the root Rexster barrier znode if it does not exist yet.
+     *
+     * @param  zkBasePath  base path for zookeeper
+     * @throws InterruptedException
+     */
+    private void prepareBarrier(String zkBasePath)
+      throws InterruptedException {
+      try {
+        zk.createExt(zkBasePath + BARRIER_PATH, null, Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT, false);
+      } catch (KeeperException.NodeExistsException nee) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("rexster barrier znode already exists.");
+        }
+      } catch (KeeperException ke) {
+        throw new InterruptedException("RexsterVertexOutputFormat: " +
+            "error while creating the barrier: " + ke.getMessage());
+      }
+    }
+
+    /**
+     * Enter the Rexster barrier
+     *
+     * @param  zkBasePath  base path for zookeeper
+     * @param  id       value id used for the znode
+     * @throws InterruptedException
+     */
+    private void enterBarrier(String zkBasePath, String id)
+      throws InterruptedException {
+      try {
+        zk.createExt(zkBasePath + BARRIER_PATH + "/" + id, null,
+          Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, false);
+      } catch (KeeperException.NodeExistsException nee) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("rexster barrier znode already exists.");
+        }
+      } catch (KeeperException ke) {
+        throw new InterruptedException("RexsterVertexOutputFormat: " +
+            "error while creating the barrier: " + ke.getMessage());
+      }
+    }
+
+    /**
+     * Check the Rexster barrier to verify whether all the vertices have been
+     * saved. If so, the barrier can be left and it is possible to save the
+     * edges.
+     *
+     * @param  zkBasePath  base path for zookeeper
+     * @param  context  task attempt context
+     * @throws InterruptedException
+     */
+    private void checkBarrier(String zkBasePath,
+      TaskAttemptContext context) throws InterruptedException {
+      long workersNum = getConf().getMapTasks() - 1;
+      int timeout = GIRAPH_REXSTER_OUTPUT_WAIT_TIMEOUT.get(getConf());
+
+      try {
+        String barrierPath = zkBasePath + BARRIER_PATH;
+        while (true) {
+          List<String> list =
+            zk.getChildrenExt(barrierPath, true, false, false);
+
+          if (list.size() < workersNum) {
+            synchronized (lock) {
+              lock.wait(timeout);
+            }
+            context.progress();
+          } else {
+            return;
+          }
+        }
+      } catch (KeeperException ke) {
+        throw new InterruptedException("Error while checking the barrier:" +
+                                       ke.getMessage());
+      }
+    }
+
+    /**
+     * Start a new connection with the Rexster REST endpoint.
+     */
+    private void startConnection() throws IOException, InterruptedException {
+      rexsterConn = RexsterUtils.Vertex.openOutputConnection(getConf());
+      rexsterBufferedStream = new BufferedWriter(
+          new OutputStreamWriter(rexsterConn.getOutputStream(),
+                                 Charset.forName("UTF-8")));
+      /* open the JSON container: it is an object containing an array of
+         elements */
+      rexsterBufferedStream.write("{ ");
+      rexsterBufferedStream.write("\"vlabel\" : \"" + vlabel + "\",");
+      rexsterBufferedStream.write("\"" + JSON_ARRAY_KEY + "\"");
+      rexsterBufferedStream.write(" : [ ");
+    }
+
+    /**
+     * Stop the current connection with the Rexster REST endpoint. By default
+     * the JDK manages keep-alive, so no particular code is put in place for
+     * this aim.
+     */
+    private void stopConnection() throws IOException, InterruptedException {
+      /* close the JSON container */
+      rexsterBufferedStream.write(" ] }");
+      rexsterBufferedStream.flush();
+      rexsterBufferedStream.close();
+
+      /* check the response and in case of error signal the unsuccessful state
+         via exception */
+      RexsterUtils.Vertex.handleResponse(rexsterConn);
+    }
+
+    /**
+     * Each vertex needs to be transformed into a JSON object to be sent to the
+     * batch interface of Rexster. This function does NOT need to implement any
+     * edge transformation since RexsterEdgeWriter#getEdge is
+     * intended for such a task.
+     *
+     * @param  vertex   vertex to be transformed into JSON
+     * @return          JSON representation of the vertex
+     */
+    protected JSONObject getVertex(Vertex<I, V, E> vertex)
+      throws JSONException {
+
+      vertexId = vertex.getId();
+
+      String value = vertex.getValue().toString();
+      JSONObject jsonVertex = new JSONObject();
+      jsonVertex.accumulate("value", value);
+
+      return jsonVertex;
+    }
+
+    /**
+     * For compatibility reasons, the id of the vertex needs to be accumulated
+     * in the vertex object using the defined vlabel, hence we provide a
+     * different function to get the vertex id to keep this compatibility
+     * management independent from the user implementation.
+     *
+     * @return vertex id object
+     */
+    protected I getVertexId() {
+      return vertexId;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d1bc2de/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleDoubleEdgeOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleDoubleEdgeOutputFormat.java b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleDoubleEdgeOutputFormat.java
new file mode 100644
index 0000000..80911ff
--- /dev/null
+++ b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleDoubleEdgeOutputFormat.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.rexster.io.formats;
+
+import java.io.IOException;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.rexster.io.RexsterEdgeOutputFormat;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Rexster Edge Output Format for Long ID's, Double Vertex values and
+ * Double edge values.
+ */
+public class RexsterLongDoubleDoubleEdgeOutputFormat
+  extends RexsterEdgeOutputFormat<LongWritable, DoubleWritable,
+          DoubleWritable> {
+
+  @Override
+  public RexsterEdgeWriter createEdgeWriter(
+      TaskAttemptContext context) throws IOException,
+      InterruptedException {
+
+    return new RexsterLongDoubleDoubleEdgeWriter();
+  }
+
+  /**
+   * Rexster edge writer.
+   */
+  protected class RexsterLongDoubleDoubleEdgeWriter
+    extends RexsterEdgeWriter {
+
+    @Override
+    protected JSONObject getEdge(LongWritable srcId, DoubleWritable srcValue,
+      Edge<LongWritable, DoubleWritable> edge) throws JSONException {
+
+      long outId = srcId.get();
+      long inId = edge.getTargetVertexId().get();
+      double value = edge.getValue().get();
+      JSONObject jsonEdge = new JSONObject();
+      jsonEdge.accumulate("_outV", outId);
+      jsonEdge.accumulate("_inV",  inId);
+      jsonEdge.accumulate("value", value);
+
+      return jsonEdge;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d1bc2de/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleDoubleVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleDoubleVertexInputFormat.java b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleDoubleVertexInputFormat.java
new file mode 100644
index 0000000..7aa643a
--- /dev/null
+++ b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleDoubleVertexInputFormat.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.rexster.io.formats;
+
+import java.io.IOException;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.rexster.io.RexsterVertexInputFormat;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Rexster Vertex Input Format for Long ID's, Double vertex and edge values
+ */
+public class RexsterLongDoubleDoubleVertexInputFormat
+  extends RexsterVertexInputFormat<LongWritable, DoubleWritable,
+          DoubleWritable> {
+
+  @Override
+  public RexsterVertexReader createVertexReader(
+    InputSplit split, TaskAttemptContext context) throws IOException {
+
+    return new RexsterLongDoubleDoubleVertexReader();
+  }
+
+  /**
+   * Rexster vertex reader
+   */
+  protected class RexsterLongDoubleDoubleVertexReader
+    extends RexsterVertexReader {
+
+    @Override
+    protected Vertex<LongWritable, DoubleWritable, DoubleWritable> parseVertex(
+      JSONObject jsonVertex) throws JSONException {
+
+      /* create the actual vertex */
+      Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex =
+        getConf().createVertex();
+
+      Long id;
+      try {
+        id = jsonVertex.getLong("_id");
+      } catch (JSONException ex) {
+        /* OrientDB compatibility; try to transform it as long */
+        String idString = jsonVertex.getString("_id");
+        String[] splits = idString.split(":");
+        id = Long.parseLong(splits[1]);
+      }
+
+      Double value;
+      try {
+        value = jsonVertex.getDouble("value");
+      } catch (JSONException ex) {
+        /* value could not be read as a double; default to 0 */
+        value = new Double(0);
+      }
+      vertex.initialize(new LongWritable(id), new DoubleWritable(value));
+      return vertex;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d1bc2de/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleDoubleVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleDoubleVertexOutputFormat.java b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleDoubleVertexOutputFormat.java
new file mode 100644
index 0000000..a1a6cc5
--- /dev/null
+++ b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleDoubleVertexOutputFormat.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.rexster.io.formats;
+
+import java.io.IOException;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.rexster.io.RexsterVertexOutputFormat;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Rexster Vertex Output Format for Long ID's, Double Vertex values and
+ * Double edge values.
+ */
+public class RexsterLongDoubleDoubleVertexOutputFormat
+  extends RexsterVertexOutputFormat<LongWritable, DoubleWritable,
+          DoubleWritable> {
+
+  @Override
+  public RexsterVertexWriter createVertexWriter(
+      TaskAttemptContext context) throws IOException,
+      InterruptedException {
+
+    return new RexsterLongDoubleDoubleVertexWriter();
+  }
+
+  /**
+   * Rexster vertex writer.
+   */
+  protected class RexsterLongDoubleDoubleVertexWriter
+    extends RexsterVertexWriter {
+
+    /** current vertex ID */
+    private LongWritable vertexId;
+
+    @Override
+    protected JSONObject getVertex(
+      Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex)
+      throws JSONException {
+
+      vertexId = vertex.getId();
+
+      double value = vertex.getValue().get();
+      JSONObject jsonVertex = new JSONObject();
+      jsonVertex.accumulate("value", value);
+
+      return jsonVertex;
+    }
+
+    @Override
+    protected LongWritable getVertexId() {
+      return vertexId;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d1bc2de/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleEdgeInputFormat.java b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleEdgeInputFormat.java
new file mode 100644
index 0000000..9974c28
--- /dev/null
+++ b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleEdgeInputFormat.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.rexster.io.formats;
+
+import java.io.IOException;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.rexster.io.RexsterEdgeInputFormat;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Rexster Edge Input Format for Long vertex ID's and Double edge values
+ */
+public class RexsterLongDoubleEdgeInputFormat
+  extends RexsterEdgeInputFormat<LongWritable, DoubleWritable> {
+
+  @Override
+  public RexsterEdgeReader createEdgeReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+
+    return new RexsterLongDoubleEdgeReader();
+  }
+
+  /**
+   * Rexster edge reader
+   */
+  protected class RexsterLongDoubleEdgeReader extends RexsterEdgeReader {
+
+    /** source vertex of the edge */
+    private LongWritable sourceId;
+
+    @Override
+    public LongWritable getCurrentSourceId()
+      throws IOException, InterruptedException {
+
+      return this.sourceId;
+    }
+
+    @Override
+    protected Edge<LongWritable, DoubleWritable> parseEdge(JSONObject jsonEdge)
+      throws JSONException {
+
+      Long value = jsonEdge.getLong("value");
+      Long dest;
+      try {
+        dest = jsonEdge.getLong("_outV");
+      } catch (JSONException ex) {
+        /* OrientDB compatibility; try to transform it as long */
+        String idString = jsonEdge.getString("_outV");
+        String[] splits = idString.split(":");
+        dest = Long.parseLong(splits[1]);
+      }
+      Edge<LongWritable, DoubleWritable> edge =
+        EdgeFactory.create(new LongWritable(dest), new DoubleWritable(value));
+
+      Long sid;
+      try {
+        sid = jsonEdge.getLong("_inV");
+      } catch (JSONException ex) {
+        /* OrientDB compatibility; try to transform it as long */
+        String sidString = jsonEdge.getString("_inV");
+        String[] splits = sidString.split(":");
+        sid = Long.parseLong(splits[1]);
+      }
+      this.sourceId = new LongWritable(sid);
+      return edge;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d1bc2de/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleFloatEdgeOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleFloatEdgeOutputFormat.java b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleFloatEdgeOutputFormat.java
new file mode 100644
index 0000000..3637837
--- /dev/null
+++ b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleFloatEdgeOutputFormat.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.rexster.io.formats;
+
+import java.io.IOException;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.rexster.io.RexsterEdgeOutputFormat;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Rexster Edge Output Format for Long ID's, Double Vertex values and
+ * Float edge values.
+ */
+public class RexsterLongDoubleFloatEdgeOutputFormat
+  extends RexsterEdgeOutputFormat<LongWritable, DoubleWritable,
+          FloatWritable> {
+
+  @Override
+  public RexsterEdgeWriter createEdgeWriter(
+      TaskAttemptContext context) throws IOException,
+      InterruptedException {
+
+    return new RexsterLongDoubleFloatEdgeWriter();
+  }
+
+  /**
+   * Rexster edge writer.
+   */
+  protected class RexsterLongDoubleFloatEdgeWriter
+    extends RexsterEdgeWriter {
+
+    @Override
+    protected JSONObject getEdge(LongWritable srcId, DoubleWritable srcValue,
+      Edge<LongWritable, FloatWritable> edge) throws JSONException {
+
+      long outId = srcId.get();
+      long inId = edge.getTargetVertexId().get();
+      float value = edge.getValue().get();
+      JSONObject jsonEdge = new JSONObject();
+      jsonEdge.accumulate("_outV", outId);
+      jsonEdge.accumulate("_inV",  inId);
+      jsonEdge.accumulate("value", value);
+
+      return jsonEdge;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d1bc2de/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleFloatVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleFloatVertexInputFormat.java b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleFloatVertexInputFormat.java
new file mode 100644
index 0000000..cbd78ce
--- /dev/null
+++ b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleFloatVertexInputFormat.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.rexster.io.formats;
+
+import java.io.IOException;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.rexster.io.RexsterVertexInputFormat;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Rexster Vertex Input Format for Long ID's, Double values, Float edge values
+ */
+public class RexsterLongDoubleFloatVertexInputFormat
+  extends RexsterVertexInputFormat<LongWritable, DoubleWritable,
+          FloatWritable> {
+
+  @Override
+  public RexsterVertexReader createVertexReader(
+    InputSplit split, TaskAttemptContext context) throws IOException {
+
+    return new RexsterLongDoubleFloatVertexReader();
+  }
+
+  /**
+   * Rexster vertex reader
+   */
+  protected class RexsterLongDoubleFloatVertexReader
+    extends RexsterVertexReader {
+
+    @Override
+    protected Vertex<LongWritable, DoubleWritable, FloatWritable> parseVertex(
+      JSONObject jsonVertex) throws JSONException {
+
+      /* create the actual vertex */
+      Vertex<LongWritable, DoubleWritable, FloatWritable> vertex =
+        getConf().createVertex();
+
+      Long id;
+      try {
+        id = jsonVertex.getLong("_id");
+      } catch (JSONException ex) {
+        /* OrientDB compatibility; try to transform it as long */
+        String idString = jsonVertex.getString("_id");
+        String[] splits = idString.split(":");
+        id = Long.parseLong(splits[1]);
+      }
+
+      Double value;
+      try {
+        value = jsonVertex.getDouble("value");
+      } catch (JSONException ex) {
+        /* "value" is missing or not numeric; default the vertex value to 0 */
+        value = new Double(0);
+      }
+
+      vertex.initialize(new LongWritable(id), new DoubleWritable(value));
+      return vertex;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d1bc2de/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleFloatVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleFloatVertexOutputFormat.java b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleFloatVertexOutputFormat.java
new file mode 100644
index 0000000..2210887
--- /dev/null
+++ b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongDoubleFloatVertexOutputFormat.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.rexster.io.formats;
+
+import java.io.IOException;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.rexster.io.RexsterVertexOutputFormat;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Rexster Vertex Output Format for Long ID's, Double Vertex values and
+ * Float edge values.
+ */
+public class RexsterLongDoubleFloatVertexOutputFormat
+  extends RexsterVertexOutputFormat<LongWritable, DoubleWritable,
+          FloatWritable> {
+
+  @Override
+  public RexsterVertexWriter createVertexWriter(
+      TaskAttemptContext context) throws IOException,
+      InterruptedException {
+
+    return new RexsterLongDoubleFloatVertexWriter();
+  }
+
+  /**
+   * Rexster vertex writer.
+   */
+  protected class RexsterLongDoubleFloatVertexWriter
+    extends RexsterVertexWriter {
+
+    /** current vertex ID */
+    private LongWritable vertexId;
+
+    @Override
+    protected JSONObject getVertex(
+      Vertex<LongWritable, DoubleWritable, FloatWritable> vertex)
+      throws JSONException {
+
+      vertexId = vertex.getId();
+
+      double value = vertex.getValue().get();
+      JSONObject jsonVertex = new JSONObject();
+      jsonVertex.accumulate("value", value);
+
+      return jsonVertex;
+    }
+
+    @Override
+    protected LongWritable getVertexId() {
+      return vertexId;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d1bc2de/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongFloatEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongFloatEdgeInputFormat.java b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongFloatEdgeInputFormat.java
new file mode 100644
index 0000000..3b13922
--- /dev/null
+++ b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/RexsterLongFloatEdgeInputFormat.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.rexster.io.formats;
+
+import java.io.IOException;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.rexster.io.RexsterEdgeInputFormat;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Rexster Edge Input Format for Long vertex ID's and Float edge values
+ */
+public class RexsterLongFloatEdgeInputFormat
+  extends RexsterEdgeInputFormat<LongWritable, FloatWritable> {
+
+  @Override
+  public RexsterEdgeReader createEdgeReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+
+    return new RexsterLongFloatEdgeReader();
+  }
+
+  /**
+   * Rexster edge reader
+   */
+  protected class RexsterLongFloatEdgeReader extends RexsterEdgeReader {
+
+    /** source vertex of the edge */
+    private LongWritable sourceId;
+
+    @Override
+    public LongWritable getCurrentSourceId()
+      throws IOException, InterruptedException {
+
+      return this.sourceId;
+    }
+
+    @Override
+    protected Edge<LongWritable, FloatWritable> parseEdge(JSONObject jsonEdge)
+      throws JSONException {
+      /* read as double: getLong would truncate fractional edge values */
+      float value = (float) jsonEdge.getDouble("value");
+      Long dest;
+      try {
+        dest = jsonEdge.getLong("_outV");
+      } catch (JSONException ex) {
+        /* OrientDB compatibility; id is a string, use the part after ':' */
+        String idString = jsonEdge.getString("_outV");
+        String[] splits = idString.split(":");
+        dest = Long.parseLong(splits[1]);
+      }
+      Edge<LongWritable, FloatWritable> edge =
+        EdgeFactory.create(new LongWritable(dest), new FloatWritable(value));
+      /* NOTE(review): source is read from _inV, target from _outV; confirm */
+      Long sid;
+      try {
+        sid = jsonEdge.getLong("_inV");
+      } catch (JSONException ex) {
+        /* OrientDB compatibility; id is a string, use the part after ':' */
+        String sidString = jsonEdge.getString("_inV");
+        String[] splits = sidString.split(":");
+        sid = Long.parseLong(splits[1]);
+      }
+      this.sourceId = new LongWritable(sid);
+      return edge;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d1bc2de/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/package-info.java b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/package-info.java
new file mode 100644
index 0000000..b5ae44f
--- /dev/null
+++ b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/formats/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of reusable Input/Output formats for Rexster in Giraph.
+ */
+package org.apache.giraph.rexster.io.formats;

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d1bc2de/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/package-info.java b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/package-info.java
new file mode 100644
index 0000000..bbd5a7f
--- /dev/null
+++ b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/io/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of reusable Input/Output formats for Rexster in Giraph.
+ */
+package org.apache.giraph.rexster.io;

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d1bc2de/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/utils/RexsterUtils.java
----------------------------------------------------------------------
diff --git a/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/utils/RexsterUtils.java b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/utils/RexsterUtils.java
new file mode 100644
index 0000000..a3d6a96
--- /dev/null
+++ b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/utils/RexsterUtils.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.rexster.utils;
+
+import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_GREMLIN_E_SCRIPT;
+import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_GREMLIN_V_SCRIPT;
+import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_HOSTNAME;
+import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_INPUT_GRAPH;
+import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_OUTPUT_GRAPH;
+import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_PASSWORD;
+import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_PORT;
+import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_USERNAME;
+import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_USES_SSL;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.rexster.io.RexsterInputSplit;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.log4j.Logger;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+
+/**
+ * Utility functions for the Rexster REST interface
+ */
+@SuppressWarnings("rawtypes")
+public abstract class RexsterUtils {
+  /** key/value separator symbol for JSON */
+  public static final char KEY_VALUE_SEPARATOR = ':';
+  /** start object symbol for JSON */
+  public static final char START_OBJECT = '{';
+  /** end object symbol for JSON */
+  public static final char END_OBJECT = '}';
+  /** start array symbol for JSON */
+  public static final char START_ARRAY = '[';
+  /** end array symbol for JSON */
+  public static final char END_ARRAY = ']';
+  /** array elements separator symbol for JSON */
+  public static final char ARRAY_SEPARATOR = ',';
+  /** Class logger. */
+  private static final Logger LOG = Logger.getLogger(RexsterUtils.class);
+
+  /**
+   * The default constructor is set to be private by default so that the
+   * class is not instantiated.
+   */
+  private RexsterUtils() { /* private constructor */ }
+
+  /**
+   * Skip the JSON envelope retrieved from Rexster up to the elements of its
+   * "results" array. Inspired by the implementation of the JSONObject class.
+   *
+   * @param  br           buffer over the HTTP response content
+   * @return JSONTokener  tokener over the HTTP JSON. Null in case the results
+   *                      array is empty.
+   */
+  public static JSONTokener parseJSONEnvelope(BufferedReader br)
+    throws InterruptedException {
+
+    JSONTokener tokener = null;
+
+    try {
+      tokener = new JSONTokener(br);
+      /* check that the JSON is well-formed by starting with a '{' */
+      if (tokener.nextClean() != START_OBJECT) {
+        LOG.error(String.format("A JSONObject text must begin with '%c'",
+                  START_OBJECT));
+      }
+
+      /* loop on the key/value pairs of the envelope */
+      char c = '\0';
+      String key = null;
+      for (;;) {
+        c = tokener.nextClean();
+        switch (c) {
+        case 0:
+          LOG.error(String.format("A JSONObject text must end with '%c'",
+                    END_OBJECT));
+          break;
+        case END_OBJECT:
+          return tokener;
+        default:
+          tokener.back();
+          key = tokener.nextValue().toString();
+        }
+
+        c = tokener.nextClean();
+        if (c != KEY_VALUE_SEPARATOR) {
+          LOG.error(String.format("Expected a %c after a key",
+                    KEY_VALUE_SEPARATOR));
+        }
+
+        if (key != null && !key.equals("results")) {
+          tokener.nextValue();
+        } else {
+          /* starting array */
+          c = tokener.nextClean();
+          if (c != START_ARRAY) {
+            LOG.error("'results' is expected to be an array");
+          }
+
+          /* check if the array is empty. If so, return null to signal that
+             no objects are available in the array, otherwise return the
+             tokener. */
+          c = tokener.nextClean();
+          if (c == END_ARRAY) {
+            return null;
+          } else {
+            tokener.back();
+            return tokener;
+          }
+        }
+
+        switch (tokener.nextClean()) {
+        case ';':
+        case ',':
+          if (tokener.nextClean() == '}') {
+            return tokener;
+          }
+          tokener.back();
+          break;
+        case '}':
+          return tokener;
+        default:
+          LOG.error("Expected a ',' or '}'");
+        }
+      }
+
+    } catch (JSONException e) {
+      LOG.error("Unable to parse the JSON with the vertices.\n" +
+                e.getMessage());
+      throw new InterruptedException(e.toString());
+    }
+  }
+
+  /**
+   * Splitter used by both Vertex and Edge Input Format.
+   *
+   * @param  context     The job context
+   * @param  estimation  Number of estimated objects
+   * @return splits to be generated to read the input
+   */
+  public static List<InputSplit> getSplits(JobContext context,
+    long estimation) throws IOException, InterruptedException {
+
+    int chunks = context.getConfiguration().getInt("mapred.map.tasks", 1);
+    long chunkSize = estimation / chunks;
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("Estimated objects: %d", estimation));
+      LOG.debug(String.format("Number of chunks: %d", chunks));
+    }
+
+    for (int i = 0; i < chunks; ++i) {
+      long start = i * chunkSize;
+      long end =
+        ((i + 1) == chunks) ? Long.MAX_VALUE : (i * chunkSize) + chunkSize;
+      RexsterInputSplit split = new RexsterInputSplit(start, end);
+      splits.add(split);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Chunk: start %d; end %d;", start, end));
+        LOG.debug(String.format("Chunk: size %d;", chunkSize));
+        LOG.debug(split);
+      }
+    }
+
+    return splits;
+  }
+
+  /**
+   * Opens an HTTP connection to the specified Rexster server.
+   *
+   * @param   conf            giraph configuration
+   * @param   start           start index of the Rexster page split
+   * @param   end             end index of the Rexster page split
+   * @param   type            stream type (vertices or edges) needed for the
+   *                          REST Url
+   * @param   gremlinScript   gremlin script. If set to null, will be ignored.
+   * @return  BufferedReader  the object used to retrieve the HTTP response
+   *                          content
+   */
+  // CHECKSTYLE: stop IllegalCatch
+  public static BufferedReader openInputStream(
+    ImmutableClassesGiraphConfiguration conf,
+    long start, long end, String type, String gremlinScript)
+    throws InterruptedException {
+
+    String uriScriptFormat =
+      "/graphs/%s/tp/gremlin?script=%s" +
+      "&rexster.offset.start=%s&rexster.offset.end=%s";
+    String uriFormat =
+      "/graphs/%s/tp/giraph/%s/" +
+      "?rexster.offset.start=%s&rexster.offset.end=%s";
+    String endpoint = GIRAPH_REXSTER_HOSTNAME.get(conf);
+
+    try {
+      boolean isSsl = GIRAPH_REXSTER_USES_SSL.get(conf);
+      int port = GIRAPH_REXSTER_PORT.get(conf);
+      String graph = GIRAPH_REXSTER_INPUT_GRAPH.get(conf);
+      URL url;
+      if (gremlinScript != null && !gremlinScript.isEmpty()) {
+        url = new URL(isSsl ? "https" : "http", endpoint, port,
+                      String.format(uriScriptFormat, graph, gremlinScript,
+                                    start, end));
+      } else {
+        url = new URL(isSsl ? "https" : "http", endpoint, port,
+                      String.format(uriFormat, graph, type, start, end));
+      }
+
+      LOG.info(url);
+
+      String username = GIRAPH_REXSTER_USERNAME.get(conf);
+      String password = GIRAPH_REXSTER_PASSWORD.get(conf);
+      String auth = getHTTPAuthString(username, password);
+
+      HttpURLConnection connection = createConnection(url, "GET", auth);
+      connection.setRequestProperty("Content-Type",
+        "application/json; charset=UTF-8");
+      connection.setDoInput(true);
+      connection.setDoOutput(false);
+      RexsterUtils.handleResponse(connection, type);
+
+      InputStream is  = connection.getInputStream();
+      InputStreamReader isr = new InputStreamReader(is,
+          Charset.forName("UTF-8"));
+      return new BufferedReader(isr);
+    } catch (Exception e) {
+      throw new InterruptedException(e.getMessage());
+    }
+  }
+  // CHECKSTYLE: resume IllegalCatch
+
+  /**
+   * Builds the HTTP POST connection used to write the output graph to Rexster.
+   *
+   * @param conf giraph configuration
+   * @param type element kind in the URL path ("vertices" or "edges")
+   * @return connection set up for JSON content with input and output enabled
+   */
+  // CHECKSTYLE: stop IllegalCatch
+  public static HttpURLConnection openOutputConnection(
+    ImmutableClassesGiraphConfiguration conf, String type)
+    throws InterruptedException {
+
+    String uriFormat = "/graphs/%s/tp/giraph/%s/";
+    String endpoint = GIRAPH_REXSTER_HOSTNAME.get(conf);
+    boolean isSsl = GIRAPH_REXSTER_USES_SSL.get(conf);
+    int port = GIRAPH_REXSTER_PORT.get(conf);
+    String graph = GIRAPH_REXSTER_OUTPUT_GRAPH.get(conf);
+
+    try {
+      URL url = new URL(isSsl ? "https" : "http", endpoint, port,
+                        String.format(uriFormat, graph, type));
+      LOG.info(url);
+
+      String username = GIRAPH_REXSTER_USERNAME.get(conf);
+      String password = GIRAPH_REXSTER_PASSWORD.get(conf);
+      String auth = getHTTPAuthString(username, password);
+
+      HttpURLConnection connection = createConnection(url, "POST", auth);
+      connection.setRequestProperty("Content-Type",
+                                    "application/json; charset=UTF-8");
+      connection.setDoInput(true);
+      connection.setDoOutput(true);
+      return connection;
+    } catch (Exception e) {
+      throw new InterruptedException(e.getMessage());
+    }
+  }
+  // CHECKSTYLE: resume IllegalCatch
+
+  /**
+   * Creates a new HTTP connection to the specified server.
+   *
+   * @param   url         URI to connect to
+   * @param   method      method used for the HTTP request
+   * @param   authValue   authentication value if available
+   * @return  a new HTTP connection
+   */
+  private static HttpURLConnection createConnection(final URL url,
+    final String method, final String authValue) throws Exception {
+
+    final HttpURLConnection connection =
+      (HttpURLConnection) url.openConnection();
+
+    connection.setConnectTimeout(0);
+    connection.setReadTimeout(0);
+    connection.setRequestMethod(method);
+    if (authValue != null) {
+      connection.setRequestProperty("Authorization", authValue);
+    }
+
+    return connection;
+  }
+
+  /**
+   * Utility to handle the output response in case of errors: any response
+   * code other than 200 is turned into an exception carrying the error body.
+   *
+   * @param conn connection to the Rexster Interface
+   * @param type type of data saved (vertices or edges)
+   * @throws IOException          if the response cannot be read
+   * @throws InterruptedException if the response code is not 200
+   */
+  private static void handleResponse(HttpURLConnection conn, String type)
+    throws IOException, InterruptedException {
+
+    if (conn.getResponseCode() == 200) {
+      return;
+    }
+
+    StringBuffer sb = new StringBuffer("Error occured while saving " +
+                                       type + ";");
+
+    /* an error status may come without a body: the stream is then null */
+    InputStream is = conn.getErrorStream();
+    if (is != null) {
+      BufferedReader rd =
+        new BufferedReader(new InputStreamReader(is, Charset.forName("UTF-8")));
+      try {
+        /* report the body verbatim, whether or not it is valid JSON */
+        String aux;
+        while ((aux = rd.readLine()) != null) {
+          sb.append(aux);
+        }
+      } finally {
+        /* release the stream even when reading the body fails */
+        rd.close();
+      }
+    }
+
+    throw new InterruptedException(sb.toString());
+  }
+
+  /**
+   * Specific Rexster utility functions for vertices
+   */
+  public static class Vertex {
+    /**
+     * Empty private constructor. This class should not be instantiated.
+     */
+    private Vertex() { /* private constructor */ }
+
+    /**
+     * Opens an HTTP connection to the specified Rexster server for vertices.
+     *
+     * @param   conf            giraph configuration
+     * @param   start           start index of the Rexster page split
+     * @param   end             end index of the Rexster page split
+     * @return  BufferedReader  the object used to retrieve the HTTP response
+     */
+    public static BufferedReader openInputStream(
+      ImmutableClassesGiraphConfiguration conf, long start, long end)
+      throws InterruptedException {
+
+      String gremlinScript = GIRAPH_REXSTER_GREMLIN_V_SCRIPT.get(conf);
+      return RexsterUtils.openInputStream(conf, start, end, "vertices",
+                                          gremlinScript);
+    }
+
+    /**
+     * Opens an HTTP connection to the specified Rexster server for vertices.
+     *
+     * @param conf giraph configuration
+     * @return  the object used to populate the HTTP response content
+     */
+    public static HttpURLConnection openOutputConnection(
+      ImmutableClassesGiraphConfiguration conf)
+      throws InterruptedException {
+
+      return RexsterUtils.openOutputConnection(conf, "vertices");
+    }
+
+    /**
+     * Utility to handle the output response in case of errors.
+     *
+     * @param conn connection to the Rexster Interface
+     */
+    public static void handleResponse(HttpURLConnection conn)
+      throws IOException, InterruptedException {
+
+      RexsterUtils.handleResponse(conn, "vertices");
+    }
+  }
+
+  /**
+   * Specific Rexster utility functions for edges
+   */
+  public static class Edge {
+    /**
+     * Empty private constructor. This class should not be instantiated.
+     */
+    private Edge() { /* private constructor */ }
+
+    /**
+     * Opens an HTTP connection to the specified Rexster server for edges.
+     *
+     * @param   conf            giraph configuration
+     * @param   start           start index of the Rexster page split
+     * @param   end             end index of the Rexster page split
+     * @return  BufferedReader  the object used to retrieve the HTTP response
+     */
+    public static BufferedReader openInputStream(
+      ImmutableClassesGiraphConfiguration conf, long start, long end)
+      throws InterruptedException {
+
+      String gremlinScript = GIRAPH_REXSTER_GREMLIN_E_SCRIPT.get(conf);
+      return RexsterUtils.openInputStream(conf, start, end, "edges",
+                                          gremlinScript);
+    }
+
+    /**
+     * Opens an HTTP connection to the specified Rexster server for edges.
+     *
+     * @param conf giraph configuration
+     * @return  the object used to populate the HTTP response content
+     */
+    public static HttpURLConnection openOutputConnection(
+      ImmutableClassesGiraphConfiguration conf)
+      throws InterruptedException {
+
+      return RexsterUtils.openOutputConnection(conf, "edges");
+    }
+
+    /**
+     * Utility to handle the output response in case of errors.
+     *
+     * @param conn connection to the Rexster Interface
+     */
+    public static void handleResponse(HttpURLConnection conn)
+      throws IOException, InterruptedException {
+
+      RexsterUtils.handleResponse(conn, "edges");
+    }
+  }
+
+  /**
+   * Builds the value of the HTTP Basic "Authorization" header for Rexster.
+   * RFC 7617 requires the standard Base64 alphabet with padding, so the
+   * URL-safe encoder (which emits '-', '_' and drops '=') must not be used.
+   *
+   * @param  username   username to connect to HTTP
+   * @param  password   password to connect to HTTP
+   * @return String     the header value; null if the username is empty
+   */
+  private static String getHTTPAuthString(String username,
+    String password) {
+
+    if (username.isEmpty()) {
+      return null;
+    }
+    Charset utf8 = Charset.forName("UTF-8");
+    byte[] credentials = (username + ":" + password).getBytes(utf8);
+    return "Basic " + new String(Base64.encodeBase64(credentials), utf8);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d1bc2de/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/utils/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/utils/package-info.java b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/utils/package-info.java
new file mode 100644
index 0000000..3f6810f
--- /dev/null
+++ b/giraph-rexster/giraph-rexster-io/src/main/java/org/apache/giraph/rexster/utils/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package of reusable utils for Rexster in Giraph.
+ */
+package org.apache.giraph.rexster.utils;

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d1bc2de/giraph-rexster/giraph-rexster-io/src/test/java/org/apache/giraph/rexster/io/formats/TestRexsterLongDoubleFloatIOFormat.java
----------------------------------------------------------------------
diff --git a/giraph-rexster/giraph-rexster-io/src/test/java/org/apache/giraph/rexster/io/formats/TestRexsterLongDoubleFloatIOFormat.java b/giraph-rexster/giraph-rexster-io/src/test/java/org/apache/giraph/rexster/io/formats/TestRexsterLongDoubleFloatIOFormat.java
new file mode 100644
index 0000000..5008608
--- /dev/null
+++ b/giraph-rexster/giraph-rexster-io/src/test/java/org/apache/giraph/rexster/io/formats/TestRexsterLongDoubleFloatIOFormat.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.rexster.io.formats;
+
+import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_GREMLIN_E_SCRIPT;
+import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_GREMLIN_V_SCRIPT;
+import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_HOSTNAME;
+import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_INPUT_GRAPH;
+import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_OUTPUT_GRAPH;
+import static org.apache.giraph.rexster.conf.GiraphRexsterConstants.GIRAPH_REXSTER_PORT;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.configuration.HierarchicalConfiguration;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.io.FileUtils;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat;
+import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexOutputFormat;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.tinkerpop.rexster.Tokens;
+import com.tinkerpop.rexster.protocol.EngineController;
+import com.tinkerpop.rexster.server.HttpRexsterServer;
+import com.tinkerpop.rexster.server.RexsterApplication;
+import com.tinkerpop.rexster.server.RexsterServer;
+import com.tinkerpop.rexster.server.XmlRexsterApplication;
+
+/**
+ * This test suite is intended to extensively test the Rexster I/O formats
+ * together with the Kibble provided for that purpose.
+ */
+public class TestRexsterLongDoubleFloatIOFormat {
+  /** temporary directory */
+  protected static final String TMP_DIR = "/tmp/";
+  /** name of the Rexster configuration resource */
+  protected static final String REXSTER_CONF = "rexster.xml";
+  /** names of the databases that get populated with the test graph */
+  protected static final String[] DATABASES = { "tgdb", "neodb", "orientdb" };
+  /** name of the database that is left empty */
+  protected static final String EMPTYDB = "emptydb";
+  /** Rexster server instance */
+  protected static RexsterServer server;
+
+  /**
+   * Starts the Rexster server, then loads the test graph into each of the
+   * databases listed in DATABASES.
+   */
+  @BeforeClass
+  public static void initialSetup() throws Exception {
+    startRexsterServer();
+    insertDbData();
+  }
+
+  /**
+   * Stops the Rexster server and removes the database directories. The
+   * directories are removed even when stopping the server fails, so a failed
+   * shutdown does not leave stale databases behind for the next run.
+   */
+  @AfterClass
+  public static void finalTearDown() throws Exception {
+    try {
+      stopRexsterServer();
+    } finally {
+      deleteDbs();
+    }
+  }
+
+  /** Reads the empty database and expects the job to produce no output. */
+  @Test
+  public void testEmptyDbInput() throws Exception {
+    testDbInput(EMPTYDB, true, false);
+  }
+
+  /** Same as testEmptyDbInput, with the Gremlin scripts set. */
+  @Ignore("Fails due to maven dependecy conflicts.")
+  @Test
+  public void testEmptyDbInputGremlin() throws Exception {
+    testDbInput(EMPTYDB, true, true);
+  }
+
+  /** Reads "tgdb" and compares the job output with its expected file. */
+  @Test
+  public void testTgDbInput() throws Exception {
+    testDbInput(DATABASES[0], false, false);
+  }
+
+  /** Same as testTgDbInput, with the Gremlin scripts set. */
+  @Ignore("Fails due to maven dependecy conflicts.")
+  @Test
+  public void testTgDbInputGremlin() throws Exception {
+    testDbInput(DATABASES[0], false, true);
+  }
+
+  /** Reads "neodb" and compares the job output with its expected file. */
+  @Test
+  public void testNeoDbInput() throws Exception {
+    testDbInput(DATABASES[1], false, false);
+  }
+
+  /** Same as testNeoDbInput, with the Gremlin scripts set. */
+  @Ignore("Fails due to maven dependecy conflicts.")
+  @Test
+  public void testNeoDbInputGremlin() throws Exception {
+    testDbInput(DATABASES[1], false, true);
+  }
+
+  /** Reads "orientdb" and compares the job output with its expected file. */
+  @Test
+  public void testOrientDbInput() throws Exception {
+    testDbInput(DATABASES[2], false, false);
+  }
+
+  /** Same as testOrientDbInput, with the Gremlin scripts set. */
+  @Ignore("Fails due to maven dependecy conflicts.")
+  @Test
+  public void testOrientDbInputGremlin() throws Exception {
+    testDbInput(DATABASES[2], false, true);
+  }
+
+  /** Writes the test graph to "emptytgdb" and checks the stored content. */
+  @Test
+  public void testTgDbOutput() throws Exception {
+    testDbOutput("empty" + DATABASES[0]);
+  }
+
+  /** Writes the test graph to "emptyneodb" and checks the stored content. */
+  @Test
+  public void testNeoDbOutput() throws Exception {
+    testDbOutput("empty" + DATABASES[1]);
+  }
+
+  /** Writes the test graph to "emptyorientdb" and checks the stored content. */
+  @Test
+  public void testOrientDbOutput() throws Exception {
+    testDbOutput("empty" + DATABASES[2]);
+  }
+
+  /**
+   * Runs EmptyComputation over the Rexster graph {@code name} and checks the
+   * job output.
+   *
+   * @param name name of the Rexster graph to read
+   * @param isEmpty true when the job must produce no output; otherwise the
+   *        output is compared with the resource {@code name-output.json}
+   * @param isGramlin true to set the Gremlin vertex and edge scripts
+   *        ("g.V" and "g.E")
+   */
+  private void testDbInput(String name, boolean isEmpty, boolean isGramlin)
+    throws Exception {
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    GIRAPH_REXSTER_HOSTNAME.set(conf, "127.0.0.1");
+    GIRAPH_REXSTER_PORT.set(conf, 18182);
+    GIRAPH_REXSTER_INPUT_GRAPH.set(conf, name);
+    if (isGramlin) {
+      GIRAPH_REXSTER_GREMLIN_V_SCRIPT.set(conf, "g.V");
+      GIRAPH_REXSTER_GREMLIN_E_SCRIPT.set(conf, "g.E");
+    }
+    conf.setComputationClass(EmptyComputation.class);
+    conf.setVertexInputFormatClass(
+        RexsterLongDoubleFloatVertexInputFormat.class);
+    conf.setEdgeInputFormatClass(RexsterLongFloatEdgeInputFormat.class);
+    conf.setVertexOutputFormatClass(
+        JsonLongDoubleFloatDoubleVertexOutputFormat.class);
+
+    Iterable<String> results = InternalVertexRunner.run(conf, new String[0],
+      new String[0]);
+    /* JUnit assertions instead of the assert keyword: the latter is a no-op
+       unless the JVM is started with -ea, which would let failures pass */
+    if (isEmpty) {
+      boolean empty = results == null || !results.iterator().hasNext();
+      Assert.assertTrue("Expected no output for graph " + name, empty);
+      return;
+    }
+    Assert.assertNotNull("No output produced for graph " + name, results);
+
+    String resource = name + "-output.json";
+    URL url = this.getClass().getResource(resource);
+    Assert.assertNotNull("Missing expected-output resource " + resource, url);
+    File file = new File(url.toURI());
+    ArrayList<Element> expected =
+      convertIterator(Files.readLines(file, Charsets.UTF_8).iterator());
+    ArrayList<Element> result = convertIterator(results.iterator());
+    checkResult(expected, result);
+  }
+
+  private void testDbOutput(String name) throws Exception {
+    GiraphConfiguration conf = new GiraphConfiguration();
+    GIRAPH_REXSTER_HOSTNAME.set(conf, "127.0.0.1");
+    GIRAPH_REXSTER_PORT.set(conf, 18182);
+    GIRAPH_REXSTER_OUTPUT_GRAPH.set(conf, name);
+    conf.setComputationClass(EmptyComputation.class);
+    conf.setVertexInputFormatClass(
+      JsonLongDoubleFloatDoubleVertexInputFormat.class);
+    conf.setVertexOutputFormatClass(
+      RexsterLongDoubleFloatVertexOutputFormat.class);
+    conf.setEdgeOutputFormatClass(
+      RexsterLongDoubleFloatEdgeOutputFormat.class);
+
+    /* graph used for testing */
+    String[] graph = new String[] {
+      "[1,0,[[2,1],[4,3]]]",
+      "[2,0,[[1,1],[3,2],[4,1]]]",
+      "[3,0,[[2,2]]]",
+      "[4,0,[[1,3],[5,4],[2,1]]]",
+      "[5,0,[[3,4],[4,4]]]"
+    };
+
+    InternalVertexRunner.run(conf, graph);
+    URL url = this.getClass().getResource(name + "-output.json");
+    File file = new File(url.toURI());
+    ArrayList<Element> expected =
+      convertIterator(Files.readLines(file,Charsets.UTF_8).iterator());
+    ArrayList<Element> result = getRexsterContent(name);
+    checkResult(expected, result);
+  }
+
+  /**
+   * Computation that does no work: each vertex votes to halt as soon as it
+   * is computed and the incoming messages are ignored. The tests in this
+   * class set it as their computation class.
+   */
+  public static class EmptyComputation
+    extends BasicComputation<LongWritable, DoubleWritable,
+              FloatWritable, LongWritable> {
+    @Override
+    public void compute(
+      Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
+      Iterable<LongWritable> messages) throws IOException {
+
+      vertex.voteToHalt();
+    }
+  }
+
+  /**
+   * Start the Rexster server by preparing the configuration file loaded via
+   * the resources and setting other important parameters.
+   *
+   * @throws IllegalStateException if the configuration resource is missing
+   */
+  @SuppressWarnings("unchecked")
+  private static void startRexsterServer() throws Exception {
+    InputStream rexsterConf =
+      TestRexsterLongDoubleFloatIOFormat.class.getResourceAsStream(
+        REXSTER_CONF);
+    if (rexsterConf == null) {
+      throw new IllegalStateException(
+        "Unable to find resource " + REXSTER_CONF);
+    }
+    XMLConfiguration properties = new XMLConfiguration();
+    try {
+      properties.load(rexsterConf);
+    } finally {
+      /* release the stream even when the configuration cannot be parsed */
+      rexsterConf.close();
+    }
+
+    List<HierarchicalConfiguration> graphConfigs =
+      properties.configurationsAt(Tokens.REXSTER_GRAPH_PATH);
+    RexsterApplication application = new XmlRexsterApplication(graphConfigs);
+    server = new HttpRexsterServer(properties);
+
+    int scriptEngineThreshold =
+      properties.getInt("script-engine-reset-threshold",
+                        EngineController.RESET_NEVER);
+    String scriptEngineInitFile =
+      properties.getString("script-engine-init", "");
+
+    /* allow scriptengines to be configured so that folks can drop in
+       different gremlin flavors. */
+    List<String> scriptEngineNames = properties.getList("script-engines");
+
+    if (scriptEngineNames == null) {
+      /* configure to default with gremlin-groovy */
+      EngineController.configure(scriptEngineThreshold, scriptEngineInitFile);
+    } else {
+      EngineController.configure(scriptEngineThreshold, scriptEngineInitFile,
+                                 new HashSet<String>(scriptEngineNames));
+    }
+    server.start(application);
+  }
+
+  /**
+   * Stops the Rexster server, if one was created.
+   *
+   * @throws RuntimeException wrapping any failure raised while stopping
+   */
+  private static void stopRexsterServer() throws Exception {
+    if (server == null) {
+      /* startup never got as far as creating the server: nothing to stop,
+         and failing here would only hide the original setup error */
+      return;
+    }
+    try {
+      server.stop();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Deletes, under TMP_DIR, the directory of every database in DATABASES,
+   * its "empty"-prefixed counterpart and finally the EMPTYDB directory.
+   */
+  private static void deleteDbs() throws Exception {
+    for (String database : DATABASES) {
+      File populated = new File(TMP_DIR + database);
+      FileUtils.deleteDirectory(populated);
+      File written = new File(TMP_DIR + "empty" + database);
+      FileUtils.deleteDirectory(written);
+    }
+    File emptyDb = new File(TMP_DIR + EMPTYDB);
+    FileUtils.deleteDirectory(emptyDb);
+  }
+
+  /**
+   * Loads the test graph (five vertices, twelve edges) into every database
+   * of DATABASES by posting it to the vertices and edges endpoints.
+   */
+  private static void insertDbData() throws Exception {
+    /* JSON documents to be sent */
+    String vertices =
+      "{ \"vlabel\":\"_vid\", \"tx\":[ " +
+      "{ \"value\":0,\"_vid\":0 }," +
+      "{ \"value\":0,\"_vid\":1 }," +
+      "{ \"value\":0,\"_vid\":2 }," +
+      "{ \"value\":0,\"_vid\":3 }," +
+      "{ \"value\":0,\"_vid\":4 }" +
+      " ] }";
+    String edges =
+      "{ \"vlabel\":\"_vid\", \"tx\":[ " +
+      "{ \"value\": 1, \"_outV\": 0, \"_inV\": 1 }," +
+      "{ \"value\": 3, \"_outV\": 0, \"_inV\": 3 }," +
+      "{ \"value\": 1, \"_outV\": 1, \"_inV\": 0 }," +
+      "{ \"value\": 2, \"_outV\": 1, \"_inV\": 2 }," +
+      "{ \"value\": 1, \"_outV\": 1, \"_inV\": 3 }," +
+      "{ \"value\": 5, \"_outV\": 2, \"_inV\": 1 }," +
+      "{ \"value\": 4, \"_outV\": 2, \"_inV\": 4 }," +
+      "{ \"value\": 3, \"_outV\": 3, \"_inV\": 0 }," +
+      "{ \"value\": 1, \"_outV\": 3, \"_inV\": 1 }," +
+      "{ \"value\": 4, \"_outV\": 3, \"_inV\": 4 }," +
+      "{ \"value\": 4, \"_outV\": 4, \"_inV\": 3 }," +
+      "{ \"value\": 4, \"_outV\": 4, \"_inV\": 2 }" +
+      " ] }";
+
+    for (String database : DATABASES) {
+      /* vertices first: the edges refer to them through their _vid */
+      postJson(database, "vertices", vertices);
+      postJson(database, "edges", edges);
+    }
+  }
+
+  /**
+   * Posts a JSON document to one endpoint of a database and drains the reply.
+   *
+   * @param database name of the target database
+   * @param resource endpoint name, "vertices" or "edges"
+   * @param json document to send as the request body
+   * @throws IOException if the request cannot be sent or the reply read
+   * @throws RuntimeException if the server does not answer with code 200
+   */
+  private static void postJson(String database, String resource, String json)
+    throws IOException {
+    URL url = new URL("http://127.0.0.1:18182/graphs/" + database +
+      "/tp/giraph/" + resource);
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    conn.setRequestMethod("POST");
+    conn.setRequestProperty("Accept", "*/*");
+    conn.setRequestProperty("Content-Type",
+        "application/json; charset=UTF-8");
+    conn.setDoOutput(true);
+
+    /* the body is always flushed and closed before the reply is requested */
+    DataOutputStream wr = new DataOutputStream(conn.getOutputStream());
+    try {
+      wr.writeBytes(json);
+      wr.flush();
+    } finally {
+      wr.close();
+    }
+
+    int responseCode = conn.getResponseCode();
+    if (responseCode != 200) {
+      throw new RuntimeException("Unable to insert data in " + database +
+        " code: " + responseCode);
+    }
+
+    /* the reply content is not needed, it is only read to the end */
+    BufferedReader in = new BufferedReader(
+        new InputStreamReader(conn.getInputStream(), Charsets.UTF_8));
+    try {
+      while (in.readLine() != null) {
+        continue;
+      }
+    } finally {
+      in.close();
+    }
+  }
+
+  /**
+   * Parses JSON lines into elements. Each line is read as a JSON array whose
+   * entries 0 and 1 are the id and the value and whose entry 2 is an array
+   * of edges; every edge is stored in its string form.
+   *
+   * @param elementit iterator over the JSON lines
+   * @return one Element per line, in iteration order
+   * @throws JSONException if a line does not have the expected structure
+   */
+  private ArrayList<Element> convertIterator(Iterator<String> elementit)
+    throws JSONException {
+    ArrayList<Element> elements = new ArrayList<Element>();
+    while (elementit.hasNext()) {
+      JSONArray line = new JSONArray(elementit.next());
+      long id = line.getLong(0);
+      long value = line.getLong(1);
+      Element current = new Element(id, value);
+      JSONArray edgeArray = line.getJSONArray(2);
+      int edgeCount = edgeArray.length();
+      for (int e = 0; e < edgeCount; ++e) {
+        current.add(edgeArray.getJSONArray(e).toString());
+      }
+      elements.add(current);
+    }
+    return elements;
+  }
+
+  /**
+   * Reads back the content of the Rexster graph {@code name}: one Element
+   * per vertex (with value 0), to which every edge is attached, as
+   * {@code [inV,value]}, on the element whose id is the edge's _outV.
+   *
+   * @param name name of the Rexster graph to read
+   * @return the elements of the graph
+   */
+  private ArrayList<Element> getRexsterContent(String name) throws Exception {
+    ArrayList<Element> result = new ArrayList<Element>();
+    /* get all the vertices */
+    JSONArray vertices = fetchResults(name, "vertices");
+    for (int i = 0; i < vertices.length(); ++i) {
+      JSONObject vertex = vertices.getJSONObject(i);
+      long id = getId(vertex, "_id");
+      result.add(new Element(id, 0));
+    }
+
+    /* get all the edges */
+    JSONArray edges = fetchResults(name, "edges");
+    for (int i = 0; i < edges.length(); ++i) {
+      JSONObject edge = edges.getJSONObject(i);
+      long inV = getId(edge, "_inV");
+      long outV = getId(edge, "_outV");
+      long value = edge.getLong("value");
+
+      for (Element element : result) {
+        if (element.id == outV) {
+          element.add("[" + inV + "," + value + "]");
+        }
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Issues a GET on one endpoint of a graph and returns the "results" array
+   * of the JSON reply.
+   *
+   * @param name name of the Rexster graph
+   * @param resource endpoint name, "vertices" or "edges"
+   * @return the "results" array of the reply
+   * @throws IOException if the reply cannot be read
+   * @throws JSONException if the reply is not the expected JSON document
+   */
+  private JSONArray fetchResults(String name, String resource)
+    throws IOException, JSONException {
+    URL url = new URL("http://127.0.0.1:18182/graphs/" + name +
+      "/tp/giraph/" + resource);
+    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+    conn.setRequestMethod("GET");
+    conn.setRequestProperty("Content-Type", "application/json; charset=UTF-8");
+
+    StringBuilder json = new StringBuilder();
+    BufferedReader br = new BufferedReader(
+      new InputStreamReader(conn.getInputStream(), Charsets.UTF_8));
+    try {
+      /* read until end of stream: ready() only tells whether a read would
+         block, so looping on it can stop before the whole body arrived */
+      String line;
+      while ((line = br.readLine()) != null) {
+        json.append(line);
+      }
+    } finally {
+      br.close();
+    }
+    return new JSONObject(json.toString()).getJSONArray("results");
+  }
+
+  /**
+   * Reads the identifier stored under {@code label}. A numeric value is
+   * returned as is; otherwise the value is read as a string, split on ':'
+   * and its second field is parsed as the identifier.
+   *
+   * @param obj JSON object holding the identifier
+   * @param label key under which the identifier is stored
+   * @return the identifier
+   */
+  private long getId(JSONObject obj, String label) throws Exception {
+    try {
+      return obj.getLong(label);
+    } catch (JSONException e) {
+      String[] splits = obj.getString(label).split(":");
+      /* parsed as long, like the return type, so that identifiers above the
+         int range are not rejected */
+      return Long.parseLong(splits[1]);
+    }
+  }
+
+  /**
+   * Checks that every expected element is matched by at least one element of
+   * the result, using {@code Element.equals(Element)}. Elements of the
+   * result that match no expected element are not reported.
+   *
+   * @param expected elements that must be present
+   * @param result elements actually obtained
+   */
+  protected void checkResult(ArrayList<Element> expected,
+    ArrayList<Element> result) throws Exception {
+    for (Element wanted : expected) {
+      boolean found = false;
+      for (Element candidate : result) {
+        if (wanted.equals(candidate)) {
+          found = true;
+          break;
+        }
+      }
+      /* a JUnit assertion, unlike the assert keyword, is checked even when
+         the JVM runs without -ea */
+      Assert.assertTrue("Expected element not found: " + wanted, found);
+    }
+  }
+
+  /**
+   * A vertex as seen by the tests: an id, a value and the string form of
+   * each of its edges.
+   */
+  protected static class Element {
+    /** vertex id */
+    public long id;
+    /** vertex value */
+    public long value;
+    /** edges, each kept as a string */
+    public ArrayList<String> edges;
+
+    /**
+     * Creates an element without edges.
+     *
+     * @param id vertex id
+     * @param value vertex value
+     */
+    public Element(long id, long value) {
+      this.id = id;
+      this.value = value;
+      this.edges = new ArrayList<String>();
+    }
+
+    /**
+     * Appends an edge.
+     *
+     * @param edge string form of the edge
+     */
+    public void add(String edge) {
+      edges.add(edge);
+    }
+
+    /**
+     * Containment check, not an override of {@code Object.equals}: it holds
+     * when ids and values are the same and every edge of this element also
+     * appears in {@code obj}. Additional edges of {@code obj} are ignored.
+     *
+     * @param obj element to compare with
+     * @return true when this element is contained in {@code obj}
+     */
+    public boolean equals(Element obj) {
+      boolean sameVertex = id == obj.id && value == obj.value;
+      if (!sameVertex) {
+        return false;
+      }
+      for (String edge : edges) {
+        boolean matched = false;
+        for (String candidate : obj.edges) {
+          if (edge.equals(candidate)) {
+            matched = true;
+            break;
+          }
+        }
+        if (!matched) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /**
+     * Formats the element as "id: I value: V edges: " followed by every
+     * edge, each followed by a space.
+     */
+    @Override
+    public String toString() {
+      StringBuilder text = new StringBuilder();
+      text.append("id: ").append(id);
+      text.append(" value: ").append(value);
+      text.append(" edges: ");
+      for (String edge : edges) {
+        text.append(edge).append(" ");
+      }
+      return text.toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d1bc2de/giraph-rexster/giraph-rexster-io/src/test/resources/org/apache/giraph/rexster/io/formats/emptyneodb-output.json
----------------------------------------------------------------------
diff --git a/giraph-rexster/giraph-rexster-io/src/test/resources/org/apache/giraph/rexster/io/formats/emptyneodb-output.json b/giraph-rexster/giraph-rexster-io/src/test/resources/org/apache/giraph/rexster/io/formats/emptyneodb-output.json
new file mode 100644
index 0000000..fa9bbc7
--- /dev/null
+++ b/giraph-rexster/giraph-rexster-io/src/test/resources/org/apache/giraph/rexster/io/formats/emptyneodb-output.json
@@ -0,0 +1,5 @@
+[1,0,[[4,4],[5,4]]]
+[2,0,[[3,1],[4,2],[5,1]]]
+[3,0,[[2,1],[5,3]]]
+[4,0,[[2,2]]]
+[5,0,[[3,3],[1,4],[2,1]]]

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d1bc2de/giraph-rexster/giraph-rexster-io/src/test/resources/org/apache/giraph/rexster/io/formats/emptyorientdb-output.json
----------------------------------------------------------------------
diff --git a/giraph-rexster/giraph-rexster-io/src/test/resources/org/apache/giraph/rexster/io/formats/emptyorientdb-output.json b/giraph-rexster/giraph-rexster-io/src/test/resources/org/apache/giraph/rexster/io/formats/emptyorientdb-output.json
new file mode 100644
index 0000000..3475404
--- /dev/null
+++ b/giraph-rexster/giraph-rexster-io/src/test/resources/org/apache/giraph/rexster/io/formats/emptyorientdb-output.json
@@ -0,0 +1,5 @@
+[0,0,[[3,4],[4,4]]]
+[1,0,[[2,1],[3,2],[4,1]]]
+[2,0,[[1,1],[4,3]]]
+[3,0,[[1,2]]]
+[4,0,[[2,3],[0,4],[1,1]]]

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d1bc2de/giraph-rexster/giraph-rexster-io/src/test/resources/org/apache/giraph/rexster/io/formats/emptytgdb-output.json
----------------------------------------------------------------------
diff --git a/giraph-rexster/giraph-rexster-io/src/test/resources/org/apache/giraph/rexster/io/formats/emptytgdb-output.json b/giraph-rexster/giraph-rexster-io/src/test/resources/org/apache/giraph/rexster/io/formats/emptytgdb-output.json
new file mode 100644
index 0000000..ba2dd56
--- /dev/null
+++ b/giraph-rexster/giraph-rexster-io/src/test/resources/org/apache/giraph/rexster/io/formats/emptytgdb-output.json
@@ -0,0 +1,5 @@
+[0,0,[[3,4],[4,4]]]
+[1,0,[[4,1],[3,2],[2,1]]]
+[2,0,[[1,1],[4,3]]]
+[3,0,[[1,2]]]
+[4,0,[[2,3],[0,4],[1,1]]]

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d1bc2de/giraph-rexster/giraph-rexster-io/src/test/resources/org/apache/giraph/rexster/io/formats/neodb-output.json
----------------------------------------------------------------------
diff --git a/giraph-rexster/giraph-rexster-io/src/test/resources/org/apache/giraph/rexster/io/formats/neodb-output.json b/giraph-rexster/giraph-rexster-io/src/test/resources/org/apache/giraph/rexster/io/formats/neodb-output.json
new file mode 100644
index 0000000..f318227
--- /dev/null
+++ b/giraph-rexster/giraph-rexster-io/src/test/resources/org/apache/giraph/rexster/io/formats/neodb-output.json
@@ -0,0 +1,5 @@
+[1,0,[[2,1],[4,3]]]
+[2,0,[[1,1],[3,5],[4,1]]]
+[3,0,[[2,2]]]
+[4,0,[[1,3],[2,1],[5,4]]]
+[5,0,[[3,4],[4,4]]]

http://git-wip-us.apache.org/repos/asf/giraph/blob/5d1bc2de/giraph-rexster/giraph-rexster-io/src/test/resources/org/apache/giraph/rexster/io/formats/orientdb-output.json
----------------------------------------------------------------------
diff --git a/giraph-rexster/giraph-rexster-io/src/test/resources/org/apache/giraph/rexster/io/formats/orientdb-output.json b/giraph-rexster/giraph-rexster-io/src/test/resources/org/apache/giraph/rexster/io/formats/orientdb-output.json
new file mode 100644
index 0000000..c12cc3e
--- /dev/null
+++ b/giraph-rexster/giraph-rexster-io/src/test/resources/org/apache/giraph/rexster/io/formats/orientdb-output.json
@@ -0,0 +1,5 @@
+[0,0,[[1,1],[3,3]]]
+[1,0,[[0,1],[2,5],[3,1]]]
+[2,0,[[1,2]]]
+[3,0,[[0,3],[1,1],[4,4]]]
+[4,0,[[3,4],[2,4]]]


Mime
View raw message