Return-Path: X-Original-To: apmail-avro-commits-archive@www.apache.org Delivered-To: apmail-avro-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 42FB6C32F for ; Thu, 17 May 2012 19:51:13 +0000 (UTC) Received: (qmail 43188 invoked by uid 500); 17 May 2012 19:51:13 -0000 Delivered-To: apmail-avro-commits-archive@avro.apache.org Received: (qmail 43167 invoked by uid 500); 17 May 2012 19:51:13 -0000 Mailing-List: contact commits-help@avro.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@avro.apache.org Delivered-To: mailing list commits@avro.apache.org Received: (qmail 43160 invoked by uid 99); 17 May 2012 19:51:13 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 May 2012 19:51:13 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 May 2012 19:51:10 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id C43E723889E0 for ; Thu, 17 May 2012 19:50:50 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1339825 - in /avro/trunk: ./ lang/java/avro/src/main/java/org/apache/avro/file/ lang/java/avro/src/main/java/org/apache/avro/generic/ lang/java/avro/src/main/java/org/apache/avro/io/ lang/java/avro/src/main/java/org/apache/avro/reflect/ la... Date: Thu, 17 May 2012 19:50:50 -0000 To: commits@avro.apache.org From: cutting@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120517195050.C43E723889E0@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cutting Date: Thu May 17 19:50:49 2012 New Revision: 1339825 URL: http://svn.apache.org/viewvc?rev=1339825&view=rev Log: AVRO-1081. Java: Fix to be able to write ByteBuffers that have no backing array. Also fix reflection to correctly read ByteBuffer fields. Added: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/reflect/ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/reflect/ByteBufferTest.java (with props) Modified: avro/trunk/CHANGES.txt avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestReflect.java Modified: avro/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1339825&r1=1339824&r2=1339825&view=diff ============================================================================== --- avro/trunk/CHANGES.txt (original) +++ avro/trunk/CHANGES.txt Thu May 17 19:50:49 2012 @@ -58,6 +58,10 @@ Avro 1.7.0 (unreleased) AVRO-1065. NodeRecord::isValid() treats records with no fields as invalid. (thiru) + AVRO-1081. Java: Fix to be able to write ByteBuffers that have no + backing array. Also fix reflection to correctly read ByteBuffer + fields. (cutting) + Avro 1.6.3 (5 March 2012) AVRO-1077. Missing 'inline' for union set function. (thiru) Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java?rev=1339825&r1=1339824&r2=1339825&view=diff ============================================================================== --- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java (original) +++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java Thu May 17 19:50:49 2012 @@ -279,8 +279,7 @@ public class DataFileWriter implement * Appending non-conforming data may result in an unreadable file. */ public void appendEncoded(ByteBuffer datum) throws IOException { assertOpen(); - int start = datum.position(); - bufOut.writeFixed(datum.array(), start, datum.limit()-start); + bufOut.writeFixed(datum); blockCount++; writeIfBlockFull(); } Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java?rev=1339825&r1=1339824&r2=1339825&view=diff ============================================================================== --- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java (original) +++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java Thu May 17 19:50:49 2012 @@ -148,7 +148,7 @@ public class GenericDatumReader imple case UNION: return read(old, expected.getTypes().get(in.readIndex()), in); case FIXED: return readFixed(old, expected, in); case STRING: return readString(old, expected, in); - case BYTES: return readBytes(old, in); + case BYTES: return readBytes(old, expected, in); case INT: return readInt(old, expected, in); case LONG: return in.readLong(); case FLOAT: return in.readFloat(); @@ -344,6 +344,14 @@ public class GenericDatumReader imple /** Called to read byte arrays. Subclasses may override to use a different * byte array representation. By default, this calls {@link * Decoder#readBytes(ByteBuffer)}.*/ + protected Object readBytes(Object old, Schema s, Decoder in) + throws IOException { + return readBytes(old, in); + } + + /** Called to read byte arrays. Subclasses may override to use a different + * byte array representation. By default, this calls {@link + * Decoder#readBytes(ByteBuffer)}.*/ protected Object readBytes(Object old, Decoder in) throws IOException { return in.readBytes(old instanceof ByteBuffer ? (ByteBuffer) old : null); } Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java?rev=1339825&r1=1339824&r2=1339825&view=diff ============================================================================== --- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java (original) +++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java Thu May 17 19:50:49 2012 @@ -57,10 +57,13 @@ public abstract class BinaryEncoder exte @Override public void writeBytes(ByteBuffer bytes) throws IOException { - int pos = bytes.position(); - int start = bytes.arrayOffset() + pos; - int len = bytes.limit() - pos; - writeBytes(bytes.array(), start, len); + int len = bytes.limit() - bytes.position(); + if (0 == len) { + writeZero(); + } else { + writeInt(len); + writeFixed(bytes); + } } @Override Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java?rev=1339825&r1=1339824&r2=1339825&view=diff ============================================================================== --- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java (original) +++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java Thu May 17 19:50:49 2012 @@ -19,6 +19,9 @@ package org.apache.avro.io; import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; import org.apache.avro.AvroRuntimeException; @@ -151,6 +154,15 @@ public class BufferedBinaryEncoder exten System.arraycopy(bytes, start, buf, pos, len); pos += len; } + + @Override + public void writeFixed(ByteBuffer bytes) throws IOException { + if (!bytes.hasArray() && bytes.remaining() > bulkLimit) { + sink.innerWrite(bytes); // bypass the buffer + } else { + super.writeFixed(bytes); + } + } @Override protected void writeZero() throws IOException { @@ -182,15 +194,20 @@ public class BufferedBinaryEncoder exten protected ByteSink() {} /** Write data from bytes, starting at off, for len bytes **/ protected abstract void innerWrite(byte[] bytes, int off, int len) throws IOException; + + protected abstract void innerWrite(ByteBuffer buff) throws IOException; + /** Flush the underlying output, if supported **/ protected abstract void innerFlush() throws IOException; } static class OutputStreamSink extends ByteSink { private final OutputStream out; + private final WritableByteChannel channel; private OutputStreamSink(OutputStream out) { super(); this.out = out; + channel = Channels.newChannel(out); } @Override protected void innerWrite(byte[] bytes, int off, int len) @@ -201,5 +218,9 @@ public class BufferedBinaryEncoder exten protected void innerFlush() throws IOException { out.flush(); } + @Override + protected void innerWrite(ByteBuffer buff) throws IOException { + channel.write(buff); + } } } Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java?rev=1339825&r1=1339824&r2=1339825&view=diff ============================================================================== --- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java (original) +++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java Thu May 17 19:50:49 2012 @@ -166,6 +166,19 @@ public abstract class Encoder implements writeFixed(bytes, 0, bytes.length); } + /** A version of {@link writeFixed} for ByteBuffers. */ + public void writeFixed(ByteBuffer bytes) throws IOException { + int pos = bytes.position(); + int len = bytes.limit() - pos; + if (bytes.hasArray()) { + writeFixed(bytes.array(), bytes.arrayOffset() + pos, len); + } else { + byte[] b = new byte[len]; + bytes.get(b, 0, len); + writeFixed(b, 0, len); + } + } + /** * Writes an enumeration. * @param e Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java?rev=1339825&r1=1339824&r2=1339825&view=diff ============================================================================== --- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java (original) +++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java Thu May 17 19:50:49 2012 @@ -262,8 +262,11 @@ public class ReflectData extends Specifi return super.createSchema(type, names); if (c.isArray()) { // array Class component = c.getComponentType(); - if (component == Byte.TYPE) // byte array - return Schema.create(Schema.Type.BYTES); + if (component == Byte.TYPE) { // byte array + Schema result = Schema.create(Schema.Type.BYTES); + result.addProp(CLASS_PROP, c.getName()); + return result; + } Schema result = Schema.createArray(createSchema(component, names)); setElement(result, component); return result; Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java?rev=1339825&r1=1339824&r2=1339825&view=diff ============================================================================== --- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java (original) +++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java Thu May 17 19:50:49 2012 @@ -121,11 +121,17 @@ public class ReflectDatumReader exten protected Object createString(String value) { return value; } @Override - protected Object readBytes(Object old, Decoder in) throws IOException { + protected Object readBytes(Object old, Schema s, Decoder in) + throws IOException { ByteBuffer bytes = in.readBytes(null); - byte[] result = new byte[bytes.remaining()]; - bytes.get(result); - return result; + Class c = ReflectData.getClassProp(s, ReflectData.CLASS_PROP); + if (c != null && c.isArray()) { + byte[] result = new byte[bytes.remaining()]; + bytes.get(result); + return result; + } else { + return bytes; + } } @Override Modified: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestReflect.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestReflect.java?rev=1339825&r1=1339824&r2=1339825&view=diff ============================================================================== --- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestReflect.java (original) +++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestReflect.java Thu May 17 19:50:49 2012 @@ -101,7 +101,8 @@ public class TestReflect { } @Test public void testBytes() { - check(new byte[0], "\"bytes\""); + check(ByteBuffer.allocate(0), "\"bytes\""); + check(new byte[0], "{\"type\":\"bytes\",\"java-class\":\"[B\"}"); } @Test public void testUnionWithCollection() { Added: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/reflect/ByteBufferTest.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/reflect/ByteBufferTest.java?rev=1339825&view=auto ============================================================================== --- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/reflect/ByteBufferTest.java (added) +++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/reflect/ByteBufferTest.java Thu May 17 19:50:49 2012 @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.avro.reflect; + +import static org.junit.Assert.*; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Iterator; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.file.FileReader; +import org.apache.avro.file.SeekableByteArrayInput; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.reflect.ReflectDatumWriter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class ByteBufferTest { + static class X{ + String name = ""; + ByteBuffer content; + } + File tmpdir; + File content; + + @Before public void before() throws IOException{ + tmpdir = File.createTempFile("avro", "test"); + tmpdir.delete(); + tmpdir.mkdirs(); + content = new File(tmpdir,"test-content"); + FileOutputStream out = new FileOutputStream(content); + for(int i=0;i<100000;i++){ + out.write("hello world\n".getBytes()); + } + out.close(); + } + + @Test public void test() throws Exception{ + Schema schema = ReflectData.get().getSchema(X.class); + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + writeOneXAsAvro(schema, bout); + X record = readOneXFromAvro(schema, bout); + + String expected = getmd5(content); + String actual = getmd5(record.content); + assertEquals("md5 for result differed from input",expected,actual); + } + + private X readOneXFromAvro(Schema schema, ByteArrayOutputStream bout) + throws IOException { + SeekableByteArrayInput input = new SeekableByteArrayInput(bout.toByteArray()); + ReflectDatumReader datumReader = new ReflectDatumReader(schema); + FileReader reader = DataFileReader.openReader(input, datumReader); + Iterator it = reader.iterator(); + assertTrue("missing first record",it.hasNext()); + X record = it.next(); + assertFalse("should be no more records - only wrote one out",it.hasNext()); + return record; + } + + private void writeOneXAsAvro(Schema schema, ByteArrayOutputStream bout) + throws IOException, FileNotFoundException { + DatumWriter datumWriter = new ReflectDatumWriter(schema); + DataFileWriter writer = new DataFileWriter(datumWriter); + writer.create(schema, bout); + X x = new X(); + x.name = "xxx"; + FileInputStream fis = new FileInputStream(content); + try{ + FileChannel channel = fis.getChannel(); + try{ + long contentLength = content.length(); + //set the content to be a file channel. + ByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, contentLength); + x.content = buffer; + writer.append(x); + }finally{ + channel.close(); + } + }finally{ + fis.close(); + } + writer.flush(); + writer.close(); + } + + private String getmd5(File file) throws Exception{ + FileInputStream fis = new FileInputStream(content); + try{ + FileChannel channel = fis.getChannel(); + try{ + long contentLength = content.length(); + ByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, contentLength); + return getmd5(buffer); + }finally{ + channel.close(); + } + }finally{ + fis.close(); + } + } + + String getmd5(ByteBuffer buffer) throws NoSuchAlgorithmException{ + MessageDigest mdEnc = MessageDigest.getInstance("MD5"); + mdEnc.reset(); + mdEnc.update(buffer); + return new BigInteger(1, mdEnc.digest()).toString(16); + } +} Propchange: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/reflect/ByteBufferTest.java ------------------------------------------------------------------------------ svn:eol-style = native