Return-Path: X-Original-To: apmail-hama-commits-archive@www.apache.org Delivered-To: apmail-hama-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 02EDB10C26 for ; Thu, 26 Mar 2015 08:27:25 +0000 (UTC) Received: (qmail 25970 invoked by uid 500); 26 Mar 2015 08:27:24 -0000 Delivered-To: apmail-hama-commits-archive@hama.apache.org Received: (qmail 25937 invoked by uid 500); 26 Mar 2015 08:27:24 -0000 Mailing-List: contact commits-help@hama.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hama.apache.org Delivered-To: mailing list commits@hama.apache.org Received: (qmail 25926 invoked by uid 99); 26 Mar 2015 08:27:24 -0000 Received: from eris.apache.org (HELO hades.apache.org) (140.211.11.105) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Mar 2015 08:27:24 +0000 Received: from hades.apache.org (localhost [127.0.0.1]) by hades.apache.org (ASF Mail Server at hades.apache.org) with ESMTP id B670EAC01AA for ; Thu, 26 Mar 2015 08:27:24 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1669291 - in /hama/trunk: ./ examples/src/main/java/org/apache/hama/examples/ examples/src/test/java/org/apache/hama/examples/ examples/src/test/resources/ ml/src/main/java/org/apache/hama/ml/kcore/ Date: Thu, 26 Mar 2015 08:27:24 -0000 To: commits@hama.apache.org From: edwardyoon@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20150326082724.B670EAC01AA@hades.apache.org> Author: edwardyoon Date: Thu Mar 26 08:27:24 2015 New Revision: 1669291 URL: http://svn.apache.org/r1669291 Log: HAMA-894: Implement K-core algorithm (Jaegwon Seo via edwardyoon) Added: hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java hama/trunk/examples/src/test/java/org/apache/hama/examples/KCoreTest.java hama/trunk/examples/src/test/resources/kcore.txt hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/ hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreMessage.java hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertex.java hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexReader.java hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexWriter.java Modified: hama/trunk/CHANGES.txt hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java Modified: hama/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1669291&r1=1669290&r2=1669291&view=diff ============================================================================== --- hama/trunk/CHANGES.txt (original) +++ hama/trunk/CHANGES.txt Thu Mar 26 08:27:24 2015 @@ -4,6 +4,7 @@ Release 0.7.0 (unreleased changes) NEW FEATURES + HAMA-894: Implement K-core algorithm (Jaegwon Seo via edwardyoon) HAMA-907: Add MaxFlow example (Zhengjun via edwardyoon) HAMA-915: Add Kryo serializer (edwardyoon) HAMA-726: Hama on Mesos (Jeff Fenchel via edwardyoon) Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java?rev=1669291&r1=1669290&r2=1669291&view=diff ============================================================================== --- hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java (original) +++ hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java Thu Mar 26 08:27:24 2015 @@ -39,8 +39,9 @@ public class ExampleDriver { pgd.addClass("semi", SemiClusterJobDriver.class, "Semi Clustering"); pgd.addClass("kmeans", Kmeans.class, "K-Means Clustering"); pgd.addClass("gd", GradientDescentExample.class, "Gradient Descent"); - pgd.addClass("neuralnets", NeuralNetwork.class, "Neural Network classification"); - + pgd.addClass("neuralnets", NeuralNetwork.class, + "Neural Network classification"); + pgd.addClass("kcore", KCore.class, "kcore"); pgd.addClass("gen", Generator.class, "Random Data Generator Util"); pgd.driver(args); } catch (Throwable e) { Added: hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java?rev=1669291&view=auto ============================================================================== --- hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java (added) +++ hama/trunk/examples/src/main/java/org/apache/hama/examples/KCore.java Thu Mar 26 08:27:24 2015 @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.examples; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.graph.GraphJob; +import org.apache.hama.ml.kcore.KCoreMessage; +import org.apache.hama.ml.kcore.KCoreVertex; +import org.apache.hama.ml.kcore.KCoreVertexReader; +import org.apache.hama.ml.kcore.KCoreVertexWriter; + +public class KCore { + private static GraphJob createJob(String[] args, HamaConfiguration conf) + throws IOException { + GraphJob graphJob = new GraphJob(conf, KCore.class); + graphJob.setJobName("KCore"); + + graphJob.setInputPath(new Path(args[0])); + graphJob.setOutputPath(new Path(args[1])); + + graphJob.setVertexClass(KCoreVertex.class); + graphJob.setVertexIDClass(LongWritable.class); + graphJob.setEdgeValueClass(LongWritable.class); + graphJob.setVertexValueClass(KCoreMessage.class); + + graphJob.setVertexInputReaderClass(KCoreVertexReader.class); + graphJob.setVertexOutputWriterClass(KCoreVertexWriter.class); + + graphJob.setOutputKeyClass(LongWritable.class); + graphJob.setOutputValueClass(IntWritable.class); + + return graphJob; + } + + private static void printUsage() { + System.out.println("Usage: "); + System.exit(-1); + } + + public static void main(String[] args) throws IOException, + ClassNotFoundException, InterruptedException { + if (args.length != 2) { + printUsage(); + } + HamaConfiguration conf = new HamaConfiguration(new Configuration()); + GraphJob graphJob = createJob(args, conf); + long startTime = System.currentTimeMillis(); + if (graphJob.waitForCompletion(true)) { + System.out.println("Job Finished in " + + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); + } + } +} Added: hama/trunk/examples/src/test/java/org/apache/hama/examples/KCoreTest.java URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/KCoreTest.java?rev=1669291&view=auto ============================================================================== --- hama/trunk/examples/src/test/java/org/apache/hama/examples/KCoreTest.java (added) +++ hama/trunk/examples/src/test/java/org/apache/hama/examples/KCoreTest.java Thu Mar 26 08:27:24 2015 @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.examples; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.Map; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hama.HamaConfiguration; +import org.junit.Test; + +public class KCoreTest extends TestCase { + private static String INPUT = "src/test/resources/kcore.txt"; + private static String OUTPUT = "/tmp/kcore-out"; + private Configuration conf = new HamaConfiguration(); + private FileSystem fs; + + Map outResults = new HashMap(); + + @Override + protected void setUp() throws Exception { + super.setUp(); + fs = FileSystem.get(conf); + } + + @Test + public void testKcore() throws IllegalArgumentException, IOException, + ClassNotFoundException, InterruptedException { + try { + setOutputResult(); + KCore.main(new String[] { INPUT, OUTPUT }); + verifyResult(); + } finally { + fs.exists(new Path(OUTPUT)); + } + } + + public void setOutputResult() { + String[] vertex = { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11" }; + String[] core = { "1", "3", "1", "1", "3", "2", "1", "3", "3", "2", "2" }; + + for (int i = 0; i < vertex.length; i++) { + outResults.put(vertex[i], core[i]); + } + } + + private void verifyResult() throws IllegalArgumentException, IOException { + BufferedReader reader = new BufferedReader(new InputStreamReader( + fs.open(new Path(OUTPUT + "/part-00000")))); + + String line = null; + while ((line = reader.readLine()) != null) { + String[] temp = line.split("\t"); + assertTrue(outResults.containsKey(temp[0])); + assertEquals(temp[1], outResults.get(temp[0])); + } + } +} Added: hama/trunk/examples/src/test/resources/kcore.txt URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/resources/kcore.txt?rev=1669291&view=auto ============================================================================== --- hama/trunk/examples/src/test/resources/kcore.txt (added) +++ hama/trunk/examples/src/test/resources/kcore.txt Thu Mar 26 08:27:24 2015 @@ -0,0 +1,11 @@ +1 2 +2 1 3 9 8 5 +3 2 +4 5 +5 2 4 6 8 9 +6 5 7 8 +7 6 +8 2 5 6 9 10 11 +9 2 5 8 +10 8 11 +11 8 10 \ No newline at end of file Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreMessage.java URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreMessage.java?rev=1669291&view=auto ============================================================================== --- hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreMessage.java (added) +++ hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreMessage.java Thu Mar 26 08:27:24 2015 @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.ml.kcore; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +public class KCoreMessage implements Writable { + private long vertexID; + private int core; + + public KCoreMessage() { + this.vertexID = 0L; + this.core = 0; + } + + public KCoreMessage(long vertexID, int core) { + this.vertexID = vertexID; + this.core = core; + } + + @Override + public void readFields(DataInput input) throws IOException { + vertexID = input.readLong(); + core = input.readInt(); + } + + @Override + public void write(DataOutput output) throws IOException { + output.writeLong(vertexID); + output.writeInt(core); + } + + public long getVertexID(){ + return vertexID; + } + + public int getCore(){ + return core; + } +} Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertex.java URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertex.java?rev=1669291&view=auto ============================================================================== --- hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertex.java (added) +++ hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertex.java Thu Mar 26 08:27:24 2015 @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.ml.kcore; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hama.graph.Edge; +import org.apache.hama.graph.Vertex; +import org.apache.log4j.Logger; + +import com.google.common.collect.Lists; + +public class KCoreVertex extends + Vertex { + + private static final Logger logger = Logger.getLogger(KCoreVertex.class); + private int core; + private boolean changed; + private HashMap estimates; + + public KCoreVertex() { + super(); + this.changed = false; + this.core = 0; + this.estimates = new HashMap(); + } + + public KCoreVertex(int core) { + super(); + this.changed = false; + this.core = core; + this.estimates = new HashMap(); + } + + public KCoreVertex(int core, HashMap estimates) { + super(); + this.changed = false; + this.core = core; + this.estimates = estimates; + } + + public boolean isChanged() { + return changed; + } + + public void setChanged(boolean changed) { + this.changed = changed; + } + + public int getCore() { + return core; + } + + public void setCore(int core) { + this.core = core; + } + + public double getNeighborEstimate(long neighbor) { + if (estimates.containsKey(neighbor)) { + return estimates.get(neighbor); + } + return (-1); + } + + public void setNeighborNewEstimate(long neighbor, int estimate) { + if (this.estimates.containsKey(neighbor)) { + this.estimates.put(neighbor, estimate); + } + } + + public void logNeighborEstimates(long vertexId) { + logger.info(vertexId + " neighbor estimates: "); + for (Map.Entry entry : estimates.entrySet()) { + logger.info("\t" + entry.getKey() + "-" + entry.getValue()); + } + } + + public void setNeighborEstimate(long neighbor, int estimate) { + estimates.put(neighbor, estimate); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + core = in.readInt(); + changed = in.readBoolean(); + + this.estimates = new HashMap(); + if (in.readBoolean()) { + int num = in.readInt(); + for (int i = 0; i < num; ++i) { + long key = in.readLong(); + int value = in.readInt(); + estimates.put(key, value); + } + } + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeDouble(core); + out.writeBoolean(changed); + + if (this.estimates == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeInt(this.estimates.size()); + + for (Map.Entry entry : estimates.entrySet()) { + out.writeLong(entry.getKey()); + out.writeInt(entry.getValue()); + } + } + } + + public int computeEstimate() { + int old = this.core; + double[] count = new double[this.core + 1]; + + for (Map.Entry entry : this.estimates.entrySet()) { + logger.info("Processing " + entry.getKey() + ": " + entry.getValue()); + double j = Math.min(this.core, entry.getValue().doubleValue()); + logger.info("Min: " + j); + count[(int) j] = count[(int) j] + 1; + } + + logger.info("Count before"); + int i; + for (i = 0; i < count.length; i++) { + logger.info(i + " " + count[i]); + } + + for (i = this.core; i > 1; i--) + count[i - 1] = count[i - 1] + count[i]; + + logger.info("Count after"); + for (i = 0; i < count.length; i++) { + logger.info(i + " " + count[i]); + } + + i = this.core; + while ((i > 1) && (count[i] < i)) { + logger.info("Decrementing" + i + " down one because " + count[i] + + " is less than that"); + i = i - 1; + } + logger.info("Loop terminated: i: " + i + " and count[i] = " + count[i]); + + if (i != old) { + logger.info("New Core Estimate: " + i + "\n"); + } + return i; + } + + @Override + public void compute(Iterable msgs) throws IOException { + if (this.getSuperstepCount() == 0) { + this.core = getEdges().size(); + + for (Edge edge : getEdges()) { + sendMessage(edge, new KCoreMessage(getVertexID().get(), + getCore())); + } + + } else { + logger.info("getSuperstepCount = " + getSuperstepCount() + + " vertex = " + getVertexID() + " Core = " + getCore()); + + List messages = Lists.newArrayList(msgs); + if (this.getSuperstepCount() == 1) { + for (KCoreMessage message : messages) { + estimates.put(message.getVertexID(), + (Integer.MAX_VALUE)); + } + } + + logger.info(getVertexID() + " got estimates of: "); + for (KCoreMessage message : messages) { + + logger.info("Processing message from " + + message.getVertexID()); + + double temp = getNeighborEstimate(message.getVertexID()); + + if (message.getCore() < temp) { + setNeighborNewEstimate(message.getVertexID(), + message.getCore()); + + int t = computeEstimate(); + + if (t < getCore()) { + logger.info("Setting new core value! \n\n"); + setCore(t); + setChanged(true); + } + } + } + logger.info("Done recomputing estimate for node " + getVertexID()); + + if (!isChanged()) { + this.voteToHalt(); + } else { + for (Edge edge : getEdges()) { + sendMessage(edge, new KCoreMessage(getVertexID().get(), + getCore())); + } + setChanged(false); + } + } +} + + +} Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexReader.java URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexReader.java?rev=1669291&view=auto ============================================================================== --- hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexReader.java (added) +++ hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexReader.java Thu Mar 26 08:27:24 2015 @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.ml.kcore; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hama.graph.Edge; +import org.apache.hama.graph.Vertex; +import org.apache.hama.graph.VertexInputReader; + +/** +* VertexInputFormat for the KCore algorithm +* specified in tab-delimited text file format. +*/ +public class KCoreVertexReader + extends + VertexInputReader { + + @Override + public boolean parseVertex(LongWritable key, Text value, + Vertex vertex) throws Exception { + String[] vertices = value.toString().split("\t"); + List> edges = new ArrayList>(); + + for(int i=1; i(destID, new LongWritable(0))); + } + + vertex.setEdges(edges); + vertex.setValue(new KCoreMessage()); + vertex.setVertexID(new LongWritable(Long.parseLong(vertices[0]))); + return true; + } +} Added: hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexWriter.java URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexWriter.java?rev=1669291&view=auto ============================================================================== --- hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexWriter.java (added) +++ hama/trunk/ml/src/main/java/org/apache/hama/ml/kcore/KCoreVertexWriter.java Thu Mar 26 08:27:24 2015 @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.ml.kcore; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.graph.GraphJobMessage; +import org.apache.hama.graph.Vertex; +import org.apache.hama.graph.VertexOutputWriter; + +public class KCoreVertexWriter + implements + VertexOutputWriter { + + @Override + public void setup(Configuration conf) { + + } + + @Override + public void write( + Vertex vertex, + BSPPeer peer) + throws IOException { + peer.write(vertex.getVertexID(), + new IntWritable(((KCoreVertex) vertex).getCore())); + } +}