hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1669291 - in /hama/trunk: ./ examples/src/main/java/org/apache/hama/examples/ examples/src/test/java/org/apache/hama/examples/ examples/src/test/resources/ ml/src/main/java/org/apache/hama/ml/kcore/
Date Thu, 26 Mar 2015 08:27:24 GMT
Author: edwardyoon
Date: Thu Mar 26 08:27:24 2015
New Revision: 1669291

URL: http://svn.apache.org/r1669291
Log:
HAMA-894: Implement K-core algorithm (Jaegwon Seo via edwardyoon)

Added:
    hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/KCoreTest.java
    hama/trunk/examples/src/test/resources/kcore.txt
    hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/
    hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreMessage.java
    hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertex.java
    hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexReader.java
    hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexWriter.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1669291&r1=1669290&r2=1669291&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Mar 26 08:27:24 2015
@@ -4,6 +4,7 @@ Release 0.7.0 (unreleased changes)
 
   NEW FEATURES
 
+   HAMA-894: Implement K-core algorithm (Jaegwon Seo via edwardyoon)
    HAMA-907: Add MaxFlow example (Zhengjun via edwardyoon)
    HAMA-915: Add Kryo serializer (edwardyoon)
    HAMA-726: Hama on Mesos (Jeff Fenchel via edwardyoon)

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java?rev=1669291&r1=1669290&r2=1669291&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java Thu Mar 26 08:27:24 2015
@@ -39,8 +39,9 @@ public class ExampleDriver {
       pgd.addClass("semi", SemiClusterJobDriver.class, "Semi Clustering");
       pgd.addClass("kmeans", Kmeans.class, "K-Means Clustering");
       pgd.addClass("gd", GradientDescentExample.class, "Gradient Descent");
-      pgd.addClass("neuralnets", NeuralNetwork.class, "Neural Network classification");
-
+      pgd.addClass("neuralnets", NeuralNetwork.class,
+          "Neural Network classification");
+      pgd.addClass("kcore", KCore.class, "kcore");
       pgd.addClass("gen", Generator.class, "Random Data Generator Util");
       pgd.driver(args);
     } catch (Throwable e) {

Added: hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java?rev=1669291&view=auto
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java (added)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java Thu Mar 26 08:27:24 2015
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.graph.GraphJob;
+import org.apache.hama.ml.kcore.KCoreMessage;
+import org.apache.hama.ml.kcore.KCoreVertex;
+import org.apache.hama.ml.kcore.KCoreVertexReader;
+import org.apache.hama.ml.kcore.KCoreVertexWriter;
+
+/**
+ * Command-line driver that configures and runs the K-core graph job.
+ *
+ * <p>Expects exactly two arguments: the input path and the output path.
+ */
+public class KCore {
+  /**
+   * Builds the {@link GraphJob} for the K-core computation.
+   *
+   * @param args command-line arguments; {@code args[0]} is used as the input
+   *          path and {@code args[1]} as the output path
+   * @param conf configuration the job is created with
+   * @return the configured job; it is not started here
+   * @throws IOException declared by the signature and propagated from the
+   *           job setup calls
+   */
+  private static GraphJob createJob(String[] args, HamaConfiguration conf)
+      throws IOException {
+    GraphJob graphJob = new GraphJob(conf, KCore.class);
+    graphJob.setJobName("KCore");
+
+    graphJob.setInputPath(new Path(args[0]));
+    graphJob.setOutputPath(new Path(args[1]));
+
+    // Vertex IDs and edge values are LongWritable; the vertex value slot is
+    // typed as KCoreMessage.
+    graphJob.setVertexClass(KCoreVertex.class);
+    graphJob.setVertexIDClass(LongWritable.class);
+    graphJob.setEdgeValueClass(LongWritable.class);
+    graphJob.setVertexValueClass(KCoreMessage.class);
+
+    graphJob.setVertexInputReaderClass(KCoreVertexReader.class);
+    graphJob.setVertexOutputWriterClass(KCoreVertexWriter.class);
+
+    // Output records are (LongWritable key, IntWritable value) pairs.
+    graphJob.setOutputKeyClass(LongWritable.class);
+    graphJob.setOutputValueClass(IntWritable.class);
+
+    return graphJob;
+  }
+
+  /**
+   * Prints the usage line to standard output and terminates the JVM with
+   * exit status -1.
+   */
+  private static void printUsage() {
+    System.out.println("Usage: <input> <output>");
+    System.exit(-1);
+  }
+
+  /**
+   * Entry point: validates the argument count, runs the job and, when
+   * {@code waitForCompletion} reports {@code true}, prints the elapsed
+   * wall-clock time in seconds.
+   *
+   * @param args must contain exactly the input path and the output path;
+   *          otherwise the usage line is printed and the JVM exits
+   */
+  public static void main(String[] args) throws IOException,
+      ClassNotFoundException, InterruptedException {
+    if (args.length != 2) {
+      printUsage();
+    }
+    HamaConfiguration conf = new HamaConfiguration(new Configuration());
+    GraphJob graphJob = createJob(args, conf);
+    long startTime = System.currentTimeMillis();
+    if (graphJob.waitForCompletion(true)) {
+      System.out.println("Job Finished in "
+          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+    }
+  }
+}

Added: hama/trunk/examples/src/test/java/org/apache/hama/examples/KCoreTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/KCoreTest.java?rev=1669291&view=auto
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/KCoreTest.java (added)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/KCoreTest.java Thu Mar 26 08:27:24 2015
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.HamaConfiguration;
+import org.junit.Test;
+
+public class KCoreTest extends TestCase {
+  private static String INPUT = "src/test/resources/kcore.txt";
+  private static String OUTPUT = "/tmp/kcore-out";
+  private Configuration conf = new HamaConfiguration();
+  private FileSystem fs;
+
+  Map<String, String> outResults = new HashMap<String, String>();
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    fs = FileSystem.get(conf);
+  }
+
+  /**
+   * Runs the KCore example on the sample input file and checks the emitted
+   * core values against the expected ones, then removes the job output.
+   */
+  @Test
+  public void testKcore() throws IllegalArgumentException, IOException,
+      ClassNotFoundException, InterruptedException {
+    try {
+      setOutputResult();
+      KCore.main(new String[] { INPUT, OUTPUT });
+      verifyResult();
+    } finally {
+      // Recursively remove the job output directory once the test is done.
+      // exists() would only query the path and has no cleanup effect.
+      fs.delete(new Path(OUTPUT), true);
+    }
+  }
+
+  public void setOutputResult() {
+    String[] vertex = { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11" };
+    String[] core = { "1", "3", "1", "1", "3", "2", "1", "3", "3", "2", "2" };
+
+    for (int i = 0; i < vertex.length; i++) {
+      outResults.put(vertex[i], core[i]);
+    }
+  }
+
+  /**
+   * Compares every line of the {@code part-00000} output file against the
+   * expected vertex-to-core mapping held in {@code outResults}. Each line is
+   * split on a tab into the vertex ID and its core value.
+   *
+   * <p>NOTE(review): only {@code part-00000} is read, and an empty file would
+   * pass; confirm the job always writes all vertices to this single part.
+   *
+   * @throws IOException if the output file cannot be opened or read
+   */
+  private void verifyResult() throws IllegalArgumentException, IOException {
+    BufferedReader reader = new BufferedReader(new InputStreamReader(
+        fs.open(new Path(OUTPUT + "/part-00000"))));
+    try {
+      String line;
+      while ((line = reader.readLine()) != null) {
+        String[] temp = line.split("\t");
+        assertTrue(outResults.containsKey(temp[0]));
+        // JUnit takes (expected, actual); this order keeps failure messages
+        // accurate.
+        assertEquals(outResults.get(temp[0]), temp[1]);
+      }
+    } finally {
+      // Always release the stream, including when an assertion fails.
+      reader.close();
+    }
+  }
+}

Added: hama/trunk/examples/src/test/resources/kcore.txt
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/resources/kcore.txt?rev=1669291&view=auto
==============================================================================
--- hama/trunk/examples/src/test/resources/kcore.txt (added)
+++ hama/trunk/examples/src/test/resources/kcore.txt Thu Mar 26 08:27:24 2015
@@ -0,0 +1,11 @@
+1	2
+2	1	3	9	8	5
+3	2
+4	5
+5	2	4	6	8	9
+6	5	7	8
+7	6
+8	2	5	6	9	10	11
+9	2	5	8
+10	8	11
+11	8	10
\ No newline at end of file

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreMessage.java?rev=1669291&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreMessage.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreMessage.java Thu Mar 26 08:27:24 2015
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.kcore;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Writable payload that pairs a vertex ID with a core value.
+ *
+ * <p>Serialized form: the vertex ID as a long followed by the core as an int.
+ */
+public class KCoreMessage implements Writable {
+  // Vertex ID carried by this message.
+  private long vertexID;
+  // Core value carried by this message.
+  private int core;
+
+  /** Creates a message with vertex ID 0 and core 0. */
+  public KCoreMessage() {
+    this.vertexID = 0L;
+    this.core = 0;
+  }
+
+  /**
+   * Creates a message with the given content.
+   *
+   * @param vertexID vertex ID to carry
+   * @param core core value to carry
+   */
+  public KCoreMessage(long vertexID, int core) {
+    this.vertexID = vertexID;
+    this.core = core;
+  }
+
+  /**
+   * Restores both fields from the stream, in the order {@link #write} emits
+   * them: a long vertex ID, then an int core.
+   */
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    vertexID = input.readLong();
+    core = input.readInt();
+  }
+
+  /** Writes the vertex ID as a long followed by the core as an int. */
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeLong(vertexID);
+    output.writeInt(core);
+  }
+  
+  /** @return the vertex ID carried by this message */
+  public long getVertexID(){
+    return vertexID;
+  }
+  
+  /** @return the core value carried by this message */
+  public int getCore(){
+    return core;
+  }
+}

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertex.java?rev=1669291&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertex.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertex.java Thu Mar 26 08:27:24 2015
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.kcore;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hama.graph.Edge;
+import org.apache.hama.graph.Vertex;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Lists;
+
+public class KCoreVertex extends
+    Vertex<LongWritable, LongWritable, KCoreMessage> {
+
+  private static final Logger logger = Logger.getLogger(KCoreVertex.class);
+  private int core;
+  private boolean changed;
+  private HashMap<Long, Integer> estimates;
+
+  public KCoreVertex() {
+    super();
+    this.changed = false;
+    this.core = 0;
+    this.estimates = new HashMap<Long, Integer>();
+  }
+
+  public KCoreVertex(int core) {
+    super();
+    this.changed = false;
+    this.core = core;
+    this.estimates = new HashMap<Long, Integer>();
+  }
+
+  public KCoreVertex(int core, HashMap<Long, Integer> estimates) {
+    super();
+    this.changed = false;
+    this.core = core;
+    this.estimates = estimates;
+  }
+
+  public boolean isChanged() {
+    return changed;
+  }
+
+  public void setChanged(boolean changed) {
+    this.changed = changed;
+  }
+
+  public int getCore() {
+    return core;
+  }
+
+  public void setCore(int core) {
+    this.core = core;
+  }
+
+  /**
+   * Looks up the stored estimate for a neighbor.
+   *
+   * @param neighbor ID of the neighbor to look up
+   * @return the stored estimate widened to {@code double}, or {@code -1} when
+   *         no estimate is stored for {@code neighbor}
+   */
+  public double getNeighborEstimate(long neighbor) {
+    if (estimates.containsKey(neighbor)) {
+      return estimates.get(neighbor);
+    }
+    return (-1);
+  }
+
+  /**
+   * Overwrites the estimate of a neighbor that already has an entry. Unlike
+   * {@link #setNeighborEstimate(long, int)}, a neighbor without an entry is
+   * ignored and no new entry is created.
+   *
+   * @param neighbor ID of the neighbor to update
+   * @param estimate new estimate to store for that neighbor
+   */
+  public void setNeighborNewEstimate(long neighbor, int estimate) {
+    if (this.estimates.containsKey(neighbor)) {
+      this.estimates.put(neighbor, estimate);
+    }
+  }
+
+  public void logNeighborEstimates(long vertexId) {
+    logger.info(vertexId + " neighbor estimates: ");
+    for (Map.Entry<Long, Integer> entry : estimates.entrySet()) {
+      logger.info("\t" + entry.getKey() + "-" + entry.getValue());
+    }
+  }
+
+  public void setNeighborEstimate(long neighbor, int estimate) {
+    estimates.put(neighbor, estimate);
+  }
+
+  /**
+   * Restores this vertex from the stream: the superclass state first, then
+   * {@code core} as a 4-byte int, the {@code changed} flag, and finally the
+   * neighbor estimates.
+   *
+   * <p>The estimates are encoded as a presence flag; when it is {@code true},
+   * an int entry count follows and then that many (long key, int value)
+   * pairs. The map is always replaced with a fresh, non-null instance. The
+   * layout read here has to match what {@code write} emits field for field.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    core = in.readInt();
+    changed = in.readBoolean();
+
+    this.estimates = new HashMap<Long, Integer>();
+    // Presence flag: false means no estimate entries follow.
+    if (in.readBoolean()) {
+      int num = in.readInt();
+      for (int i = 0; i < num; ++i) {
+        long key = in.readLong();
+        int value = in.readInt();
+        estimates.put(key, value);
+      }
+    }
+  }
+
+  /**
+   * Serializes this vertex: the superclass state first, then {@code core} as
+   * an int, the {@code changed} flag, and finally the neighbor estimates.
+   *
+   * <p>The estimates are encoded as a presence flag ({@code false} when the
+   * map is {@code null}); when present, an int entry count follows and then
+   * one (long key, int value) pair per entry.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    // readFields() restores core with readInt(), so it must be written as a
+    // 4-byte int. Writing it as an 8-byte double would shift every field
+    // that follows when the vertex is read back.
+    out.writeInt(core);
+    out.writeBoolean(changed);
+
+    if (this.estimates == null) {
+      out.writeBoolean(false);
+    } else {
+      out.writeBoolean(true);
+      out.writeInt(this.estimates.size());
+
+      for (Map.Entry<Long, Integer> entry : estimates.entrySet()) {
+        out.writeLong(entry.getKey());
+        out.writeInt(entry.getValue());
+      }
+    }
+  }
+
+  /**
+   * Computes a core estimate from the current {@code core} and the stored
+   * neighbor estimates. This method only reads {@code core}; it does not
+   * modify it.
+   *
+   * <p>Each neighbor estimate is capped at {@code core} and tallied in
+   * {@code count}. The tallies are then accumulated from index {@code core}
+   * down to index 1, so {@code count[i]} becomes the number of neighbors
+   * whose capped estimate is at least {@code i}. Starting at {@code core},
+   * the result is lowered while it is above 1 and fewer than {@code i}
+   * neighbors reach it. Intermediate state is logged at info level.
+   *
+   * @return the resulting estimate; never greater than the current
+   *         {@code core}
+   */
+  public int computeEstimate() {
+    int old = this.core;
+    double[] count = new double[this.core + 1];
+
+    // Tally neighbors by their estimate, capped at this vertex's own core.
+    for (Map.Entry<Long, Integer> entry : this.estimates.entrySet()) {
+      logger.info("Processing " + entry.getKey() + ": " + entry.getValue());
+      double j = Math.min(this.core, entry.getValue().doubleValue());
+      logger.info("Min: " + j);
+      count[(int) j] = count[(int) j] + 1;
+    }
+
+    logger.info("Count before");
+    int i;
+    for (i = 0; i < count.length; i++) {
+      logger.info(i + " " + count[i]);
+    }
+
+    // Turn the tallies for indexes 1..core into suffix sums; count[0] is
+    // left as tallied.
+    for (i = this.core; i > 1; i--)
+      count[i - 1] = count[i - 1] + count[i];
+
+    logger.info("Count after");
+    for (i = 0; i < count.length; i++) {
+      logger.info(i + " " + count[i]);
+    }
+
+    // Walk down from core until at least i neighbors reach i, or i hits 1.
+    i = this.core;
+    while ((i > 1) && (count[i] < i)) {
+      logger.info("Decrementing" + i + " down one because " + count[i]
+          + " is less than that");
+      i = i - 1;
+    }
+    logger.info("Loop terminated: i: " + i + " and count[i] = " + count[i]);
+
+    if (i != old) {
+      logger.info("New Core Estimate: " + i + "\n");
+    }
+    return i;
+  }
+
+  /**
+   * Runs one superstep of the core computation for this vertex.
+   *
+   * <p>Superstep 0: {@code core} is set to the number of edges of this vertex
+   * and sent, together with the vertex ID, along every edge.
+   *
+   * <p>Later supersteps: each incoming message whose core value is lower than
+   * the estimate stored for its sender updates that estimate and triggers
+   * {@link #computeEstimate()}; a smaller result replaces {@code core} and
+   * marks the vertex as changed. A changed vertex sends its new core along
+   * every edge and clears the flag; an unchanged vertex votes to halt.
+   *
+   * @param msgs messages delivered to this vertex for the current superstep
+   */
+  @Override
+  public void compute(Iterable<KCoreMessage> msgs) throws IOException {
+    if (this.getSuperstepCount() == 0) {
+        this.core = getEdges().size();
+
+        for (Edge<LongWritable, LongWritable> edge : getEdges()) {
+            sendMessage(edge, new KCoreMessage(getVertexID().get(),
+                    getCore()));
+        }
+
+    } else {
+        logger.info("getSuperstepCount = " + getSuperstepCount()
+                + " vertex = " + getVertexID() + " Core = " + getCore());
+
+        // Copy the messages into a list because they are iterated twice in
+        // superstep 1.
+        List<KCoreMessage> messages = Lists.newArrayList(msgs);
+        if (this.getSuperstepCount() == 1) {
+            // Seed an entry for every sender with Integer.MAX_VALUE, a
+            // placeholder that any smaller reported core replaces below.
+            for (KCoreMessage message : messages) {
+                estimates.put(message.getVertexID(),
+                        (Integer.MAX_VALUE));
+            }
+        }
+
+        logger.info(getVertexID() + " got estimates of: ");
+        for (KCoreMessage message : messages) {
+
+            logger.info("Processing message from "
+                    + message.getVertexID());
+
+            // A sender without a stored estimate yields -1 here, so its
+            // message is ignored for any non-negative core value.
+            double temp = getNeighborEstimate(message.getVertexID());
+
+            if (message.getCore() < temp) {
+                setNeighborNewEstimate(message.getVertexID(),
+                        message.getCore());
+
+                int t = computeEstimate();
+
+                // The core is only ever lowered, never raised.
+                if (t < getCore()) {
+                    logger.info("Setting new core value! \n\n");
+                    setCore(t);
+                    setChanged(true);
+                }
+            }
+        }
+        logger.info("Done recomputing estimate for node " + getVertexID());
+
+        if (!isChanged()) {
+            this.voteToHalt();
+        } else {
+            for (Edge<LongWritable, LongWritable> edge : getEdges()) {
+                sendMessage(edge, new KCoreMessage(getVertexID().get(),
+                        getCore()));
+            }
+            setChanged(false);
+        }
+    }
+}
+
+
+}

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexReader.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexReader.java?rev=1669291&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexReader.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexReader.java Thu Mar 26 08:27:24 2015
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.kcore;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.graph.Edge;
+import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexInputReader;
+
+/**
+ * Vertex input reader for the KCore algorithm. Each input record is one
+ * tab-delimited line of text: the first field is the vertex ID and every
+ * following field is the ID of a vertex it has an edge to.
+ */
+public class KCoreVertexReader
+    extends
+    VertexInputReader<LongWritable, Text, LongWritable, LongWritable, KCoreMessage> {
+
+  /**
+   * Fills {@code vertex} from one tab-delimited input line.
+   *
+   * @param key record key; not used by this reader
+   * @param value the line to parse: vertex ID followed by neighbor IDs
+   * @param vertex vertex instance that receives the parsed ID and edges; its
+   *          value is set to a default-constructed {@link KCoreMessage}
+   * @return always {@code true}
+   * @throws Exception in particular a {@code NumberFormatException} when a
+   *           field cannot be parsed as a long
+   */
+  @Override
+  public boolean parseVertex(LongWritable key, Text value,
+      Vertex<LongWritable, LongWritable, KCoreMessage> vertex) throws Exception {
+    String[] vertices = value.toString().split("\t");
+    List<Edge<LongWritable, LongWritable>> edges =
+        new ArrayList<Edge<LongWritable, LongWritable>>();
+
+    // Fields after the first are neighbor IDs; every edge gets the value 0.
+    for (int i = 1; i < vertices.length; i++) {
+      LongWritable destID = new LongWritable(Long.parseLong(vertices[i]));
+      edges.add(new Edge<LongWritable, LongWritable>(destID, new LongWritable(0)));
+    }
+
+    vertex.setEdges(edges);
+    vertex.setValue(new KCoreMessage());
+    vertex.setVertexID(new LongWritable(Long.parseLong(vertices[0])));
+    return true;
+  }
+}

Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexWriter.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexWriter.java?rev=1669291&view=auto
==============================================================================
--- hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexWriter.java (added)
+++ hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexWriter.java Thu Mar 26 08:27:24 2015
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ml.kcore;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.graph.GraphJobMessage;
+import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexOutputWriter;
+
+/**
+ * Vertex output writer for the KCore algorithm: emits one record per vertex
+ * with the vertex ID as key and the vertex's core as an {@link IntWritable}
+ * value.
+ */
+public class KCoreVertexWriter
+    implements
+    VertexOutputWriter<LongWritable, IntWritable, LongWritable, LongWritable, KCoreMessage> {
+
+  /** No setup is needed; the configuration is ignored. */
+  @Override
+  public void setup(Configuration conf) {
+
+  }
+
+  /**
+   * Writes the vertex ID and its core to the peer's output.
+   *
+   * @param vertex vertex to emit; it is cast to {@link KCoreVertex} to read
+   *          the core, so any other vertex type fails with a
+   *          {@code ClassCastException}
+   * @param peer peer whose output receives the record
+   * @throws IOException if the peer fails to write the record
+   */
+  @Override
+  public void write(
+      Vertex<LongWritable, LongWritable, KCoreMessage> vertex,
+      BSPPeer<Writable, Writable, LongWritable, IntWritable, GraphJobMessage> peer)
+      throws IOException {
+    peer.write(vertex.getVertexID(),
+        new IntWritable(((KCoreVertex) vertex).getCore()));
+  }
+}



Mime
View raw message