Subject: svn commit: r743975 [2/2] - in /hadoop/core/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/ src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/ src/contrib/st...
Date: Fri, 13 Feb 2009 04:12:12 -0000
To: core-commits@hadoop.apache.org
From: ddas@apache.org

Added: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java?rev=743975&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java (added) +++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java Fri Feb 13 04:12:11 2009 @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.typedbytes; + +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; + +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.record.Buffer; + +/** + * Provides functionality for writing typed bytes. + */ +public class TypedBytesOutput { + + private DataOutput out; + + private TypedBytesOutput() {} + + private void setDataOutput(DataOutput out) { + this.out = out; + } + + private static ThreadLocal tbOut = new ThreadLocal() { + protected synchronized Object initialValue() { + return new TypedBytesOutput(); + } + }; + + /** + * Get a thread-local typed bytes output for the supplied {@link DataOutput}. + * + * @param out data output object + * @return typed bytes output corresponding to the supplied + * {@link DataOutput}. + */ + public static TypedBytesOutput get(DataOutput out) { + TypedBytesOutput bout = (TypedBytesOutput) tbOut.get(); + bout.setDataOutput(out); + return bout; + } + + /** Creates a new instance of TypedBytesOutput. */ + public TypedBytesOutput(DataOutput out) { + this.out = out; + } + + /** + * Writes a Java object as a typed bytes sequence. + * + * @param obj the object to be written + * @throws IOException + */ + public void write(Object obj) throws IOException { + if (obj instanceof Buffer) { + writeBytes(((Buffer) obj).get()); + } else if (obj instanceof Byte) { + writeByte((Byte) obj); + } else if (obj instanceof Boolean) { + writeBool((Boolean) obj); + } else if (obj instanceof Integer) { + writeInt((Integer) obj); + } else if (obj instanceof Long) { + writeLong((Long) obj); + } else if (obj instanceof Float) { + writeFloat((Float) obj); + } else if (obj instanceof Double) { + writeDouble((Double) obj); + } else if (obj instanceof String) { + writeString((String) obj); + } else if (obj instanceof ArrayList) { + writeVector((ArrayList) obj); + } else if (obj instanceof List) { + writeList((List) obj); + } else if (obj instanceof Map) { + writeMap((Map) obj); + } else { + throw new RuntimeException("cannot write objects of this type"); + } + } + + /** + * Writes a raw sequence of typed bytes. + * + * @param bytes the bytes to be written + * @throws IOException + */ + public void writeRaw(byte[] bytes) throws IOException { + out.write(bytes); + } + + /** + * Writes a raw sequence of typed bytes. + * + * @param bytes the bytes to be written + * @param offset an offset in the given array + * @param length number of bytes from the given array to write + * @throws IOException + */ + public void writeRaw(byte[] bytes, int offset, int length) + throws IOException { + out.write(bytes, offset, length); + } + + /** + * Writes a bytes array as a typed bytes sequence. + * + * @param bytes the bytes array to be written + * @throws IOException + */ + public void writeBytes(byte[] bytes) throws IOException { + out.write(Type.BYTES.code); + out.writeInt(bytes.length); + out.write(bytes); + } + + /** + * Writes a byte as a typed bytes sequence. + * + * @param b the byte to be written + * @throws IOException + */ + public void writeByte(byte b) throws IOException { + out.write(Type.BYTE.code); + out.write(b); + } + + /** + * Writes a boolean as a typed bytes sequence. + * + * @param b the boolean to be written + * @throws IOException + */ + public void writeBool(boolean b) throws IOException { + out.write(Type.BOOL.code); + out.writeBoolean(b); + } + + /** + * Writes an integer as a typed bytes sequence. 
+ * + * @param i the integer to be written + * @throws IOException + */ + public void writeInt(int i) throws IOException { + out.write(Type.INT.code); + out.writeInt(i); + } + + /** + * Writes a long as a typed bytes sequence. + * + * @param l the long to be written + * @throws IOException + */ + public void writeLong(long l) throws IOException { + out.write(Type.LONG.code); + out.writeLong(l); + } + + /** + * Writes a float as a typed bytes sequence. + * + * @param f the float to be written + * @throws IOException + */ + public void writeFloat(float f) throws IOException { + out.write(Type.FLOAT.code); + out.writeFloat(f); + } + + /** + * Writes a double as a typed bytes sequence. + * + * @param d the double to be written + * @throws IOException + */ + public void writeDouble(double d) throws IOException { + out.write(Type.DOUBLE.code); + out.writeDouble(d); + } + + /** + * Writes a string as a typed bytes sequence. + * + * @param s the string to be written + * @throws IOException + */ + public void writeString(String s) throws IOException { + out.write(Type.STRING.code); + WritableUtils.writeString(out, s); + } + + /** + * Writes a vector as a typed bytes sequence. + * + * @param vector the vector to be written + * @throws IOException + */ + public void writeVector(ArrayList vector) throws IOException { + writeVectorHeader(vector.size()); + for (Object obj : vector) { + write(obj); + } + } + + /** + * Writes a vector header. + * + * @param length the number of elements in the vector + * @throws IOException + */ + public void writeVectorHeader(int length) throws IOException { + out.write(Type.VECTOR.code); + out.writeInt(length); + } + + /** + * Writes a list as a typed bytes sequence. + * + * @param list the list to be written + * @throws IOException + */ + public void writeList(List list) throws IOException { + writeListHeader(); + for (Object obj : list) { + write(obj); + } + writeListFooter(); + } + + /** + * Writes a list header. + * + * @throws IOException + */ + public void writeListHeader() throws IOException { + out.write(Type.LIST.code); + } + + /** + * Writes a list footer. + * + * @throws IOException + */ + public void writeListFooter() throws IOException { + out.write(Type.MARKER.code); + } + + /** + * Writes a map as a typed bytes sequence. + * + * @param map the map to be written + * @throws IOException + */ + @SuppressWarnings("unchecked") + public void writeMap(Map map) throws IOException { + writeMapHeader(map.size()); + Set entries = map.entrySet(); + for (Entry entry : entries) { + write(entry.getKey()); + write(entry.getValue()); + } + } + + /** + * Writes a map header. 
+ * + * @param length the number of key-value pairs in the map + * @throws IOException + */ + public void writeMapHeader(int length) throws IOException { + out.write(Type.MAP.code); + out.writeInt(length); + } + +} Added: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordInput.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordInput.java?rev=743975&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordInput.java (added) +++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordInput.java Fri Feb 13 04:12:11 2009 @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.typedbytes; + +import java.io.DataInput; +import java.io.IOException; + +import org.apache.hadoop.record.Buffer; +import org.apache.hadoop.record.Index; +import org.apache.hadoop.record.RecordInput; + +/** + * Deserializer for records that reads typed bytes. + */ +public class TypedBytesRecordInput implements RecordInput { + + private TypedBytesInput in; + + private TypedBytesRecordInput() {} + + private void setTypedBytesInput(TypedBytesInput in) { + this.in = in; + } + + private static ThreadLocal tbIn = new ThreadLocal() { + protected synchronized Object initialValue() { + return new TypedBytesRecordInput(); + } + }; + + /** + * Get a thread-local typed bytes record input for the supplied + * {@link TypedBytesInput}. + * + * @param in typed bytes input object + * @return typed bytes record input corresponding to the supplied + * {@link TypedBytesInput}. + */ + public static TypedBytesRecordInput get(TypedBytesInput in) { + TypedBytesRecordInput bin = (TypedBytesRecordInput) tbIn.get(); + bin.setTypedBytesInput(in); + return bin; + } + + /** + * Get a thread-local typed bytes record input for the supplied + * {@link DataInput}. + * + * @param in data input object + * @return typed bytes record input corresponding to the supplied + * {@link DataInput}. + */ + public static TypedBytesRecordInput get(DataInput in) { + return get(TypedBytesInput.get(in)); + } + + /** Creates a new instance of TypedBytesRecordInput. */ + public TypedBytesRecordInput(TypedBytesInput in) { + this.in = in; + } + + /** Creates a new instance of TypedBytesRecordInput. 
*/ + public TypedBytesRecordInput(DataInput in) { + this(new TypedBytesInput(in)); + } + + public boolean readBool(String tag) throws IOException { + in.skipType(); + return in.readBool(); + } + + public Buffer readBuffer(String tag) throws IOException { + in.skipType(); + return new Buffer(in.readBytes()); + } + + public byte readByte(String tag) throws IOException { + in.skipType(); + return in.readByte(); + } + + public double readDouble(String tag) throws IOException { + in.skipType(); + return in.readDouble(); + } + + public float readFloat(String tag) throws IOException { + in.skipType(); + return in.readFloat(); + } + + public int readInt(String tag) throws IOException { + in.skipType(); + return in.readInt(); + } + + public long readLong(String tag) throws IOException { + in.skipType(); + return in.readLong(); + } + + public String readString(String tag) throws IOException { + in.skipType(); + return in.readString(); + } + + public void startRecord(String tag) throws IOException { + in.skipType(); + } + + public Index startVector(String tag) throws IOException { + in.skipType(); + return new TypedBytesIndex(in.readVectorHeader()); + } + + public Index startMap(String tag) throws IOException { + in.skipType(); + return new TypedBytesIndex(in.readMapHeader()); + } + + public void endRecord(String tag) throws IOException {} + + public void endVector(String tag) throws IOException {} + + public void endMap(String tag) throws IOException {} + + private static final class TypedBytesIndex implements Index { + private int nelems; + + private TypedBytesIndex(int nelems) { + this.nelems = nelems; + } + + public boolean done() { + return (nelems <= 0); + } + + public void incr() { + nelems--; + } + } + +} Added: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordOutput.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordOutput.java?rev=743975&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordOutput.java (added) +++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordOutput.java Fri Feb 13 04:12:11 2009 @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.typedbytes; + +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.TreeMap; + +import org.apache.hadoop.record.Buffer; +import org.apache.hadoop.record.Record; +import org.apache.hadoop.record.RecordOutput; + +/** + * Serializer for records that writes typed bytes. 
+ */ +public class TypedBytesRecordOutput implements RecordOutput { + + private TypedBytesOutput out; + + private TypedBytesRecordOutput() {} + + private void setTypedBytesOutput(TypedBytesOutput out) { + this.out = out; + } + + private static ThreadLocal tbOut = new ThreadLocal() { + protected synchronized Object initialValue() { + return new TypedBytesRecordOutput(); + } + }; + + /** + * Get a thread-local typed bytes record output for the supplied + * {@link TypedBytesOutput}. + * + * @param out typed bytes output object + * @return typed bytes record output corresponding to the supplied + * {@link TypedBytesOutput}. + */ + public static TypedBytesRecordOutput get(TypedBytesOutput out) { + TypedBytesRecordOutput bout = (TypedBytesRecordOutput) tbOut.get(); + bout.setTypedBytesOutput(out); + return bout; + } + + /** + * Get a thread-local typed bytes record output for the supplied + * {@link DataOutput}. + * + * @param out data output object + * @return typed bytes record output corresponding to the supplied + * {@link DataOutput}. + */ + public static TypedBytesRecordOutput get(DataOutput out) { + return get(TypedBytesOutput.get(out)); + } + + /** Creates a new instance of TypedBytesRecordOutput. */ + public TypedBytesRecordOutput(TypedBytesOutput out) { + this.out = out; + } + + /** Creates a new instance of TypedBytesRecordOutput. */ + public TypedBytesRecordOutput(DataOutput out) { + this(new TypedBytesOutput(out)); + } + + public void writeBool(boolean b, String tag) throws IOException { + out.writeBool(b); + } + + public void writeBuffer(Buffer buf, String tag) throws IOException { + out.writeBytes(buf.get()); + } + + public void writeByte(byte b, String tag) throws IOException { + out.writeByte(b); + } + + public void writeDouble(double d, String tag) throws IOException { + out.writeDouble(d); + } + + public void writeFloat(float f, String tag) throws IOException { + out.writeFloat(f); + } + + public void writeInt(int i, String tag) throws IOException { + out.writeInt(i); + } + + public void writeLong(long l, String tag) throws IOException { + out.writeLong(l); + } + + public void writeString(String s, String tag) throws IOException { + out.writeString(s); + } + + public void startRecord(Record r, String tag) throws IOException { + out.writeListHeader(); + } + + public void startVector(ArrayList v, String tag) throws IOException { + out.writeVectorHeader(v.size()); + } + + public void startMap(TreeMap m, String tag) throws IOException { + out.writeMapHeader(m.size()); + } + + public void endRecord(Record r, String tag) throws IOException { + out.writeListFooter(); + } + + public void endVector(ArrayList v, String tag) throws IOException {} + + public void endMap(TreeMap m, String tag) throws IOException {} + +} Added: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritable.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritable.java?rev=743975&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritable.java (added) +++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritable.java Fri Feb 13 04:12:11 2009 @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.typedbytes; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.hadoop.io.BytesWritable; + +/** + * Writable for typed bytes. + */ +public class TypedBytesWritable extends BytesWritable { + + /** Create a TypedBytesWritable. */ + public TypedBytesWritable() { + super(); + } + + /** Create a TypedBytesWritable with a given byte array as initial value. */ + public TypedBytesWritable(byte[] bytes) { + super(bytes); + } + + /** Set the typed bytes from a given Java object. */ + public void setValue(Object obj) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + TypedBytesOutput tbo = TypedBytesOutput.get(new DataOutputStream(baos)); + tbo.write(obj); + byte[] bytes = baos.toByteArray(); + set(bytes, 0, bytes.length); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** Get the typed bytes as a Java object. */ + public Object getValue() { + try { + ByteArrayInputStream bais = new ByteArrayInputStream(getBytes()); + TypedBytesInput tbi = TypedBytesInput.get(new DataInputStream(bais)); + Object obj = tbi.read(); + return obj; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** Get the type code embedded in the first byte. */ + public Type getType() { + byte[] bytes = getBytes(); + if (bytes == null || bytes.length == 0) { + return null; + } + for (Type type : Type.values()) { + if (type.code == (int) bytes[0]) { + return type; + } + } + return null; + } + + /** Generate a suitable string representation. */ + public String toString() { + return getValue().toString(); + } + +} Added: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java?rev=743975&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java (added) +++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java Fri Feb 13 04:12:11 2009 @@ -0,0 +1,335 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.typedbytes; + +import java.io.DataInput; +import java.io.IOException; + +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.SortedMapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.VIntWritable; +import org.apache.hadoop.io.VLongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Provides functionality for reading typed bytes as Writable objects. + * + * @see TypedBytesInput + */ +public class TypedBytesWritableInput { + + private TypedBytesInput in; + + private TypedBytesWritableInput() {} + + private void setTypedBytesInput(TypedBytesInput in) { + this.in = in; + } + + private static ThreadLocal tbIn = new ThreadLocal() { + protected synchronized Object initialValue() { + return new TypedBytesWritableInput(); + } + }; + + /** + * Get a thread-local typed bytes writable input for the supplied + * {@link TypedBytesInput}. + * + * @param in typed bytes input object + * @return typed bytes writable input corresponding to the supplied + * {@link TypedBytesInput}. + */ + public static TypedBytesWritableInput get(TypedBytesInput in) { + TypedBytesWritableInput bin = (TypedBytesWritableInput) tbIn.get(); + bin.setTypedBytesInput(in); + return bin; + } + + /** + * Get a thread-local typed bytes writable input for the supplied + * {@link DataInput}. + * + * @param in data input object + * @return typed bytes writable input corresponding to the supplied + * {@link DataInput}. + */ + public static TypedBytesWritableInput get(DataInput in) { + return get(TypedBytesInput.get(in)); + } + + /** Creates a new instance of TypedBytesWritableInput. */ + public TypedBytesWritableInput(TypedBytesInput in) { + this.in = in; + } + + /** Creates a new instance of TypedBytesWritableInput. 
*/ + public TypedBytesWritableInput(DataInput din) { + this(new TypedBytesInput(din)); + } + + public Writable read() throws IOException { + Type type = in.readType(); + if (type == null) { + return null; + } + switch (type) { + case BYTES: + return readBytes(); + case BYTE: + return readByte(); + case BOOL: + return readBoolean(); + case INT: + return readVInt(); + case LONG: + return readVLong(); + case FLOAT: + return readFloat(); + case DOUBLE: + return readDouble(); + case STRING: + return readText(); + case VECTOR: + return readArray(); + case MAP: + return readMap(); + default: + throw new RuntimeException("unknown type"); + } + } + + public Class readType() throws IOException { + Type type = in.readType(); + if (type == null) { + return null; + } + switch (type) { + case BYTES: + return BytesWritable.class; + case BYTE: + return ByteWritable.class; + case BOOL: + return BooleanWritable.class; + case INT: + return VIntWritable.class; + case LONG: + return VLongWritable.class; + case FLOAT: + return FloatWritable.class; + case DOUBLE: + return DoubleWritable.class; + case STRING: + return Text.class; + case VECTOR: + return ArrayWritable.class; + case MAP: + return MapWritable.class; + default: + throw new RuntimeException("unknown type"); + } + } + + public BytesWritable readBytes(BytesWritable bw) throws IOException { + byte[] bytes = in.readBytes(); + if (bw == null) { + bw = new BytesWritable(bytes); + } else { + bw.set(bytes, 0, bytes.length); + } + return bw; + } + + public BytesWritable readBytes() throws IOException { + return readBytes(null); + } + + public ByteWritable readByte(ByteWritable bw) throws IOException { + if (bw == null) { + bw = new ByteWritable(); + } + bw.set(in.readByte()); + return bw; + } + + public ByteWritable readByte() throws IOException { + return readByte(null); + } + + public BooleanWritable readBoolean(BooleanWritable bw) throws IOException { + if (bw == null) { + bw = new BooleanWritable(); + } + bw.set(in.readBool()); + return bw; + } + + public BooleanWritable readBoolean() throws IOException { + return readBoolean(null); + } + + public IntWritable readInt(IntWritable iw) throws IOException { + if (iw == null) { + iw = new IntWritable(); + } + iw.set(in.readInt()); + return iw; + } + + public IntWritable readInt() throws IOException { + return readInt(null); + } + + public VIntWritable readVInt(VIntWritable iw) throws IOException { + if (iw == null) { + iw = new VIntWritable(); + } + iw.set(in.readInt()); + return iw; + } + + public VIntWritable readVInt() throws IOException { + return readVInt(null); + } + + public LongWritable readLong(LongWritable lw) throws IOException { + if (lw == null) { + lw = new LongWritable(); + } + lw.set(in.readLong()); + return lw; + } + + public LongWritable readLong() throws IOException { + return readLong(null); + } + + public VLongWritable readVLong(VLongWritable lw) throws IOException { + if (lw == null) { + lw = new VLongWritable(); + } + lw.set(in.readLong()); + return lw; + } + + public VLongWritable readVLong() throws IOException { + return readVLong(null); + } + + public FloatWritable readFloat(FloatWritable fw) throws IOException { + if (fw == null) { + fw = new FloatWritable(); + } + fw.set(in.readFloat()); + return fw; + } + + public FloatWritable readFloat() throws IOException { + return readFloat(null); + } + + public DoubleWritable readDouble(DoubleWritable dw) throws IOException { + if (dw == null) { + dw = new DoubleWritable(); + } + dw.set(in.readDouble()); + return dw; + } + + public 
DoubleWritable readDouble() throws IOException { + return readDouble(null); + } + + public Text readText(Text t) throws IOException { + if (t == null) { + t = new Text(); + } + t.set(in.readString()); + return t; + } + + public Text readText() throws IOException { + return readText(null); + } + + public ArrayWritable readArray(ArrayWritable aw) throws IOException { + if (aw == null) { + aw = new ArrayWritable(TypedBytesWritable.class); + } else if (!aw.getValueClass().equals(TypedBytesWritable.class)) { + throw new RuntimeException("value class has to be TypedBytesWritable"); + } + int length = in.readVectorHeader(); + Writable[] writables = new Writable[length]; + for (int i = 0; i < length; i++) { + writables[i] = new TypedBytesWritable(in.readRaw()); + } + aw.set(writables); + return aw; + } + + public ArrayWritable readArray() throws IOException { + return readArray(null); + } + + public MapWritable readMap(MapWritable mw) throws IOException { + if (mw == null) { + mw = new MapWritable(); + } + int length = in.readMapHeader(); + for (int i = 0; i < length; i++) { + Writable key = read(); + Writable value = read(); + mw.put(key, value); + } + return mw; + } + + public MapWritable readMap() throws IOException { + return readMap(null); + } + + public SortedMapWritable readSortedMap(SortedMapWritable mw) + throws IOException { + if (mw == null) { + mw = new SortedMapWritable(); + } + int length = in.readMapHeader(); + for (int i = 0; i < length; i++) { + WritableComparable key = (WritableComparable) read(); + Writable value = read(); + mw.put(key, value); + } + return mw; + } + + public SortedMapWritable readSortedMap() throws IOException { + return readSortedMap(null); + } + +} Added: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java?rev=743975&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java (added) +++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java Fri Feb 13 04:12:11 2009 @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.typedbytes; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; + +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.SortedMapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.VIntWritable; +import org.apache.hadoop.io.VLongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.record.Record; + +/** + * Provides functionality for writing Writable objects as typed bytes. + * + * @see TypedBytesOutput + */ +public class TypedBytesWritableOutput { + + private TypedBytesOutput out; + + private TypedBytesWritableOutput() {} + + private void setTypedBytesOutput(TypedBytesOutput out) { + this.out = out; + } + + private static ThreadLocal tbOut = new ThreadLocal() { + protected synchronized Object initialValue() { + return new TypedBytesWritableOutput(); + } + }; + + /** + * Get a thread-local typed bytes writable output for the supplied + * {@link TypedBytesOutput}. + * + * @param out typed bytes output object + * @return typed bytes writable output corresponding to the supplied + * {@link TypedBytesOutput}. + */ + public static TypedBytesWritableOutput get(TypedBytesOutput out) { + TypedBytesWritableOutput bout = (TypedBytesWritableOutput) tbOut.get(); + bout.setTypedBytesOutput(out); + return bout; + } + + /** + * Get a thread-local typed bytes writable output for the supplied + * {@link DataOutput}. + * + * @param out data output object + * @return typed bytes writable output corresponding to the supplied + * {@link DataOutput}. + */ + public static TypedBytesWritableOutput get(DataOutput out) { + return get(TypedBytesOutput.get(out)); + } + + /** Creates a new instance of TypedBytesWritableOutput. */ + public TypedBytesWritableOutput(TypedBytesOutput out) { + this.out = out; + } + + /** Creates a new instance of TypedBytesWritableOutput. 
*/ + public TypedBytesWritableOutput(DataOutput dout) { + this(new TypedBytesOutput(dout)); + } + + public void write(Writable w) throws IOException { + if (w instanceof TypedBytesWritable) { + writeTypedBytes((TypedBytesWritable) w); + } else if (w instanceof BytesWritable) { + writeBytes((BytesWritable) w); + } else if (w instanceof ByteWritable) { + writeByte((ByteWritable) w); + } else if (w instanceof BooleanWritable) { + writeBoolean((BooleanWritable) w); + } else if (w instanceof IntWritable) { + writeInt((IntWritable) w); + } else if (w instanceof VIntWritable) { + writeVInt((VIntWritable) w); + } else if (w instanceof LongWritable) { + writeLong((LongWritable) w); + } else if (w instanceof VLongWritable) { + writeVLong((VLongWritable) w); + } else if (w instanceof FloatWritable) { + writeFloat((FloatWritable) w); + } else if (w instanceof DoubleWritable) { + writeDouble((DoubleWritable) w); + } else if (w instanceof Text) { + writeText((Text) w); + } else if (w instanceof ArrayWritable) { + writeArray((ArrayWritable) w); + } else if (w instanceof MapWritable) { + writeMap((MapWritable) w); + } else if (w instanceof SortedMapWritable) { + writeSortedMap((SortedMapWritable) w); + } else if (w instanceof Record) { + writeRecord((Record) w); + } else { + writeWritable(w); // last resort + } + } + + public void writeTypedBytes(TypedBytesWritable tbw) throws IOException { + out.writeRaw(tbw.getBytes(), 0, tbw.getLength()); + } + + public void writeBytes(BytesWritable bw) throws IOException { + byte[] bytes = Arrays.copyOfRange(bw.getBytes(), 0, bw.getLength()); + out.writeBytes(bytes); + } + + public void writeByte(ByteWritable bw) throws IOException { + out.writeByte(bw.get()); + } + + public void writeBoolean(BooleanWritable bw) throws IOException { + out.writeBool(bw.get()); + } + + public void writeInt(IntWritable iw) throws IOException { + out.writeInt(iw.get()); + } + + public void writeVInt(VIntWritable viw) throws IOException { + out.writeInt(viw.get()); + } + + public void writeLong(LongWritable lw) throws IOException { + out.writeLong(lw.get()); + } + + public void writeVLong(VLongWritable vlw) throws IOException { + out.writeLong(vlw.get()); + } + + public void writeFloat(FloatWritable fw) throws IOException { + out.writeFloat(fw.get()); + } + + public void writeDouble(DoubleWritable dw) throws IOException { + out.writeDouble(dw.get()); + } + + public void writeText(Text t) throws IOException { + out.writeString(t.toString()); + } + + public void writeArray(ArrayWritable aw) throws IOException { + Writable[] writables = aw.get(); + out.writeVectorHeader(writables.length); + for (Writable writable : writables) { + write(writable); + } + } + + public void writeMap(MapWritable mw) throws IOException { + out.writeMapHeader(mw.size()); + for (Map.Entry entry : mw.entrySet()) { + write(entry.getKey()); + write(entry.getValue()); + } + } + + public void writeSortedMap(SortedMapWritable smw) throws IOException { + out.writeMapHeader(smw.size()); + for (Map.Entry entry : smw.entrySet()) { + write(entry.getKey()); + write(entry.getValue()); + } + } + + public void writeRecord(Record r) throws IOException { + r.serialize(TypedBytesRecordOutput.get(out)); + } + + public void writeWritable(Writable w) throws IOException { + out.writeVectorHeader(2); + out.writeString(w.getClass().getName()); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + w.write(dos); + dos.close(); + out.writeBytes(baos.toByteArray()); + } + +} Added: 
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html?rev=743975&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html (added) +++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html Fri Feb 13 04:12:11 2009 @@ -0,0 +1,66 @@ + + + + + + +Typed bytes are sequences of bytes in which the first byte is a type code. They are especially useful as a +(simple and very straightforward) binary format for transferring data to and from Hadoop Streaming programs. + +
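+
+As a minimal sketch of that use (not one of the files in this commit; the class name IdentityTypedBytesMapApp is hypothetical), a streaming mapper started with the "-io typedbytes" option exercised by TestTypedBytesStreaming below can read and write typed bytes on the standard streams using the TypedBytesInput and TypedBytesOutput classes from this package. TypedBytesInput.read() returns null once the stream is exhausted, as the tests below also rely on:

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.typedbytes.TypedBytesInput;
import org.apache.hadoop.typedbytes.TypedBytesOutput;

public class IdentityTypedBytesMapApp {
  public static void main(String[] args) throws IOException {
    TypedBytesInput in = new TypedBytesInput(new DataInputStream(System.in));
    TypedBytesOutput out = new TypedBytesOutput(new DataOutputStream(System.out));
    Object key = in.read(); // null once the input stream is exhausted
    while (key != null) {
      Object value = in.read();
      out.write(key);   // echo each key-value pair unchanged
      out.write(value);
      key = in.read();
    }
    System.out.flush();
  }
}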

+<h3>Type Codes</h3>
+
+Each typed bytes sequence starts with an unsigned byte that contains the type code. Possible values are:
+
+<table border="1" cellpadding="2">
+<tr><th>Code</th><th>Type</th></tr>
+<tr><td>0</td><td>A sequence of bytes.</td></tr>
+<tr><td>1</td><td>A byte.</td></tr>
+<tr><td>2</td><td>A boolean.</td></tr>
+<tr><td>3</td><td>An integer.</td></tr>
+<tr><td>4</td><td>A long.</td></tr>
+<tr><td>5</td><td>A float.</td></tr>
+<tr><td>6</td><td>A double.</td></tr>
+<tr><td>7</td><td>A string.</td></tr>
+<tr><td>8</td><td>A vector.</td></tr>
+<tr><td>9</td><td>A list.</td></tr>
+<tr><td>10</td><td>A map.</td></tr>
+</table>
+
+<h3>Subsequent Bytes</h3>
+
+These are the subsequent bytes for the different type codes (everything is big-endian and unpadded):
+
+<table border="1" cellpadding="2">
+<tr><th>Code</th><th>Subsequent Bytes</th></tr>
+<tr><td>0</td><td>&lt;32-bit signed integer&gt; &lt;as many bytes as indicated by the integer&gt;</td></tr>
+<tr><td>1</td><td>&lt;signed byte&gt;</td></tr>
+<tr><td>2</td><td>&lt;signed byte (0 = false and 1 = true)&gt;</td></tr>
+<tr><td>3</td><td>&lt;32-bit signed integer&gt;</td></tr>
+<tr><td>4</td><td>&lt;64-bit signed integer&gt;</td></tr>
+<tr><td>5</td><td>&lt;32-bit IEEE floating point number&gt;</td></tr>
+<tr><td>6</td><td>&lt;64-bit IEEE floating point number&gt;</td></tr>
+<tr><td>7</td><td>&lt;32-bit signed integer&gt; &lt;as many UTF-8 bytes as indicated by the integer&gt;</td></tr>
+<tr><td>8</td><td>&lt;32-bit signed integer&gt; &lt;as many typed bytes sequences as indicated by the integer&gt;</td></tr>
+<tr><td>9</td><td>&lt;variable number of typed bytes sequences&gt; &lt;255 written as an unsigned byte&gt;</td></tr>
+<tr><td>10</td><td>&lt;32-bit signed integer&gt; &lt;as many (key-value) pairs of typed bytes sequences as indicated by the integer&gt;</td></tr>
+</table>
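
As a worked example (a minimal sketch, not one of the files in this commit; the class name TypedBytesHexDump is hypothetical), the following drives the TypedBytesOutput class added above and hex-dumps the result. The expected bytes follow directly from the two tables: type code 7, a big-endian 32-bit length, then UTF-8 bytes for a string; type code 3, then a big-endian 32-bit value for an integer:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.typedbytes.TypedBytesOutput;

public class TypedBytesHexDump {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    TypedBytesOutput out = new TypedBytesOutput(new DataOutputStream(baos));
    out.write("AB"); // write(Object) dispatches on the runtime type: String -> writeString
    out.write(5);    // autoboxed to Integer -> writeInt
    for (byte b : baos.toByteArray()) {
      System.out.printf("%02x ", b);
    }
    // prints: 07 00 00 00 02 41 42 03 00 00 00 05
  }
}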
+ + + Added: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesMapApp.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesMapApp.java?rev=743975&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesMapApp.java (added) +++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesMapApp.java Fri Feb 13 04:12:11 2009 @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.streaming; + +import java.io.BufferedReader; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; + +import org.apache.hadoop.io.IntWritable; + +public class RawBytesMapApp { + private String find; + private DataOutputStream dos; + + public RawBytesMapApp(String find) { + this.find = find; + dos = new DataOutputStream(System.out); + } + + public void go() throws IOException { + BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); + String line; + while ((line = in.readLine()) != null) { + for (String part : line.split(find)) { + writeString(part); // write key + writeInt(1); // write value + } + } + System.out.flush(); + } + + public static void main(String[] args) throws IOException { + RawBytesMapApp app = new RawBytesMapApp(args[0].replace(".","\\.")); + app.go(); + } + + private void writeString(String str) throws IOException { + byte[] bytes = str.getBytes("UTF-8"); + dos.writeInt(bytes.length); + dos.write(bytes); + } + + private void writeInt(int i) throws IOException { + dos.writeInt(4); + IntWritable iw = new IntWritable(i); + iw.write(dos); + } +} Added: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesReduceApp.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesReduceApp.java?rev=743975&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesReduceApp.java (added) +++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesReduceApp.java Fri Feb 13 04:12:11 2009 @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.streaming; + +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; + +import org.apache.hadoop.io.IntWritable; + +public class RawBytesReduceApp { + private DataInputStream dis; + + public RawBytesReduceApp() { + dis = new DataInputStream(System.in); + } + + public void go() throws IOException { + String prevKey = null; + int sum = 0; + String key = readString(); + while (key != null) { + if (prevKey != null && !key.equals(prevKey)) { + System.out.println(prevKey + "\t" + sum); + sum = 0; + } + sum += readInt(); + prevKey = key; + key = readString(); + } + System.out.println(prevKey + "\t" + sum); + System.out.flush(); + } + + public static void main(String[] args) throws IOException { + RawBytesReduceApp app = new RawBytesReduceApp(); + app.go(); + } + + private String readString() throws IOException { + int length; + try { + length = dis.readInt(); + } catch (EOFException eof) { + return null; + } + byte[] bytes = new byte[length]; + dis.readFully(bytes); + return new String(bytes, "UTF-8"); + } + + private int readInt() throws IOException { + dis.readInt(); // ignore (we know it's 4) + IntWritable iw = new IntWritable(); + iw.readFields(dis); + return iw.get(); + } +} Added: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestAutoInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestAutoInputFormat.java?rev=743975&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestAutoInputFormat.java (added) +++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestAutoInputFormat.java Fri Feb 13 04:12:11 2009 @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.streaming; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.streaming.AutoInputFormat; + +import junit.framework.TestCase; + +public class TestAutoInputFormat extends TestCase { + + private static Configuration conf = new Configuration(); + + private static final int LINES_COUNT = 3; + + private static final int RECORDS_COUNT = 3; + + private static final int SPLITS_COUNT = 2; + + @SuppressWarnings( { "unchecked", "deprecation" }) + public void testFormat() throws IOException { + JobConf job = new JobConf(conf); + FileSystem fs = FileSystem.getLocal(conf); + Path dir = new Path(System.getProperty("test.build.data", ".") + "/mapred"); + Path txtFile = new Path(dir, "auto.txt"); + Path seqFile = new Path(dir, "auto.seq"); + + fs.delete(dir, true); + + FileInputFormat.setInputPaths(job, dir); + + Writer txtWriter = new OutputStreamWriter(fs.create(txtFile)); + try { + for (int i = 0; i < LINES_COUNT; i++) { + txtWriter.write("" + (10 * i)); + txtWriter.write("\n"); + } + } finally { + txtWriter.close(); + } + + SequenceFile.Writer seqWriter = SequenceFile.createWriter(fs, conf, + seqFile, IntWritable.class, LongWritable.class); + try { + for (int i = 0; i < RECORDS_COUNT; i++) { + IntWritable key = new IntWritable(11 * i); + LongWritable value = new LongWritable(12 * i); + seqWriter.append(key, value); + } + } finally { + seqWriter.close(); + } + + AutoInputFormat format = new AutoInputFormat(); + InputSplit[] splits = format.getSplits(job, SPLITS_COUNT); + for (InputSplit split : splits) { + RecordReader reader = format.getRecordReader(split, job, Reporter.NULL); + Object key = reader.createKey(); + Object value = reader.createValue(); + try { + while (reader.next(key, value)) { + if (key instanceof LongWritable) { + assertEquals("Wrong value class.", Text.class, value.getClass()); + assertTrue("Invalid value", Integer.parseInt(((Text) value) + .toString()) % 10 == 0); + } else { + assertEquals("Wrong key class.", IntWritable.class, key.getClass()); + assertEquals("Wrong value class.", LongWritable.class, value + .getClass()); + assertTrue("Invalid key.", ((IntWritable) key).get() % 11 == 0); + assertTrue("Invalid value.", ((LongWritable) value).get() % 12 == 0); + } + } + } finally { + reader.close(); + } + } + } + +} Added: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestDumpTypedBytes.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestDumpTypedBytes.java?rev=743975&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestDumpTypedBytes.java (added) +++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestDumpTypedBytes.java Fri Feb 13 04:12:11 2009 @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.streaming; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.OutputStreamWriter; +import java.io.PrintStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.streaming.DumpTypedBytes; +import org.apache.hadoop.typedbytes.TypedBytesInput; + +import junit.framework.TestCase; + +public class TestDumpTypedBytes extends TestCase { + + public void testDumping() throws Exception { + Configuration conf = new Configuration(); + MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null); + FileSystem fs = cluster.getFileSystem(); + PrintStream psBackup = System.out; + ByteArrayOutputStream out = new ByteArrayOutputStream(); + PrintStream psOut = new PrintStream(out); + System.setOut(psOut); + DumpTypedBytes dumptb = new DumpTypedBytes(conf); + + try { + Path root = new Path("/typedbytestest"); + assertTrue(fs.mkdirs(root)); + assertTrue(fs.exists(root)); + OutputStreamWriter writer = new OutputStreamWriter(fs.create(new Path( + root, "test.txt"))); + try { + for (int i = 0; i < 100; i++) { + writer.write("" + (10 * i) + "\n"); + } + } finally { + writer.close(); + } + + String[] args = new String[1]; + args[0] = "/typedbytestest"; + int ret = dumptb.run(args); + assertEquals("Return value != 0.", 0, ret); + + ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); + TypedBytesInput tbinput = new TypedBytesInput(new DataInputStream(in)); + int counter = 0; + Object key = tbinput.read(); + while (key != null) { + assertEquals(Long.class, key.getClass()); // offset + Object value = tbinput.read(); + assertEquals(String.class, value.getClass()); + assertTrue("Invalid output.", + Integer.parseInt(value.toString()) % 10 == 0); + counter++; + key = tbinput.read(); + } + assertEquals("Wrong number of outputs.", 100, counter); + } finally { + try { + fs.close(); + } catch (Exception e) { + } + System.setOut(psBackup); + cluster.shutdown(); + } + } + +} Added: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestLoadTypedBytes.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestLoadTypedBytes.java?rev=743975&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestLoadTypedBytes.java (added) +++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestLoadTypedBytes.java Fri Feb 13 04:12:11 2009 @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.streaming; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.InputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.typedbytes.TypedBytesOutput; +import org.apache.hadoop.typedbytes.TypedBytesWritable; + +import junit.framework.TestCase; + +public class TestLoadTypedBytes extends TestCase { + + public void testLoading() throws Exception { + Configuration conf = new Configuration(); + MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null); + FileSystem fs = cluster.getFileSystem(); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + TypedBytesOutput tboutput = new TypedBytesOutput(new DataOutputStream(out)); + for (int i = 0; i < 100; i++) { + tboutput.write(new Long(i)); // key + tboutput.write("" + (10 * i)); // value + } + InputStream isBackup = System.in; + ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); + System.setIn(in); + LoadTypedBytes loadtb = new LoadTypedBytes(conf); + + try { + Path root = new Path("/typedbytestest"); + assertTrue(fs.mkdirs(root)); + assertTrue(fs.exists(root)); + + String[] args = new String[1]; + args[0] = "/typedbytestest/test.seq"; + int ret = loadtb.run(args); + assertEquals("Return value != 0.", 0, ret); + + Path file = new Path(root, "test.seq"); + assertTrue(fs.exists(file)); + SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf); + int counter = 0; + TypedBytesWritable key = new TypedBytesWritable(); + TypedBytesWritable value = new TypedBytesWritable(); + while (reader.next(key, value)) { + assertEquals(Long.class, key.getValue().getClass()); + assertEquals(String.class, value.getValue().getClass()); + assertTrue("Invalid record.", + Integer.parseInt(value.toString()) % 10 == 0); + counter++; + } + assertEquals("Wrong number of records.", 100, counter); + } finally { + try { + fs.close(); + } catch (Exception e) { + } + System.setIn(isBackup); + cluster.shutdown(); + } + } + +} Added: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestRawBytesStreaming.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestRawBytesStreaming.java?rev=743975&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestRawBytesStreaming.java (added) +++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestRawBytesStreaming.java Fri Feb 13 

Added: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestRawBytesStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestRawBytesStreaming.java?rev=743975&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestRawBytesStreaming.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestRawBytesStreaming.java Fri Feb 13 04:12:11 2009
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import junit.framework.TestCase;
+
+public class TestRawBytesStreaming extends TestCase {
+
+  protected File INPUT_FILE = new File("input.txt");
+  protected File OUTPUT_DIR = new File("out");
+  protected String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
+  protected String map = StreamUtil.makeJavaCommand(RawBytesMapApp.class, new String[]{"."});
+  protected String reduce = StreamUtil.makeJavaCommand(RawBytesReduceApp.class, new String[0]);
+  protected String outputExpect = "are\t3\nblue\t1\nbunnies\t1\npink\t1\nred\t1\nroses\t1\nviolets\t1\n";
+
+  public TestRawBytesStreaming() throws IOException {
+    UtilTest utilTest = new UtilTest(getClass().getName());
+    utilTest.checkUserDir();
+    utilTest.redirectIfAntJunit();
+  }
+
+  protected void createInput() throws IOException {
+    DataOutputStream out = new DataOutputStream(new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
+    out.write(input.getBytes("UTF-8"));
+    out.close();
+  }
+
+  protected String[] genArgs() {
+    return new String[] {
+      "-input", INPUT_FILE.getAbsolutePath(),
+      "-output", OUTPUT_DIR.getAbsolutePath(),
+      "-mapper", map,
+      "-reducer", reduce,
+      "-jobconf", "keep.failed.task.files=true",
+      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
+      "-jobconf", "stream.map.output=rawbytes",
+      "-jobconf", "stream.reduce.input=rawbytes",
+      "-verbose"
+    };
+  }
+
+  public void testCommandLine() throws Exception {
+    try {
+      try {
+        OUTPUT_DIR.getAbsoluteFile().delete();
+      } catch (Exception e) {
+      }
+
+      createInput();
+      OUTPUT_DIR.delete();
+
+      // During tests, the default Configuration will use a local mapred
+      // So don't specify -config or -cluster
+      StreamJob job = new StreamJob();
+      job.setConf(new Configuration());
+      job.run(genArgs());
+      File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
+      String output = StreamUtil.slurp(outFile);
+      outFile.delete();
+      System.out.println("  map=" + map);
+      System.out.println("reduce=" + reduce);
+      System.err.println("outEx1=" + outputExpect);
+      System.err.println("  out1=" + output);
+      assertEquals(outputExpect, output);
+    } finally {
+      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
+      INPUT_FILE.delete();
+      outFileCRC.delete();
+      OUTPUT_DIR.getAbsoluteFile().delete();
+    }
+  }
+}
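
Rawbytes mode is switched on purely through job configuration; a sketch of the
equivalent programmatic submission (the input/output paths are illustrative,
and the jobconf keys are the ones used in genArgs() above):

  StreamJob job = new StreamJob();
  job.setConf(new Configuration());
  job.run(new String[] {
      "-input", "input.txt",
      "-output", "out",
      "-mapper", map,     // any command that emits rawbytes on stdout
      "-reducer", reduce, // any command that consumes rawbytes on stdin
      "-jobconf", "stream.map.output=rawbytes",
      "-jobconf", "stream.reduce.input=rawbytes"
  });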

Added: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestTypedBytesStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestTypedBytesStreaming.java?rev=743975&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestTypedBytesStreaming.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestTypedBytesStreaming.java Fri Feb 13 04:12:11 2009
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import junit.framework.TestCase;
+
+public class TestTypedBytesStreaming extends TestCase {
+
+  protected File INPUT_FILE = new File("input.txt");
+  protected File OUTPUT_DIR = new File("out");
+  protected String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
+  protected String map = StreamUtil.makeJavaCommand(TypedBytesMapApp.class, new String[]{"."});
+  protected String reduce = StreamUtil.makeJavaCommand(TypedBytesReduceApp.class, new String[0]);
+  protected String outputExpect = "are\t3\nred\t1\nblue\t1\npink\t1\nroses\t1\nbunnies\t1\nviolets\t1\n";
+
+  public TestTypedBytesStreaming() throws IOException {
+    UtilTest utilTest = new UtilTest(getClass().getName());
+    utilTest.checkUserDir();
+    utilTest.redirectIfAntJunit();
+  }
+
+  protected void createInput() throws IOException {
+    DataOutputStream out = new DataOutputStream(new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
+    out.write(input.getBytes("UTF-8"));
+    out.close();
+  }
+
+  protected String[] genArgs() {
+    return new String[] {
+      "-input", INPUT_FILE.getAbsolutePath(),
+      "-output", OUTPUT_DIR.getAbsolutePath(),
+      "-mapper", map,
+      "-reducer", reduce,
+      "-jobconf", "keep.failed.task.files=true",
+      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
+      "-io", "typedbytes"
+    };
+  }
+
+  public void testCommandLine() throws Exception {
+    try {
+      try {
+        OUTPUT_DIR.getAbsoluteFile().delete();
+      } catch (Exception e) {
+      }
+
+      createInput();
+      OUTPUT_DIR.delete();
+
+      // During tests, the default Configuration will use a local mapred
+      // So don't specify -config or -cluster
+      StreamJob job = new StreamJob();
+      job.setConf(new Configuration());
+      job.run(genArgs());
+      File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
+      String output = StreamUtil.slurp(outFile);
+      outFile.delete();
+      System.out.println("  map=" + map);
+      System.out.println("reduce=" + reduce);
+      System.err.println("outEx1=" + outputExpect);
+      System.err.println("  out1=" + output);
+      assertEquals(outputExpect, output);
+    } finally {
+      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
+      INPUT_FILE.delete();
+      outFileCRC.delete();
+      OUTPUT_DIR.getAbsoluteFile().delete();
+    }
+  }
+}
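
Note that typed bytes mode is selected with a single switch covering both the
map output and reduce input sides at once, instead of the two separate jobconf
settings that rawbytes requires above:

  "-io", "typedbytes" // replaces stream.map.output/stream.reduce.input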

Added: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesMapApp.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesMapApp.java?rev=743975&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesMapApp.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesMapApp.java Fri Feb 13 04:12:11 2009
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.typedbytes.TypedBytesInput;
+import org.apache.hadoop.typedbytes.TypedBytesOutput;
+
+public class TypedBytesMapApp {
+
+  private String find;
+
+  public TypedBytesMapApp(String find) {
+    this.find = find;
+  }
+
+  public void go() throws IOException {
+    TypedBytesInput tbinput = new TypedBytesInput(new DataInputStream(System.in));
+    TypedBytesOutput tboutput = new TypedBytesOutput(new DataOutputStream(System.out));
+
+    Object key = tbinput.readRaw();
+    while (key != null) {
+      Object value = tbinput.read();
+      for (String part : value.toString().split(find)) {
+        tboutput.write(part); // write key
+        tboutput.write(1);    // write value
+      }
+      System.err.println("reporter:counter:UserCounters,InputLines,1");
+      key = tbinput.readRaw();
+    }
+
+    System.out.flush();
+  }
+
+  public static void main(String[] args) throws IOException {
+    TypedBytesMapApp app = new TypedBytesMapApp(args[0].replace(".","\\."));
+    app.go();
+  }
+
+}
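
The stderr line above follows streaming's reporter protocol: any line of the
form "reporter:counter:<group>,<counter>,<amount>" written to stderr increments
the named job counter. For example, a hypothetical counter of emitted pairs
(the group and counter names here are illustrative):

  System.err.println("reporter:counter:UserCounters,EmittedPairs,1");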

Added: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesReduceApp.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesReduceApp.java?rev=743975&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesReduceApp.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesReduceApp.java Fri Feb 13 04:12:11 2009
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.typedbytes.TypedBytesInput;
+import org.apache.hadoop.typedbytes.TypedBytesOutput;
+
+public class TypedBytesReduceApp {
+
+  public void go() throws IOException {
+    TypedBytesInput tbinput = new TypedBytesInput(new DataInputStream(System.in));
+    TypedBytesOutput tboutput = new TypedBytesOutput(new DataOutputStream(System.out));
+
+    Object prevKey = null;
+    int sum = 0;
+    Object key = tbinput.read();
+    while (key != null) {
+      if (prevKey != null && !key.equals(prevKey)) {
+        tboutput.write(prevKey); // write key
+        tboutput.write(sum);     // write value
+        sum = 0;
+      }
+      sum += (Integer) tbinput.read();
+      prevKey = key;
+      key = tbinput.read();
+    }
+    tboutput.write(prevKey);
+    tboutput.write(sum);
+
+    System.out.flush();
+  }
+
+  public static void main(String[] args) throws IOException {
+    TypedBytesReduceApp app = new TypedBytesReduceApp();
+    app.go();
+  }
+
+}
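
The per-key summing above relies on the streaming framework delivering keys in
sorted order, so all values for a key arrive adjacently: a pair is flushed only
when the key changes, plus once at the end for the final key. For the test
input, the sorted stream ("are",1)("are",1)("are",1)("blue",1)... therefore
folds to ("are",3)("blue",1)... as asserted in TestTypedBytesStreaming.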

Added: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java?rev=743975&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java Fri Feb 13 04:12:11 2009
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.typedbytes;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VIntWritable;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.record.Buffer;
+import org.apache.hadoop.record.RecRecord0;
+import org.apache.hadoop.record.RecRecord1;
+
+import junit.framework.TestCase;
+
+public class TestIO extends TestCase {
+
+  private File tmpfile;
+
+  protected void setUp() throws Exception {
+    this.tmpfile = new File(System.getProperty("test.build.data", "/tmp"),
+        "typedbytes.bin");
+  }
+
+  protected void tearDown() throws Exception {
+    tmpfile.delete();
+  }
+
+  public void testIO() throws IOException {
+    ArrayList vector = new ArrayList();
+    vector.add("test");
+    vector.add(false);
+    vector.add(12345);
+    List list = new LinkedList();
+    list.add("another test");
+    list.add(true);
+    list.add(123456789L);
+    Map map = new HashMap();
+    map.put("one", 1);
+    map.put("vector", vector);
+    Object[] objects = new Object[] {
+      new Buffer(new byte[] { 1, 2, 3, 4 }),
+      (byte) 123, true, 12345, 123456789L, (float) 1.2, 1.234,
+      "random string", vector, list, map
+    };
+
+    FileOutputStream ostream = new FileOutputStream(tmpfile);
+    DataOutputStream dostream = new DataOutputStream(ostream);
+    TypedBytesOutput out = new TypedBytesOutput(dostream);
+    for (Object obj : objects) {
+      out.write(obj);
+    }
+    dostream.close();
+    ostream.close();
+
+    FileInputStream istream = new FileInputStream(tmpfile);
+    DataInputStream distream = new DataInputStream(istream);
+    TypedBytesInput in = new TypedBytesInput(distream);
+    for (Object obj : objects) {
+      assertEquals(obj, in.read());
+    }
+    distream.close();
+    istream.close();
+
+    istream = new FileInputStream(tmpfile);
+    distream = new DataInputStream(istream);
+    in = new TypedBytesInput(distream);
+    for (Object obj : objects) {
+      byte[] bytes = in.readRaw();
+      ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+      DataInputStream dis = new DataInputStream(bais);
+      assertEquals(obj, (new TypedBytesInput(dis)).read());
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      TypedBytesOutput tbout = new TypedBytesOutput(new DataOutputStream(baos));
+      tbout.writeRaw(bytes);
+      bais = new ByteArrayInputStream(bytes);
+      dis = new DataInputStream(bais);
+      assertEquals(obj, (new TypedBytesInput(dis)).read());
+    }
+    distream.close();
+    istream.close();
+  }
+
+  public void testRecordIO() throws IOException {
+    RecRecord1 r1 = new RecRecord1();
+    r1.setBoolVal(true);
+    r1.setByteVal((byte) 0x66);
+    r1.setFloatVal(3.145F);
+    r1.setDoubleVal(1.5234);
+    r1.setIntVal(-4567);
+    r1.setLongVal(-2367L);
+    r1.setStringVal("random text");
+    r1.setBufferVal(new Buffer());
+    r1.setVectorVal(new ArrayList());
+    r1.setMapVal(new TreeMap());
+    RecRecord0 r0 = new RecRecord0();
+    r0.setStringVal("other random text");
+    r1.setRecordVal(r0);
+
+    FileOutputStream ostream = new FileOutputStream(tmpfile);
+    DataOutputStream dostream = new DataOutputStream(ostream);
+    TypedBytesRecordOutput out = TypedBytesRecordOutput.get(dostream);
+    r1.serialize(out, "");
+    dostream.close();
+    ostream.close();
+
+    FileInputStream istream = new FileInputStream(tmpfile);
+    DataInputStream distream = new DataInputStream(istream);
+    TypedBytesRecordInput in = TypedBytesRecordInput.get(distream);
+    RecRecord1 r2 = new RecRecord1();
+    r2.deserialize(in, "");
+    distream.close();
+    istream.close();
+    assertEquals(r1, r2);
+  }
+
+  public void testWritableIO() throws IOException {
+    Writable[] vectorValues = new Writable[] {
+      new Text("test1"), new Text("test2"), new Text("test3")
+    };
+    ArrayWritable vector = new ArrayWritable(Text.class, vectorValues);
+    MapWritable map = new MapWritable();
+    map.put(new Text("one"), new VIntWritable(1));
+    map.put(new Text("two"), new VLongWritable(2));
+    Writable[] writables = new Writable[] {
+      new BytesWritable(new byte[] { 1, 2, 3, 4 }),
+      new ByteWritable((byte) 123), new BooleanWritable(true),
+      new VIntWritable(12345), new VLongWritable(123456789L),
+      new FloatWritable((float) 1.2), new DoubleWritable(1.234),
+      new Text("random string")
+    };
+    TypedBytesWritable tbw = new TypedBytesWritable();
+    tbw.setValue("typed bytes text");
+    RecRecord1 r1 = new RecRecord1();
+    r1.setBoolVal(true);
+    r1.setByteVal((byte) 0x66);
+    r1.setFloatVal(3.145F);
+    r1.setDoubleVal(1.5234);
+    r1.setIntVal(-4567);
+    r1.setLongVal(-2367L);
+    r1.setStringVal("random text");
+    r1.setBufferVal(new Buffer());
+    r1.setVectorVal(new ArrayList());
+    r1.setMapVal(new TreeMap());
+    RecRecord0 r0 = new RecRecord0();
+    r0.setStringVal("other random text");
+    r1.setRecordVal(r0);
+
+    FileOutputStream ostream = new FileOutputStream(tmpfile);
+    DataOutputStream dostream = new DataOutputStream(ostream);
+    TypedBytesWritableOutput out = new TypedBytesWritableOutput(dostream);
+    for (Writable w : writables) {
+      out.write(w);
+    }
+    out.write(tbw);
+    out.write(vector);
+    out.write(map);
+    out.write(r1);
+    dostream.close();
+    ostream.close();
+
+    FileInputStream istream = new FileInputStream(tmpfile);
+    DataInputStream distream = new DataInputStream(istream);
+
+    TypedBytesWritableInput in = new TypedBytesWritableInput(distream);
+    for (Writable w : writables) {
+      assertEquals(w, in.read());
+    }
+
+    assertEquals(tbw.getValue().toString(), in.read().toString());
+
+    assertEquals(ArrayWritable.class, in.readType());
+    ArrayWritable aw = in.readArray();
+    Writable[] writables1 = vector.get(), writables2 = aw.get();
+    assertEquals(writables1.length, writables2.length);
+    for (int i = 0; i < writables1.length; i++) {
+      assertEquals(((Text) writables1[i]).toString(),
+          ((TypedBytesWritable) writables2[i]).getValue());
+    }
+    assertEquals(MapWritable.class, in.readType());
+
+    MapWritable mw = in.readMap();
+    assertEquals(map.entrySet(), mw.entrySet());
+
+    assertEquals(Type.LIST, TypedBytesInput.get(distream).readType());
+    assertEquals(r1.getBoolVal(), TypedBytesInput.get(distream).read());
+    assertEquals(r1.getByteVal(), TypedBytesInput.get(distream).read());
+    assertEquals(r1.getIntVal(), TypedBytesInput.get(distream).read());
+    assertEquals(r1.getLongVal(), TypedBytesInput.get(distream).read());
+    assertEquals(r1.getFloatVal(), TypedBytesInput.get(distream).read());
+    assertEquals(r1.getDoubleVal(), TypedBytesInput.get(distream).read());
+    assertEquals(r1.getStringVal(), TypedBytesInput.get(distream).read());
+    Object prevObj = null, obj = TypedBytesInput.get(distream).read();
+    while (obj != null) {
+      prevObj = obj;
+      obj = TypedBytesInput.get(distream).read();
+    }
+    List recList = (List) prevObj;
+    assertEquals(r0.getStringVal(), recList.get(0));
+
+    distream.close();
+    istream.close();
+  }
+
+}

Added: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestTypedBytesWritable.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestTypedBytesWritable.java?rev=743975&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestTypedBytesWritable.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestTypedBytesWritable.java Fri Feb 13 04:12:11 2009
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.typedbytes;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+public class TestTypedBytesWritable extends TestCase {
+
+  public void testToString() {
+    TypedBytesWritable tbw = new TypedBytesWritable();
+    tbw.setValue(true);
+    assertEquals("true", tbw.toString());
+    tbw.setValue(12345);
+    assertEquals("12345", tbw.toString());
+    tbw.setValue(123456789L);
+    assertEquals("123456789", tbw.toString());
+    tbw.setValue((float) 1.23);
+    assertEquals("1.23", tbw.toString());
+    tbw.setValue(1.23456789);
+    assertEquals("1.23456789", tbw.toString());
+    tbw.setValue("random text");
+    assertEquals("random text", tbw.toString());
+  }
+
+  public void testIO() throws IOException {
+    TypedBytesWritable tbw = new TypedBytesWritable();
+    tbw.setValue(12345);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutput dout = new DataOutputStream(baos);
+    tbw.write(dout);
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    DataInput din = new DataInputStream(bais);
+    TypedBytesWritable readTbw = new TypedBytesWritable();
+    readTbw.readFields(din);
+    assertEquals(tbw, readTbw);
+  }
+
+}
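
Since TypedBytesWritable implements Writable, it can serve directly as a
SequenceFile key or value type; a minimal write-side sketch mirroring what
LoadTypedBytes produces (fs, conf, and path as in the tests above):

  SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
      TypedBytesWritable.class, TypedBytesWritable.class);
  TypedBytesWritable key = new TypedBytesWritable();
  TypedBytesWritable value = new TypedBytesWritable();
  key.setValue(new Long(1));
  value.setValue("10");
  writer.append(key, value);
  writer.close();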