hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomm...@apache.org
Subject svn commit: r1525198 - in /hama/trunk/graph/src: main/java/org/apache/hama/graph/OffHeapVerticesInfo.java test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java
Date Sat, 21 Sep 2013 06:44:05 GMT
Author: tommaso
Date: Sat Sep 21 06:44:05 2013
New Revision: 1525198

URL: http://svn.apache.org/r1525198
Log:
HAMA-732 - applied patch for integration with DM for OffHeapVerticesInfo

Added:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java

Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java?rev=1525198&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java Sat Sep
21 06:44:05 2013
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.directmemory.DirectMemory;
+import org.apache.directmemory.cache.CacheService;
+import org.apache.directmemory.memory.Pointer;
+import org.apache.directmemory.serialization.Serializer;
+import org.apache.directmemory.serialization.kryo.KryoSerializer;
+import org.apache.directmemory.utils.CacheValuesIterable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.util.ReflectionUtils;
+
+/**
+ * An off heap version of a {@link org.apache.hama.graph.Vertex} storage.
+ */
+public class OffHeapVerticesInfo<V extends WritableComparable<?>, E extends Writable,
M extends Writable>
+        implements VerticesInfo<V, E, M> {
+
+    public static final String DM_STRICT_ITERATOR = "dm.iterator.strict";
+    public static final String DM_BUFFERS = "dm.buffers";
+    public static final String DM_SIZE = "dm.size";
+    public static final String DM_CAPACITY = "dm.capacity";
+    public static final String DM_CONCURRENCY = "dm.concurrency";
+    public static final String DM_DISPOSAL_TIME = "dm.disposal.time";
+    public static final String DM_SERIALIZER = "dm.serializer";
+    public static final String DM_SORTED = "dm.sorted";
+
+    private CacheService<V, Vertex<V, E, M>> vertices;
+
+    private boolean strict;
+    private GraphJobRunner<V, E, M> runner;
+
+    @Override
+    public void init(GraphJobRunner<V, E, M> runner, Configuration conf, TaskAttemptID
attempt) throws IOException {
+        this.runner = runner;
+        this.strict = conf.getBoolean(DM_STRICT_ITERATOR, true);
+        DirectMemory<V, Vertex<V, E, M>> dm = new DirectMemory<V, Vertex<V,
E, M>>()
+                .setNumberOfBuffers(conf.getInt(DM_BUFFERS, 100))
+                .setSize(conf.getInt(DM_SIZE, 102400))
+                .setSerializer(ReflectionUtils.newInstance(conf.getClass(DM_SERIALIZER, KryoSerializer.class,
Serializer.class)))
+                .setDisposalTime(conf.getInt(DM_DISPOSAL_TIME, 3600000));
+        if (conf.getBoolean(DM_SORTED, true)) {
+            dm.setMap(new ConcurrentSkipListMap<V, Pointer<Vertex<V, E, M>>>());
+        } else {
+            dm.setInitialCapacity(conf.getInt(DM_CAPACITY, 1000))
+                    .setConcurrencyLevel(conf.getInt(DM_CONCURRENCY, 10));
+        }
+
+        this.vertices = dm.newCacheService();
+
+    }
+
+    @Override
+    public void cleanup(Configuration conf, TaskAttemptID attempt) throws IOException {
+        vertices.dump();
+    }
+
+    public void addVertex(Vertex<V, E, M> vertex) {
+        vertices.put(vertex.getVertexID(), vertex);
+    }
+
+    @Override
+    public void finishAdditions() {
+    }
+
+    @Override
+    public void startSuperstep() throws IOException {
+    }
+
+    @Override
+    public void finishSuperstep() throws IOException {
+    }
+
+    @Override
+    public void finishVertexComputation(Vertex<V, E, M> vertex) throws IOException
{
+        vertices.put(vertex.getVertexID(), vertex);
+    }
+
+    public void clear() {
+        vertices.clear();
+    }
+
+    public int size() {
+        return (int) this.vertices.entries();
+    }
+
+    @Override
+    public IDSkippingIterator<V, E, M> skippingIterator() {
+        final Iterator<Vertex<V, E, M>> vertexIterator =
+                new CacheValuesIterable<V, Vertex<V, E, M>>(vertices, strict).iterator();
+
+        return new IDSkippingIterator<V, E, M>() {
+            int currentIndex = 0;
+
+            Vertex<V, E, M> currentVertex = null;
+
+            @Override
+            public boolean hasNext(V e,
+                                   org.apache.hama.graph.IDSkippingIterator.Strategy strat)
{
+                if (currentIndex < vertices.entries()) {
+
+                    Vertex<V, E, M> next = vertexIterator.next();
+                    while (!strat.accept(next, e)) {
+                        currentIndex++;
+                    }
+                    currentVertex = next;
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            @Override
+            public Vertex<V, E, M> next() {
+                currentIndex++;
+                if (currentVertex.getRunner() == null) {
+                  currentVertex.setRunner(runner);
+                }
+                return currentVertex;
+            }
+
+        };
+
+    }
+
+    @Override
+    public void removeVertex(V vertexID) {
+      throw new UnsupportedOperationException ("Not yet implemented");
+    }
+
+    @Override
+    public void finishRemovals() {
+      throw new UnsupportedOperationException ("Not yet implemented");      
+    }
+
+}

Added: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java?rev=1525198&view=auto
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java (added)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java Sat
Sep 21 06:44:05 2013
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.graph.example.PageRank.PageRankVertex;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
/**
 * Tests for {@link OffHeapVerticesInfo}: a full lifecycle round-trip
 * (init, add, iterate, recompute, cleanup) plus single and massive
 * additions using default DirectMemory settings.
 */
public class TestOffHeapVerticesInfo {

  /**
   * Exercises the whole VerticesInfo lifecycle: builds 10 PageRank
   * vertices, stores them off-heap, iterates once verifying ids, values
   * and edges, then runs a second "superstep" that mutates every vertex.
   */
  @Test
  public void testOffHeapVerticesInfoLifeCycle() throws Exception {
    OffHeapVerticesInfo<Text, NullWritable, DoubleWritable> info = new OffHeapVerticesInfo<Text, NullWritable, DoubleWritable>();
    Configuration conf = new Configuration();
    // register vertex/edge/value classes so GraphJobRunner can instantiate
    // them reflectively during (de)serialization
    conf.set(GraphJob.VERTEX_CLASS_ATTR, PageRankVertex.class.getName());
    conf.set(GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR,
        NullWritable.class.getName());
    conf.set(GraphJob.VERTEX_ID_CLASS_ATTR, Text.class.getName());
    conf.set(GraphJob.VERTEX_VALUE_CLASS_ATTR, DoubleWritable.class.getName());
    GraphJobRunner.<Text, NullWritable, DoubleWritable> initClasses(conf);
    TaskAttemptID attempt = new TaskAttemptID("123", 1, 1, 0);
    try {
      ArrayList<PageRankVertex> list = new ArrayList<PageRankVertex>();

      // vertices "0".."9"; even indices get a value, each vertex gets one
      // edge pointing at vertex (10 - i)
      for (int i = 0; i < 10; i++) {
        PageRankVertex v = new PageRankVertex();
        v.setVertexID(new Text(i + ""));
        if (i % 2 == 0) {
          v.setValue(new DoubleWritable(i * 2));
        }
        v.addEdge(new Edge<Text, NullWritable>(new Text((10 - i) + ""), null));

        list.add(v);
      }

      info.init(null, conf, attempt);
      for (PageRankVertex v : list) {
        info.addVertex(v);
      }

      info.finishAdditions();

      assertEquals(10, info.size());
      // now we want to iterate and check if the result can properly be obtained

      // NOTE(review): the per-index comparison below assumes the cache
      // iterates in insertion order; this holds for single-digit string ids
      // with the default sorted backing map — confirm if ids change.
      int index = 0;
      IDSkippingIterator<Text, NullWritable, DoubleWritable> iterator = info
          .skippingIterator();
      while (iterator.hasNext()) {
        Vertex<Text, NullWritable, DoubleWritable> next = iterator.next();
        PageRankVertex pageRankVertex = list.get(index);
        assertEquals(pageRankVertex.getVertexID().toString(), next
            .getVertexID().toString());
        if (index % 2 == 0) {
          // even vertices were given value i * 2; odd ones stayed null
          assertEquals((int) next.getValue().get(), index * 2);
        } else {
          assertNull(next.getValue());
        }
        assertEquals(next.isHalted(), false);
        // check edges survived the off-heap (de)serialization round-trip
        List<Edge<Text, NullWritable>> edges = next.getEdges();
        assertEquals(1, edges.size());
        Edge<Text, NullWritable> edge = edges.get(0);
        assertEquals(pageRankVertex.getEdges().get(0).getDestinationVertexID()
            .toString(), edge.getDestinationVertexID().toString());
        assertNull(edge.getValue());

        index++;
      }
      assertEquals(index, list.size());
      info.finishSuperstep();
      // iterate again and compute so vertices change internally
      iterator = info.skippingIterator();
      info.startSuperstep();
      while (iterator.hasNext()) {
        Vertex<Text, NullWritable, DoubleWritable> next = iterator.next();
        // override everything with constant 2
        next.setValue(new DoubleWritable(2));
        if (Integer.parseInt(next.getVertexID().toString()) == 3) {
          next.voteToHalt();
        }
        info.finishVertexComputation(next);
      }
      info.finishSuperstep();
      assertEquals(index, list.size());

    } finally {
      info.cleanup(conf, attempt);
    }

  }

  /** A single vertex added with default settings must be retrievable. */
  @Test
  public void testAdditionWithDefaults() throws Exception {
    OffHeapVerticesInfo<Text, NullWritable, DoubleWritable> verticesInfo =
            new OffHeapVerticesInfo<Text, NullWritable, DoubleWritable>();
    Configuration conf = new Configuration();
    verticesInfo.init(null, conf, null);
    Vertex<Text, NullWritable, DoubleWritable> vertex = new PageRankVertex();
    vertex.setVertexID(new Text("some-id"));
    verticesInfo.addVertex(vertex);
    assertTrue("added vertex could not be found in the cache", verticesInfo.skippingIterator().hasNext());
  }

  /**
   * Adds 10000 randomly-keyed vertices and checks the reported size.
   * NOTE(review): random int ids could theoretically collide, which would
   * make size() fall short of i — acceptable flakiness risk, or seed/dedupe
   * if it ever fails.
   */
  @Test
  public void testMassiveAdditionWithDefaults() throws Exception {
    OffHeapVerticesInfo<Text, NullWritable, DoubleWritable> verticesInfo =
            new OffHeapVerticesInfo<Text, NullWritable, DoubleWritable>();
    Configuration conf = new Configuration();
    verticesInfo.init(null, conf, null);
    assertEquals("vertices info size should be 0 at startup", 0, verticesInfo.size());
    Random r = new Random();
    int i = 10000;
    for (int n = 0; n < i; n++) {
      Vertex<Text, NullWritable, DoubleWritable> vertex = new PageRankVertex();
      vertex.setVertexID(new Text(String.valueOf(r.nextInt())));
      vertex.setValue(new DoubleWritable(r.nextDouble()));
      verticesInfo.addVertex(vertex);
    }
    verticesInfo.finishAdditions();
    assertEquals("vertices info size is not correct", i, verticesInfo.size());
  }

}



Mime
View raw message