Subject: svn commit: r1299141 - in /hadoop/common/branches/branch-1.0: ./ src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/ src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/
Date: Sat, 10 Mar 2012 01:53:54 -0000
From: acmurthy@apache.org
To: common-commits@hadoop.apache.org

Author: acmurthy
Date: Sat Mar 10 01:53:53 2012
New Revision: 1299141

URL: http://svn.apache.org/viewvc?rev=1299141&view=rev
Log:
Merge -c 1299140 from branch-1 to branch-1.0 to fix HADOOP-5450. Add support
for application-specific typecodes to typed bytes. Contributed by Klaas
Bosteels.

Modified:
    hadoop/common/branches/branch-1.0/CHANGES.txt
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java
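In short: typecodes 50-200 are now reserved for application-specific payloads.
Written with the new TypedBytesOutput.writeBytes(byte[], int) overload, they
read back as plain byte buffers. A minimal round-trip sketch (illustrative
only; the in-memory streams and code 100 mirror the new TestIO case below):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.typedbytes.Buffer;
    import org.apache.hadoop.typedbytes.TypedBytesInput;
    import org.apache.hadoop.typedbytes.TypedBytesOutput;

    public class AppTypecodeExample {
      public static void main(String[] args) throws IOException {
        byte[] payload = new byte[] { 1, 2, 3 };

        // Write the payload under an application-specific typecode (50-200).
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        TypedBytesOutput out = new TypedBytesOutput(new DataOutputStream(baos));
        out.writeBytes(payload, 100);

        // Codes 50-200 read back as the raw payload wrapped in a Buffer.
        TypedBytesInput in = new TypedBytesInput(
            new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
        Buffer buf = (Buffer) in.read();  // equals(new Buffer(payload))
      }
    }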
Modified: hadoop/common/branches/branch-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/CHANGES.txt?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.0/CHANGES.txt Sat Mar 10 01:53:53 2012
@@ -15,6 +15,9 @@ Release 1.0.2 - unreleased
     HADOOP-1722. Allow hadoop streaming to handle non-utf8 byte array.
     (Klaas Bosteels and Matthias Lehmann via acmurthy)
 
+    HADOOP-5450. Add support for application-specific typecodes to typed
+    bytes. (Klaas Bosteels via acmurthy)
+
   BUG FIXES
 
     HADOOP-8050. Deadlock in metrics. (Kihwal Lee via mattf)

Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java Sat Mar 10 01:53:53 2012
@@ -23,6 +23,7 @@ package org.apache.hadoop.typedbytes;
  */
 public enum Type {
 
+  // codes for supported types (< 50):
   BYTES(0),
   BYTE(1),
   BOOL(2),
@@ -34,6 +35,11 @@ public enum Type {
   VECTOR(8),
   LIST(9),
   MAP(10),
+
+  // application-specific codes (50-200):
+  WRITABLE(50),
+
+  // low-level codes (> 200):
   MARKER(255);
 
   final int code;

Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java Sat Mar 10 01:53:53 2012
@@ -101,6 +101,8 @@ public class TypedBytesInput {
       return readMap();
     } else if (code == Type.MARKER.code) {
       return null;
+    } else if (50 <= code && code <= 200) { // application-specific typecodes
+      return new Buffer(readBytes());
     } else {
       throw new RuntimeException("unknown type");
     }
@@ -146,6 +148,8 @@
       return readRawMap();
     } else if (code == Type.MARKER.code) {
       return null;
+    } else if (50 <= code && code <= 200) { // application-specific typecodes
+      return readRawBytes();
     } else {
       throw new RuntimeException("unknown type");
     }
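On the wire these sequences keep the standard typed bytes framing: a one-byte
typecode followed by a four-byte big-endian length and the payload. The raw
form returned by readRaw() retains that 5-byte header, which is why the TestIO
change below compares against a Buffer starting at offset 5. A decoding sketch
under that assumption (hypothetical helper, not part of the patch):

    // Manually decode one sequence written by
    // TypedBytesOutput.writeBytes(payload, code).
    static byte[] decodePayload(java.io.DataInputStream dis)
        throws java.io.IOException {
      int code = dis.readUnsignedByte(); // 50..200 for application data
      int length = dis.readInt();        // written by out.writeInt(bytes.length)
      byte[] payload = new byte[length];
      dis.readFully(payload);
      return payload;
    }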
Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java Sat Mar 10 01:53:53 2012
@@ -124,16 +124,27 @@ public class TypedBytesOutput {
   }
 
   /**
-   * Writes a bytes array as a typed bytes sequence.
+   * Writes a bytes array as a typed bytes sequence, using a given typecode.
    *
    * @param bytes the bytes array to be written
+   * @param code the typecode to use
    * @throws IOException
    */
-  public void writeBytes(byte[] bytes) throws IOException {
-    out.write(Type.BYTES.code);
+  public void writeBytes(byte[] bytes, int code) throws IOException {
+    out.write(code);
     out.writeInt(bytes.length);
     out.write(bytes);
   }
+
+  /**
+   * Writes a bytes array as a typed bytes sequence.
+   *
+   * @param bytes the bytes array to be written
+   * @throws IOException
+   */
+  public void writeBytes(byte[] bytes) throws IOException {
+    writeBytes(bytes, Type.BYTES.code);
+  }
 
   /**
    * Writes a byte as a typed bytes sequence.
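Note that the new writeBytes(byte[], int) overload writes whatever code it is
given; a code outside 50-200 that is not one of the predefined types will make
TypedBytesInput.read() throw RuntimeException("unknown type") on the consumer
side. A defensive wrapper an application might add (hypothetical helper, not
part of the patch):

    static void writeAppBytes(TypedBytesOutput out, byte[] bytes, int code)
        throws IOException {
      // Only 50-200 are reserved for application-specific payloads.
      if (code < 50 || code > 200) {
        throw new IllegalArgumentException("typecode outside 50-200: " + code);
      }
      out.writeBytes(bytes, code);
    }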
Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java Sat Mar 10 01:53:53 2012
@@ -18,9 +18,13 @@
 
 package org.apache.hadoop.typedbytes;
 
+import java.io.ByteArrayInputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.ByteWritable;
@@ -36,17 +40,22 @@ import org.apache.hadoop.io.VIntWritable
 import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * Provides functionality for reading typed bytes as Writable objects.
  *
  * @see TypedBytesInput
  */
-public class TypedBytesWritableInput {
+public class TypedBytesWritableInput implements Configurable {
 
   private TypedBytesInput in;
+  private Configuration conf;
 
-  private TypedBytesWritableInput() {}
+  private TypedBytesWritableInput() {
+    conf = new Configuration();
+  }
 
   private void setTypedBytesInput(TypedBytesInput in) {
     this.in = in;
@@ -86,6 +95,7 @@ public class TypedBytesWritableInput {
 
   /** Creates a new instance of TypedBytesWritableInput. */
   public TypedBytesWritableInput(TypedBytesInput in) {
+    this();
     this.in = in;
   }
 
@@ -120,6 +130,8 @@ public class TypedBytesWritableInput {
       return readArray();
     case MAP:
       return readMap();
+    case WRITABLE:
+      return readWritable();
     default:
       throw new RuntimeException("unknown type");
     }
@@ -151,6 +163,8 @@
       return ArrayWritable.class;
     case MAP:
       return MapWritable.class;
+    case WRITABLE:
+      return Writable.class;
    default:
       throw new RuntimeException("unknown type");
     }
@@ -331,5 +345,36 @@
   public SortedMapWritable readSortedMap() throws IOException {
     return readSortedMap(null);
   }
+
+  public Writable readWritable(Writable writable) throws IOException {
+    ByteArrayInputStream bais = new ByteArrayInputStream(in.readBytes());
+    DataInputStream dis = new DataInputStream(bais);
+    String className = WritableUtils.readString(dis);
+    if (writable == null) {
+      try {
+        Class<? extends Writable> cls =
+          conf.getClassByName(className).asSubclass(Writable.class);
+        writable = (Writable) ReflectionUtils.newInstance(cls, conf);
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      }
+    } else if (!writable.getClass().getName().equals(className)) {
+      throw new IOException("wrong Writable class given");
+    }
+    writable.readFields(dis);
+    return writable;
+  }
+
+  public Writable readWritable() throws IOException {
+    return readWritable(null);
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
 }
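Usage-wise, a value written with writeWritable() comes back through read(),
which now dispatches on the WRITABLE typecode and instantiates the class named
in the stream via reflection; setConf() lets callers supply a Configuration
whose classloader can resolve application classes. A short sketch (the stream
is assumed to be positioned at such a value, as in TestIO below):

    static Writable readBack(DataInputStream distream, Configuration conf)
        throws IOException {
      TypedBytesWritableInput in = new TypedBytesWritableInput(distream);
      in.setConf(conf);  // classloader used by readWritable()'s class lookup
      return in.read();  // WRITABLE case -> readWritable()
    }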
Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java Sat Mar 10 01:53:53 2012
@@ -40,6 +40,7 @@ import org.apache.hadoop.io.VIntWritable
 import org.apache.hadoop.io.VLongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.record.Record;
 
 /**
@@ -91,6 +92,7 @@ public class TypedBytesWritableOutput {
 
   /** Creates a new instance of TypedBytesWritableOutput. */
   public TypedBytesWritableOutput(TypedBytesOutput out) {
+    this();
     this.out = out;
   }
 
@@ -209,13 +211,12 @@
   }
 
   public void writeWritable(Writable w) throws IOException {
-    out.writeVectorHeader(2);
-    out.writeString(w.getClass().getName());
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     DataOutputStream dos = new DataOutputStream(baos);
+    WritableUtils.writeString(dos, w.getClass().getName());
     w.write(dos);
     dos.close();
-    out.writeBytes(baos.toByteArray());
+    out.writeBytes(baos.toByteArray(), Type.WRITABLE.code);
   }
 
 }

Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html Sat Mar 10 01:53:53 2012
@@ -41,6 +41,8 @@ Each typed bytes sequence starts with an
 <tr><td><i>10</i></td><td>A map.</td></tr>
 </table>
 
+The type codes <i>50</i> to <i>200</i> are treated as aliases for <i>0</i>, and can thus be used for
+application-specific serialization.
 
 <h3>Subsequent Bytes</h3>
 
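Because codes 50-200 alias code 0, a consumer that knows nothing about
WRITABLE still sees a well-formed bytes sequence; only TypedBytesWritableInput
peels the class-name header off the payload. A sketch of the layout
writeWritable() now produces (assuming "data" holds exactly one sequence
wrapping a Text value):

    // byte 0    : typecode (Type.WRITABLE.code == 50)
    // bytes 1-4 : big-endian length of the wrapped payload
    // payload   : class name (WritableUtils.writeString format), then the
    //             value's own write() output
    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data));
    int code = dis.readUnsignedByte();                 // == 50
    int length = dis.readInt();
    String className = WritableUtils.readString(dis);  // "org.apache.hadoop.io.Text"
    Text value = new Text();
    value.readFields(dis);                             // remaining payload bytes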
Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java Sat Mar 10 01:53:53 2012
@@ -40,6 +40,7 @@ import org.apache.hadoop.io.BytesWritabl
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.VIntWritable;
 import org.apache.hadoop.io.VLongWritable;
@@ -80,6 +81,7 @@ public class TestIO extends TestCase {
       (byte) 123, true, 12345, 123456789L, (float) 1.2, 1.234,
       "random string", vector, list, map
     };
+    byte[] appSpecificBytes = new byte[] { 1, 2, 3 };
 
     FileOutputStream ostream = new FileOutputStream(tmpfile);
     DataOutputStream dostream = new DataOutputStream(ostream);
@@ -87,6 +89,7 @@ public class TestIO extends TestCase {
     for (Object obj : objects) {
       out.write(obj);
     }
+    out.writeBytes(appSpecificBytes, 100);
     dostream.close();
     ostream.close();
 
@@ -96,6 +99,7 @@ public class TestIO extends TestCase {
     for (Object obj : objects) {
       assertEquals(obj, in.read());
     }
+    assertEquals(new Buffer(appSpecificBytes), in.read());
     distream.close();
     istream.close();
 
@@ -114,6 +118,9 @@ public class TestIO extends TestCase {
       dis = new DataInputStream(bais);
       assertEquals(obj, (new TypedBytesInput(dis)).read());
     }
+    byte[] rawBytes = in.readRaw();
+    assertEquals(new Buffer(appSpecificBytes),
+      new Buffer(rawBytes, 5, rawBytes.length - 5));
     distream.close();
     istream.close();
   }
@@ -164,7 +171,8 @@ public class TestIO extends TestCase {
       new ByteWritable((byte) 123), new BooleanWritable(true),
       new VIntWritable(12345), new VLongWritable(123456789L),
       new FloatWritable((float) 1.2), new DoubleWritable(1.234),
-      new Text("random string")
+      new Text("random string"),
+      new ObjectWritable("test")
     };
     TypedBytesWritable tbw = new TypedBytesWritable();
     tbw.setValue("typed bytes text");
@@ -201,7 +209,7 @@ public class TestIO extends TestCase {
 
     TypedBytesWritableInput in = new TypedBytesWritableInput(distream);
     for (Writable w : writables) {
-      assertEquals(w, in.read());
+      assertEquals(w.toString(), in.read().toString());
     }
     assertEquals(tbw.getValue().toString(), in.read().toString());