Return-Path:
X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org
Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org
Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
by minotaur.apache.org (Postfix) with SMTP id DA49F9118
for ;
Sat, 10 Mar 2012 01:54:16 +0000 (UTC)
Received: (qmail 25376 invoked by uid 500); 10 Mar 2012 01:54:16 -0000
Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org
Received: (qmail 25294 invoked by uid 500); 10 Mar 2012 01:54:16 -0000
Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: common-dev@hadoop.apache.org
Delivered-To: mailing list common-commits@hadoop.apache.org
Received: (qmail 25286 invoked by uid 99); 10 Mar 2012 01:54:16 -0000
Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136)
by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 10 Mar 2012 01:54:16 +0000
X-ASF-Spam-Status: No, hits=-2000.0 required=5.0
tests=ALL_TRUSTED
X-Spam-Check-By: apache.org
Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4)
by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 10 Mar 2012 01:54:14 +0000
Received: from eris.apache.org (localhost [127.0.0.1])
by eris.apache.org (Postfix) with ESMTP id 6C6D12388865
for ; Sat, 10 Mar 2012 01:53:54 +0000 (UTC)
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: svn commit: r1299141 - in /hadoop/common/branches/branch-1.0: ./
src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/
src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/
Date: Sat, 10 Mar 2012 01:53:54 -0000
To: common-commits@hadoop.apache.org
From: acmurthy@apache.org
X-Mailer: svnmailer-1.0.8-patched
Message-Id: <20120310015354.6C6D12388865@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org
Author: acmurthy
Date: Sat Mar 10 01:53:53 2012
New Revision: 1299141
URL: http://svn.apache.org/viewvc?rev=1299141&view=rev
Log:
Merge -c 1299140 from branch-1 to branch-1.0 to fix HADOOP-5450. Add support for application-specific typecodes to typed bytes. Contributed by Klaas Bosteels.
Modified:
hadoop/common/branches/branch-1.0/CHANGES.txt
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java
Modified: hadoop/common/branches/branch-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/CHANGES.txt?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.0/CHANGES.txt Sat Mar 10 01:53:53 2012
@@ -15,6 +15,9 @@ Release 1.0.2 - unreleased
HADOOP-1722. Allow hadoop streaming to handle non-utf8 byte array. (Klaas
Bosteels and Matthias Lehmann via acmurthy)
+ HADOOP-5450. Add support for application-specific typecodes to typed
+ bytes. (Klaas Bosteels via acmurthy)
+
BUG FIXES
HADOOP-8050. Deadlock in metrics. (Kihwal Lee via mattf)
Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java Sat Mar 10 01:53:53 2012
@@ -23,6 +23,7 @@ package org.apache.hadoop.typedbytes;
*/
public enum Type {
+ // codes for supported types (< 50):
BYTES(0),
BYTE(1),
BOOL(2),
@@ -34,6 +35,11 @@ public enum Type {
VECTOR(8),
LIST(9),
MAP(10),
+
+ // application-specific codes (50-200):
+ WRITABLE(50),
+
+ // low-level codes (> 200):
MARKER(255);
final int code;
Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java Sat Mar 10 01:53:53 2012
@@ -101,6 +101,8 @@ public class TypedBytesInput {
return readMap();
} else if (code == Type.MARKER.code) {
return null;
+ } else if (50 <= code && code <= 200) { // application-specific typecodes
+ return new Buffer(readBytes());
} else {
throw new RuntimeException("unknown type");
}
@@ -146,6 +148,8 @@ public class TypedBytesInput {
return readRawMap();
} else if (code == Type.MARKER.code) {
return null;
+ } else if (50 <= code && code <= 200) { // application-specific typecodes
+ return readRawBytes();
} else {
throw new RuntimeException("unknown type");
}
Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java Sat Mar 10 01:53:53 2012
@@ -124,16 +124,27 @@ public class TypedBytesOutput {
}
/**
- * Writes a bytes array as a typed bytes sequence.
+ * Writes a bytes array as a typed bytes sequence, using a given typecode.
*
* @param bytes the bytes array to be written
+ * @param code the typecode to use
* @throws IOException
*/
- public void writeBytes(byte[] bytes) throws IOException {
- out.write(Type.BYTES.code);
+ public void writeBytes(byte[] bytes, int code) throws IOException {
+ out.write(code);
out.writeInt(bytes.length);
out.write(bytes);
}
+
+ /**
+ * Writes a bytes array as a typed bytes sequence.
+ *
+ * @param bytes the bytes array to be written
+ * @throws IOException
+ */
+ public void writeBytes(byte[] bytes) throws IOException {
+ writeBytes(bytes, Type.BYTES.code);
+ }
/**
* Writes a byte as a typed bytes sequence.
Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java Sat Mar 10 01:53:53 2012
@@ -18,9 +18,13 @@
package org.apache.hadoop.typedbytes;
+import java.io.ByteArrayInputStream;
import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.IOException;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
@@ -36,17 +40,22 @@ import org.apache.hadoop.io.VIntWritable
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* Provides functionality for reading typed bytes as Writable objects.
*
* @see TypedBytesInput
*/
-public class TypedBytesWritableInput {
+public class TypedBytesWritableInput implements Configurable {
private TypedBytesInput in;
+ private Configuration conf;
- private TypedBytesWritableInput() {}
+ private TypedBytesWritableInput() {
+ conf = new Configuration();
+ }
private void setTypedBytesInput(TypedBytesInput in) {
this.in = in;
@@ -86,6 +95,7 @@ public class TypedBytesWritableInput {
/** Creates a new instance of TypedBytesWritableInput. */
public TypedBytesWritableInput(TypedBytesInput in) {
+ this();
this.in = in;
}
@@ -120,6 +130,8 @@ public class TypedBytesWritableInput {
return readArray();
case MAP:
return readMap();
+ case WRITABLE:
+ return readWritable();
default:
throw new RuntimeException("unknown type");
}
@@ -151,6 +163,8 @@ public class TypedBytesWritableInput {
return ArrayWritable.class;
case MAP:
return MapWritable.class;
+ case WRITABLE:
+ return Writable.class;
default:
throw new RuntimeException("unknown type");
}
@@ -331,5 +345,36 @@ public class TypedBytesWritableInput {
public SortedMapWritable readSortedMap() throws IOException {
return readSortedMap(null);
}
+
+ public Writable readWritable(Writable writable) throws IOException {
+ ByteArrayInputStream bais = new ByteArrayInputStream(in.readBytes());
+ DataInputStream dis = new DataInputStream(bais);
+ String className = WritableUtils.readString(dis);
+ if (writable == null) {
+ try {
+ Class<? extends Writable> cls =
+ conf.getClassByName(className).asSubclass(Writable.class);
+ writable = (Writable) ReflectionUtils.newInstance(cls, conf);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ } else if (!writable.getClass().getName().equals(className)) {
+ throw new IOException("wrong Writable class given");
+ }
+ writable.readFields(dis);
+ return writable;
+ }
+
+ public Writable readWritable() throws IOException {
+ return readWritable(null);
+ }
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
}
Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java Sat Mar 10 01:53:53 2012
@@ -40,6 +40,7 @@ import org.apache.hadoop.io.VIntWritable
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.record.Record;
/**
@@ -91,6 +92,7 @@ public class TypedBytesWritableOutput {
/** Creates a new instance of TypedBytesWritableOutput. */
public TypedBytesWritableOutput(TypedBytesOutput out) {
+ this();
this.out = out;
}
@@ -209,13 +211,12 @@ public class TypedBytesWritableOutput {
}
public void writeWritable(Writable w) throws IOException {
- out.writeVectorHeader(2);
- out.writeString(w.getClass().getName());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
+ WritableUtils.writeString(dos, w.getClass().getName());
w.write(dos);
dos.close();
- out.writeBytes(baos.toByteArray());
+ out.writeBytes(baos.toByteArray(), Type.WRITABLE.code);
}
}
Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html Sat Mar 10 01:53:53 2012
@@ -41,6 +41,8 @@ Each typed bytes sequence starts with an
10 | A map. |
+The type codes 50 to 200 are treated as aliases for 0, and can thus be used for
+application-specific serialization.
Subsequent Bytes
Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java?rev=1299141&r1=1299140&r2=1299141&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java Sat Mar 10 01:53:53 2012
@@ -40,6 +40,7 @@ import org.apache.hadoop.io.BytesWritabl
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.io.VLongWritable;
@@ -80,6 +81,7 @@ public class TestIO extends TestCase {
(byte) 123, true, 12345, 123456789L, (float) 1.2, 1.234,
"random string", vector, list, map
};
+ byte[] appSpecificBytes = new byte[] { 1, 2, 3 };
FileOutputStream ostream = new FileOutputStream(tmpfile);
DataOutputStream dostream = new DataOutputStream(ostream);
@@ -87,6 +89,7 @@ public class TestIO extends TestCase {
for (Object obj : objects) {
out.write(obj);
}
+ out.writeBytes(appSpecificBytes, 100);
dostream.close();
ostream.close();
@@ -96,6 +99,7 @@ public class TestIO extends TestCase {
for (Object obj : objects) {
assertEquals(obj, in.read());
}
+ assertEquals(new Buffer(appSpecificBytes), in.read());
distream.close();
istream.close();
@@ -114,6 +118,9 @@ public class TestIO extends TestCase {
dis = new DataInputStream(bais);
assertEquals(obj, (new TypedBytesInput(dis)).read());
}
+ byte[] rawBytes = in.readRaw();
+ assertEquals(new Buffer(appSpecificBytes),
+ new Buffer(rawBytes, 5, rawBytes.length - 5));
distream.close();
istream.close();
}
@@ -164,7 +171,8 @@ public class TestIO extends TestCase {
new ByteWritable((byte) 123), new BooleanWritable(true),
new VIntWritable(12345), new VLongWritable(123456789L),
new FloatWritable((float) 1.2), new DoubleWritable(1.234),
- new Text("random string")
+ new Text("random string"),
+ new ObjectWritable("test")
};
TypedBytesWritable tbw = new TypedBytesWritable();
tbw.setValue("typed bytes text");
@@ -201,7 +209,7 @@ public class TestIO extends TestCase {
TypedBytesWritableInput in = new TypedBytesWritableInput(distream);
for (Writable w : writables) {
- assertEquals(w, in.read());
+ assertEquals(w.toString(), in.read().toString());
}
assertEquals(tbw.getValue().toString(), in.read().toString());