Return-Path: X-Original-To: apmail-hama-commits-archive@www.apache.org Delivered-To: apmail-hama-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 024DD178AB for ; Thu, 12 Mar 2015 01:23:17 +0000 (UTC) Received: (qmail 35676 invoked by uid 500); 12 Mar 2015 01:23:16 -0000 Delivered-To: apmail-hama-commits-archive@hama.apache.org Received: (qmail 35641 invoked by uid 500); 12 Mar 2015 01:23:16 -0000 Mailing-List: contact commits-help@hama.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hama.apache.org Delivered-To: mailing list commits@hama.apache.org Received: (qmail 35630 invoked by uid 99); 12 Mar 2015 01:23:16 -0000 Received: from eris.apache.org (HELO hades.apache.org) (140.211.11.105) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 12 Mar 2015 01:23:16 +0000 Received: from hades.apache.org (localhost [127.0.0.1]) by hades.apache.org (ASF Mail Server at hades.apache.org) with ESMTP id AC3B3AC026D for ; Thu, 12 Mar 2015 01:23:16 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1666061 - in /hama/trunk: ./ commons/src/main/java/org/apache/hama/commons/math/ core/ core/src/main/java/org/apache/hama/bsp/ core/src/test/java/org/apache/hama/util/ graph/ graph/src/main/java/org/apache/hama/graph/ ml/src/test/java/org/... Date: Thu, 12 Mar 2015 01:23:16 -0000 To: commits@hama.apache.org From: edwardyoon@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20150312012316.AC3B3AC026D@hades.apache.org> Author: edwardyoon Date: Thu Mar 12 01:23:15 2015 New Revision: 1666061 URL: http://svn.apache.org/r1666061 Log: HAMA-932: Use of Kryo Serializer Modified: hama/trunk/commons/src/main/java/org/apache/hama/commons/math/DenseDoubleMatrix.java hama/trunk/commons/src/main/java/org/apache/hama/commons/math/DenseDoubleVector.java hama/trunk/core/pom.xml hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java hama/trunk/graph/pom.xml hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java hama/trunk/ml/src/test/java/org/apache/hama/ml/recommendation/TestOnlineCF.java hama/trunk/pom.xml Modified: hama/trunk/commons/src/main/java/org/apache/hama/commons/math/DenseDoubleMatrix.java URL: http://svn.apache.org/viewvc/hama/trunk/commons/src/main/java/org/apache/hama/commons/math/DenseDoubleMatrix.java?rev=1666061&r1=1666060&r2=1666061&view=diff ============================================================================== --- hama/trunk/commons/src/main/java/org/apache/hama/commons/math/DenseDoubleMatrix.java (original) +++ hama/trunk/commons/src/main/java/org/apache/hama/commons/math/DenseDoubleMatrix.java Thu Mar 12 01:23:15 2015 @@ -29,10 +29,12 @@ import com.google.common.base.Preconditi */ public final class DenseDoubleMatrix implements DoubleMatrix { - protected final double[][] matrix; - protected final int numRows; - protected final int numColumns; + protected double[][] matrix; + protected int numRows; + protected int numColumns; + public DenseDoubleMatrix() {} + /** * Creates a new empty matrix from the rows and columns. * Modified: hama/trunk/commons/src/main/java/org/apache/hama/commons/math/DenseDoubleVector.java URL: http://svn.apache.org/viewvc/hama/trunk/commons/src/main/java/org/apache/hama/commons/math/DenseDoubleVector.java?rev=1666061&r1=1666060&r2=1666061&view=diff ============================================================================== --- hama/trunk/commons/src/main/java/org/apache/hama/commons/math/DenseDoubleVector.java (original) +++ hama/trunk/commons/src/main/java/org/apache/hama/commons/math/DenseDoubleVector.java Thu Mar 12 01:23:15 2015 @@ -33,8 +33,10 @@ import com.google.common.collect.Abstrac */ public final class DenseDoubleVector implements DoubleVector { - private final double[] vector; + private double[] vector; + public DenseDoubleVector() {} + /** * Creates a new vector with the given length. */ Modified: hama/trunk/core/pom.xml URL: http://svn.apache.org/viewvc/hama/trunk/core/pom.xml?rev=1666061&r1=1666060&r2=1666061&view=diff ============================================================================== --- hama/trunk/core/pom.xml (original) +++ hama/trunk/core/pom.xml Thu Mar 12 01:23:15 2015 @@ -141,14 +141,9 @@ zookeeper - org.apache.directmemory - directmemory-cache - 0.2 - - - org.apache.directmemory - directmemory-kryo - 0.2 + com.esotericsoftware + kryo + ${kryo.version} io.netty Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1666061&r1=1666060&r2=1666061&view=diff ============================================================================== --- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original) +++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Thu Mar 12 01:23:15 2015 @@ -20,16 +20,17 @@ package org.apache.hama.bsp; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; -import java.io.DataOutputStream; import java.io.IOException; import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.util.ReflectionUtils; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; /** * BSPMessageBundle stores a group of messages so that they can be sent in batch @@ -44,8 +45,9 @@ public class BSPMessageBundle iterator() { - bis = new ByteArrayInputStream(byteBuffer.toByteArray()); - dis = new DataInputStream(bis); Iterator it = new Iterator() { + ByteArrayInputStream bis = new ByteArrayInputStream(outputStream.toByteArray()); + Input in = new Input(bis, 4096); + Class clazz = null; - M msg; + int counter = 0; @Override public boolean hasNext() { - try { - if (dis.available() > 0) { - return true; - } else { - return false; - } - } catch (IOException e) { + if ((bundleSize - counter) > 0) { + return true; + } else { return false; } } @@ -104,17 +99,13 @@ public class BSPMessageBundle) Class.forName(className); } - - msg = ReflectionUtils.newInstance(clazz, null); - msg.readFields(dis); - - } catch (IOException ie) { - LOG.error(ie); } catch (ClassNotFoundException ce) { LOG.error("Class was not found.", ce); } - return msg; + counter++; + + return kryo.readObject(in, clazz); } @Override @@ -134,29 +125,30 @@ public class BSPMessageBundle 0) { out.writeUTF(className); - out.writeInt(byteBuffer.size()); - out.write(byteBuffer.toByteArray()); + out.writeInt(outputStream.size()); + out.write(outputStream.toByteArray()); } } @Override public void readFields(DataInput in) throws IOException { - this.bundleSize = in.readInt(); + bundleSize = in.readInt(); - if (this.bundleSize > 0) { + if (bundleSize > 0) { className = in.readUTF(); int bytesLength = in.readInt(); byte[] temp = new byte[bytesLength]; in.readFully(temp); - bufferDos.write(temp); + outputStream.write(temp); } } Modified: hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java?rev=1666061&r1=1666060&r2=1666061&view=diff ============================================================================== --- hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java (original) +++ hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java Thu Mar 12 01:23:15 2015 @@ -19,32 +19,39 @@ package org.apache.hama.util; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; import junit.framework.TestCase; import org.apache.hadoop.io.DoubleWritable; -public class TestKryoSerializer extends TestCase { +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; - ByteArrayOutputStream mbos = null; - DataOutputStream mdos = null; - ByteArrayInputStream mbis = null; - DataInputStream mdis = null; +public class TestKryoSerializer extends TestCase { public void testSerialization() throws Exception { - KryoSerializer k = new KryoSerializer(DoubleWritable.class); + Kryo kryo = new Kryo(); + kryo.register(DoubleWritable.class); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + Output out = new Output(outputStream, 4096); + + for (int i = 0; i < 10; i++) { + DoubleWritable a = new DoubleWritable(i + 0.123); + kryo.writeClassAndObject(out, a); + out.flush(); + } + + System.out.println(outputStream.size()); - long startTime = System.currentTimeMillis(); - for (int i = 0; i < 10000000; i++) { - DoubleWritable x = new DoubleWritable(i + 0.2); - byte[] bytes = k.serialize(x); - DoubleWritable y = (DoubleWritable) k.deserialize(bytes); - assertEquals(x, y); + ByteArrayInputStream bin = new ByteArrayInputStream(outputStream.toByteArray()); + Input in = new Input(bin, 4096); + + for (int i = 0; i < 10; i++) { + DoubleWritable b = (DoubleWritable) kryo.readClassAndObject(in); + System.out.println(bin.available() + ", " + b); } - System.out.println("Finished in " - + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); } } Modified: hama/trunk/graph/pom.xml URL: http://svn.apache.org/viewvc/hama/trunk/graph/pom.xml?rev=1666061&r1=1666060&r2=1666061&view=diff ============================================================================== --- hama/trunk/graph/pom.xml (original) +++ hama/trunk/graph/pom.xml Thu Mar 12 01:23:15 2015 @@ -48,6 +48,16 @@ test-jar test + + org.apache.directmemory + directmemory-cache + 0.2 + + + org.apache.directmemory + directmemory-kryo + 0.2 + hama-graph-${project.version} Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1666061&r1=1666060&r2=1666061&view=diff ============================================================================== --- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original) +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Thu Mar 12 01:23:15 2015 @@ -17,11 +17,14 @@ */ package org.apache.hama.graph; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; +import java.util.Iterator; import java.util.List; import org.apache.hadoop.io.DataInputBuffer; @@ -54,8 +57,7 @@ public final class GraphJobMessage imple private int numOfValues = 0; - private final ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream(); - private final DataOutputStream bufferDos = new DataOutputStream(byteBuffer); + private final ByteArrayOutputStream bytesStream = new ByteArrayOutputStream(); static { if (comparator == null) { @@ -92,7 +94,7 @@ public final class GraphJobMessage imple this.flag = VERTEX_FLAG; this.vertexId = vertexID; try { - this.bufferDos.write(valuesBytes); + this.bytesStream.write(valuesBytes); } catch (IOException e) { e.printStackTrace(); } @@ -109,25 +111,23 @@ public final class GraphJobMessage imple } public byte[] getValuesBytes() { - return byteBuffer.toByteArray(); + return bytesStream.toByteArray(); } public void addValuesBytes(byte[] values, int numOfValues) { try { - bufferDos.write(values); + bytesStream.write(values); this.numOfValues += numOfValues; } catch (IOException e) { - // TODO Auto-generated catch block e.printStackTrace(); } } public void add(Writable value) { try { - value.write(bufferDos); + value.write(new DataOutputStream(bytesStream)); numOfValues++; } catch (IOException e) { - // TODO Auto-generated catch block e.printStackTrace(); } } @@ -155,8 +155,8 @@ public final class GraphJobMessage imple vertexId.write(out); out.writeInt(numOfValues); - out.writeInt(byteBuffer.size()); - out.write(byteBuffer.toByteArray()); + out.writeInt(bytesStream.size()); + out.write(bytesStream.toByteArray()); } else if (isMapMessage()) { map.write(out); } else if (isVerticesSizeMessage()) { @@ -199,7 +199,7 @@ public final class GraphJobMessage imple int bytesLength = in.readInt(); byte[] temp = new byte[bytesLength]; in.readFully(temp); - bufferDos.write(temp); + bytesStream.write(temp); } else if (isMapMessage()) { map = new MapWritable(); map.readFields(in); @@ -304,4 +304,41 @@ public final class GraphJobMessage imple } } + + public Iterable getIterableMessages() { + + return new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + ByteArrayInputStream bis = new ByteArrayInputStream(bytesStream.toByteArray()); + DataInputStream dis = new DataInputStream(bis); + int index = 0; + + @Override + public boolean hasNext() { + return (index < numOfValues) ? true : false; + } + + @Override + public Writable next() { + Writable v = GraphJobRunner.createVertexValue(); + try { + v.readFields(dis); + } catch (IOException e) { + e.printStackTrace(); + } + index++; + return v; + } + + @Override + public void remove() { + } + }; + } + }; + } + + } Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1666061&r1=1666060&r2=1666061&view=diff ============================================================================== --- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original) +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Mar 12 01:23:15 2015 @@ -17,8 +17,6 @@ */ package org.apache.hama.graph; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; import java.io.IOException; import java.util.Collections; import java.util.HashSet; @@ -231,23 +229,18 @@ public final class GraphJobRunner msgs = null; Vertex vertex = null; while (currentMessage != null) { vertex = vertices.get((V) currentMessage.getVertexId()); - final int numOfValues = currentMessage.getNumOfValues(); - final byte[] serializedMsgs = currentMessage.getValuesBytes(); - msgs = getIterableMessages(numOfValues, serializedMsgs); - // reactivation if (vertex.isHalted()) { vertex.setActive(); } if (!vertex.isHalted()) { - vertex.compute((Iterable) msgs); + vertex.compute((Iterable) currentMessage.getIterableMessages()); vertices.finishVertexComputation(vertex); activeVertices++; @@ -650,42 +643,6 @@ public final class GraphJobRunner getIterableMessages(final int numOfValues, - final byte[] msgBytes) { - - return new Iterable() { - @Override - public Iterator iterator() { - return new Iterator() { - ByteArrayInputStream bis = new ByteArrayInputStream(msgBytes); - DataInputStream dis = new DataInputStream(bis); - int index = 0; - - @Override - public boolean hasNext() { - return (index < numOfValues) ? true : false; - } - - @Override - public Writable next() { - Writable v = GraphJobRunner.createVertexValue(); - try { - v.readFields(dis); - } catch (IOException e) { - e.printStackTrace(); - } - index++; - return v; - } - - @Override - public void remove() { - } - }; - } - }; - } - public int getChangedVertexCnt() { return changedVertexCnt; } Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java?rev=1666061&r1=1666060&r2=1666061&view=diff ============================================================================== --- hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java (original) +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java Thu Mar 12 01:23:15 2015 @@ -39,7 +39,6 @@ public class OutgoingVertexMessageManage .getLog(OutgoingVertexMessageManager.class); private Combiner combiner; - private Iterable msgs; private HashMap storage = new HashMap(); @SuppressWarnings("unchecked") @@ -50,7 +49,8 @@ public class OutgoingVertexMessageManage final String combinerName = conf.get(Constants.COMBINER_CLASS); if (combinerName != null) { try { - combiner = (Combiner) ReflectionUtils.newInstance(combinerName); + combiner = (Combiner) ReflectionUtils + .newInstance(combinerName); } catch (ClassNotFoundException e) { e.printStackTrace(); } @@ -73,13 +73,11 @@ public class OutgoingVertexMessageManage // Combining messages if (combiner != null && msgPerVertex.get(vertexID).getNumOfValues() > 1) { - final int numOfValues = msgPerVertex.get(vertexID).getNumOfValues(); - final byte[] msgBytes = msgPerVertex.get(vertexID).getValuesBytes(); - msgs = GraphJobRunner.getIterableMessages(numOfValues, msgBytes); - // Overwrite - storage.get(targetPeerAddress).put(vertexID, - new GraphJobMessage(vertexID, combiner.combine(msgs))); + storage.get(targetPeerAddress).put( + vertexID, + new GraphJobMessage(vertexID, combiner.combine(msgPerVertex.get( + vertexID).getIterableMessages()))); } } else { outgoingBundles.get(targetPeerAddress).addMessage(msg); Modified: hama/trunk/ml/src/test/java/org/apache/hama/ml/recommendation/TestOnlineCF.java URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/test/java/org/apache/hama/ml/recommendation/TestOnlineCF.java?rev=1666061&r1=1666060&r2=1666061&view=diff ============================================================================== --- hama/trunk/ml/src/test/java/org/apache/hama/ml/recommendation/TestOnlineCF.java (original) +++ hama/trunk/ml/src/test/java/org/apache/hama/ml/recommendation/TestOnlineCF.java Thu Mar 12 01:23:15 2015 @@ -31,41 +31,38 @@ import org.apache.hama.ml.recommendation import org.apache.hama.ml.recommendation.cf.function.MeanAbsError; import org.junit.Test; -public class TestOnlineCF extends TestCase{ +public class TestOnlineCF extends TestCase { @SuppressWarnings({ "deprecation", "rawtypes", "unchecked" }) @Test public void testOnlineCF() { - Preference[] train_prefs = { - new Preference(1, 1, 4), - new Preference(1, 2, 2.5), - new Preference(1, 3, 3.5), - new Preference(1, 4, 1), - new Preference(1, 5, 3.5), - new Preference(2, 1, 4), - new Preference(2, 2, 2.5), - new Preference(2, 3, 3.5), - new Preference(2, 4, 1), - new Preference(2, 5, 3.5), - new Preference(3, 1, 4), - new Preference(3, 2, 2.5), - new Preference(3, 3, 3.5)}; - Preference[] test_prefs = { - new Preference(1, 3, 3.5), - new Preference(2, 4, 1), - new Preference(3, 4, 1), - new Preference(3, 5, 3.5) - }; - + Preference[] train_prefs = { new Preference(1, 1, 4), + new Preference(1, 2, 2.5), + new Preference(1, 3, 3.5), + new Preference(1, 4, 1), + new Preference(1, 5, 3.5), + new Preference(2, 1, 4), + new Preference(2, 2, 2.5), + new Preference(2, 3, 3.5), + new Preference(2, 4, 1), + new Preference(2, 5, 3.5), + new Preference(3, 1, 4), + new Preference(3, 2, 2.5), + new Preference(3, 3, 3.5) }; + Preference[] test_prefs = { new Preference(1, 3, 3.5), + new Preference(2, 4, 1), + new Preference(3, 4, 1), + new Preference(3, 5, 3.5) }; + Random rnd = new Random(); Long num = Long.valueOf(rnd.nextInt(100000)); String fileName = "onlinecf_train" + num.toString(); String outputFileName = "onlinecf_model" + num.toString(); - + Configuration fsConf = new Configuration(); String strDataPath = "/tmp/" + fileName; String convertedFileName = "/tmp/converted_" + fileName; Path dataPath = new Path(strDataPath); - + try { URI uri = new URI(strDataPath); FileSystem fs = FileSystem.get(uri, fsConf); @@ -83,10 +80,11 @@ public class TestOnlineCF extends TestCa } fileOut.writeBytes(str.toString()); fileOut.close(); - + MovieLensConverter converter = new MovieLensConverter(); - assertEquals(true, converter.convert(strDataPath, null, convertedFileName)); - + assertEquals(true, + converter.convert(strDataPath, null, convertedFileName)); + OnlineCF recommender = new OnlineCF(); recommender.setInputPreferences(convertedFileName); recommender.setIteration(150); @@ -101,11 +99,12 @@ public class TestOnlineCF extends TestCa int correct = 0; for (Preference test : test_prefs) { double actual = test.getValue().get(); - double estimated = recommender.estimatePreference(test.getUserId(), test.getItemId()); - correct += (Math.abs(actual-estimated)<0.5)?1:0; + double estimated = recommender.estimatePreference(test.getUserId(), + test.getItemId()); + correct += (Math.abs(actual - estimated) < 0.5) ? 1 : 0; } - assertEquals(test_prefs.length*0.75, correct, 1); + assertEquals(test_prefs.length * 0.75, correct, 1); fs.delete(new Path(outputFileName)); fs.delete(new Path(strDataPath)); Modified: hama/trunk/pom.xml URL: http://svn.apache.org/viewvc/hama/trunk/pom.xml?rev=1666061&r1=1666060&r2=1666061&view=diff ============================================================================== --- hama/trunk/pom.xml (original) +++ hama/trunk/pom.xml Thu Mar 12 01:23:15 2015 @@ -102,7 +102,7 @@ 1.2.16 3.4.5 1.7.1 - 2.20 + 3.0.0 @@ -224,7 +224,7 @@ 1.0.5 - com.esotericsoftware.kryo + com.esotericsoftware kryo ${kryo.version}