avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1339825 - in /avro/trunk: ./ lang/java/avro/src/main/java/org/apache/avro/file/ lang/java/avro/src/main/java/org/apache/avro/generic/ lang/java/avro/src/main/java/org/apache/avro/io/ lang/java/avro/src/main/java/org/apache/avro/reflect/ la...
Date Thu, 17 May 2012 19:50:50 GMT
Author: cutting
Date: Thu May 17 19:50:49 2012
New Revision: 1339825

URL: http://svn.apache.org/viewvc?rev=1339825&view=rev
Log:
AVRO-1081. Java: Fix to be able to write ByteBuffers that have no backing array.  Also fix
reflection to correctly read ByteBuffer fields.

Added:
    avro/trunk/lang/java/avro/src/test/java/org/apache/avro/reflect/
    avro/trunk/lang/java/avro/src/test/java/org/apache/avro/reflect/ByteBufferTest.java  
(with props)
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java
    avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestReflect.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1339825&r1=1339824&r2=1339825&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu May 17 19:50:49 2012
@@ -58,6 +58,10 @@ Avro 1.7.0 (unreleased)
     AVRO-1065. NodeRecord::isValid() treats records with no fields as
     invalid. (thiru)
 
+    AVRO-1081. Java: Fix to be able to write ByteBuffers that have no
+    backing array.  Also fix reflection to correctly read ByteBuffer
+    fields.  (cutting)
+
 Avro 1.6.3 (5 March 2012)
 
     AVRO-1077. Missing 'inline' for union set function. (thiru)

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java?rev=1339825&r1=1339824&r2=1339825&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java Thu May
17 19:50:49 2012
@@ -279,8 +279,7 @@ public class DataFileWriter<D> implement
    * Appending non-conforming data may result in an unreadable file. */
   public void appendEncoded(ByteBuffer datum) throws IOException {
     assertOpen();
-    int start = datum.position();
-    bufOut.writeFixed(datum.array(), start, datum.limit()-start);
+    bufOut.writeFixed(datum);
     blockCount++;
     writeIfBlockFull();
   }

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java?rev=1339825&r1=1339824&r2=1339825&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
(original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
Thu May 17 19:50:49 2012
@@ -148,7 +148,7 @@ public class GenericDatumReader<D> imple
     case UNION:   return read(old, expected.getTypes().get(in.readIndex()), in);
     case FIXED:   return readFixed(old, expected, in);
     case STRING:  return readString(old, expected, in);
-    case BYTES:   return readBytes(old, in);
+    case BYTES:   return readBytes(old, expected, in);
     case INT:     return readInt(old, expected, in);
     case LONG:    return in.readLong();
     case FLOAT:   return in.readFloat();
@@ -344,6 +344,14 @@ public class GenericDatumReader<D> imple
   /** Called to read byte arrays.  Subclasses may override to use a different
    * byte array representation.  By default, this calls {@link
    * Decoder#readBytes(ByteBuffer)}.*/
+  protected Object readBytes(Object old, Schema s, Decoder in)
+    throws IOException {
+    return readBytes(old, in);
+  }
+
+  /** Called to read byte arrays.  Subclasses may override to use a different
+   * byte array representation.  By default, this calls {@link
+   * Decoder#readBytes(ByteBuffer)}.*/
   protected Object readBytes(Object old, Decoder in) throws IOException {
     return in.readBytes(old instanceof ByteBuffer ? (ByteBuffer) old : null);
   }

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java?rev=1339825&r1=1339824&r2=1339825&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java Thu May
17 19:50:49 2012
@@ -57,10 +57,13 @@ public abstract class BinaryEncoder exte
 
   @Override
   public void writeBytes(ByteBuffer bytes) throws IOException {
-    int pos = bytes.position();
-    int start = bytes.arrayOffset() + pos;
-    int len = bytes.limit() - pos;
-    writeBytes(bytes.array(), start, len);
+    int len = bytes.limit() - bytes.position();
+    if (0 == len) {
+      writeZero();
+    } else {
+      writeInt(len);
+      writeFixed(bytes);
+    }
   }
   
   @Override

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java?rev=1339825&r1=1339824&r2=1339825&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java
(original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java
Thu May 17 19:50:49 2012
@@ -19,6 +19,9 @@ package org.apache.avro.io;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 
 import org.apache.avro.AvroRuntimeException;
 
@@ -151,6 +154,15 @@ public class BufferedBinaryEncoder exten
     System.arraycopy(bytes, start, buf, pos, len);
     pos += len;
   }
+  
+  @Override
+  public void writeFixed(ByteBuffer bytes) throws IOException {
+    if (!bytes.hasArray() && bytes.remaining() > bulkLimit) {
+      sink.innerWrite(bytes);                     // bypass the buffer
+    } else {
+      super.writeFixed(bytes);
+    }
+  }
 
   @Override
   protected void writeZero() throws IOException {
@@ -182,15 +194,20 @@ public class BufferedBinaryEncoder exten
     protected ByteSink() {}
     /** Write data from bytes, starting at off, for len bytes **/
     protected abstract void innerWrite(byte[] bytes, int off, int len) throws IOException;
+    
+    protected abstract void innerWrite(ByteBuffer buff) throws IOException;
+    
     /** Flush the underlying output, if supported **/
     protected abstract void innerFlush() throws IOException;
   }
   
   static class OutputStreamSink extends ByteSink {
     private final OutputStream out;
+    private final WritableByteChannel channel;
     private OutputStreamSink(OutputStream out) {
       super();
       this.out = out;
+      channel = Channels.newChannel(out);
     }
     @Override
     protected void innerWrite(byte[] bytes, int off, int len)
@@ -201,5 +218,9 @@ public class BufferedBinaryEncoder exten
     protected void innerFlush() throws IOException {
       out.flush();
     }
+    @Override
+    protected void innerWrite(ByteBuffer buff) throws IOException {
+      channel.write(buff);
+    }
   }
 }

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java?rev=1339825&r1=1339824&r2=1339825&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java Thu May 17 19:50:49
2012
@@ -166,6 +166,19 @@ public abstract class Encoder implements
     writeFixed(bytes, 0, bytes.length);
   }
   
+  /** A version of {@link writeFixed} for ByteBuffers. */
+  public void writeFixed(ByteBuffer bytes) throws IOException {
+    int pos = bytes.position();
+    int len = bytes.limit() - pos;
+    if (bytes.hasArray()) {
+      writeFixed(bytes.array(), bytes.arrayOffset() + pos, len);
+    } else {
+      byte[] b = new byte[len];
+      bytes.get(b, 0, len);
+      writeFixed(b, 0, len);
+    }
+  }
+
   /**
    * Writes an enumeration.
    * @param e

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java?rev=1339825&r1=1339824&r2=1339825&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java Thu May
17 19:50:49 2012
@@ -262,8 +262,11 @@ public class ReflectData extends Specifi
         return super.createSchema(type, names);
       if (c.isArray()) {                                     // array
         Class component = c.getComponentType();
-        if (component == Byte.TYPE)                          // byte array
-          return Schema.create(Schema.Type.BYTES);
+        if (component == Byte.TYPE) {                        // byte array
+          Schema result = Schema.create(Schema.Type.BYTES);
+          result.addProp(CLASS_PROP, c.getName());
+          return result;
+        }
         Schema result = Schema.createArray(createSchema(component, names));
         setElement(result, component);
         return result;

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java?rev=1339825&r1=1339824&r2=1339825&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java
(original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java
Thu May 17 19:50:49 2012
@@ -121,11 +121,17 @@ public class ReflectDatumReader<T> exten
   protected Object createString(String value) { return value; }
 
   @Override
-  protected Object readBytes(Object old, Decoder in) throws IOException {
+  protected Object readBytes(Object old, Schema s, Decoder in)
+    throws IOException {
     ByteBuffer bytes = in.readBytes(null);
-    byte[] result = new byte[bytes.remaining()];
-    bytes.get(result);
-    return result;
+    Class c = ReflectData.getClassProp(s, ReflectData.CLASS_PROP);
+    if (c != null && c.isArray()) {
+      byte[] result = new byte[bytes.remaining()];
+      bytes.get(result);
+      return result;
+    } else {
+      return bytes;
+    }
   }
 
   @Override

Modified: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestReflect.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestReflect.java?rev=1339825&r1=1339824&r2=1339825&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestReflect.java (original)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestReflect.java Thu May 17 19:50:49
2012
@@ -101,7 +101,8 @@ public class TestReflect {
   }
 
   @Test public void testBytes() {
-    check(new byte[0], "\"bytes\"");
+    check(ByteBuffer.allocate(0), "\"bytes\"");
+    check(new byte[0], "{\"type\":\"bytes\",\"java-class\":\"[B\"}");
   }
 
   @Test public void testUnionWithCollection() {

Added: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/reflect/ByteBufferTest.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/reflect/ByteBufferTest.java?rev=1339825&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/reflect/ByteBufferTest.java (added)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/reflect/ByteBufferTest.java Thu
May 17 19:50:49 2012
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.reflect;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Iterator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableByteArrayInput;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ByteBufferTest {
+  static class X{
+    String name = "";
+    ByteBuffer content;
+  }
+  File tmpdir;
+  File content;
+
+  @Before public void before() throws IOException{
+    tmpdir = File.createTempFile("avro", "test");
+    tmpdir.delete();
+    tmpdir.mkdirs();
+    content = new File(tmpdir,"test-content");
+    FileOutputStream out = new FileOutputStream(content);
+    for(int i=0;i<100000;i++){
+      out.write("hello world\n".getBytes());
+    }
+    out.close();
+  }
+
+  @Test public void test() throws Exception{
+    Schema schema = ReflectData.get().getSchema(X.class);
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    writeOneXAsAvro(schema, bout);		
+    X record = readOneXFromAvro(schema, bout);
+		
+    String expected = getmd5(content);
+    String actual = getmd5(record.content);
+    assertEquals("md5 for result differed from input",expected,actual);
+  }
+
+  private X readOneXFromAvro(Schema schema, ByteArrayOutputStream bout)
+    throws IOException {
+    SeekableByteArrayInput input = new SeekableByteArrayInput(bout.toByteArray());
+    ReflectDatumReader<X> datumReader = new ReflectDatumReader<X>(schema);
+    FileReader<X> reader = DataFileReader.openReader(input, datumReader);
+    Iterator<X> it = reader.iterator();
+    assertTrue("missing first record",it.hasNext());
+    X record = it.next();
+    assertFalse("should be no more records - only wrote one out",it.hasNext());
+    return record;
+  }
+
+  private void writeOneXAsAvro(Schema schema, ByteArrayOutputStream bout)
+    throws IOException, FileNotFoundException {
+    DatumWriter<X> datumWriter = new ReflectDatumWriter<X>(schema);
+    DataFileWriter<X> writer = new DataFileWriter<X>(datumWriter);
+    writer.create(schema, bout);
+    X x = new X();
+    x.name = "xxx";
+    FileInputStream fis = new FileInputStream(content);
+    try{
+      FileChannel channel = fis.getChannel();
+      try{
+        long contentLength = content.length();
+        //set the content to be a file channel.
+        ByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, contentLength);
+        x.content = buffer;
+        writer.append(x);
+      }finally{
+        channel.close();
+      }
+    }finally{
+      fis.close();
+    }
+    writer.flush();
+    writer.close();
+  }
+
+  private String getmd5(File file) throws Exception{
+    FileInputStream fis = new FileInputStream(content);
+    try{
+      FileChannel channel = fis.getChannel();
+      try{
+        long contentLength = content.length();
+        ByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, contentLength);
+        return getmd5(buffer);
+      }finally{
+        channel.close();
+      }
+    }finally{
+      fis.close();
+    }
+  }
+
+  String getmd5(ByteBuffer buffer) throws NoSuchAlgorithmException{
+    MessageDigest mdEnc = MessageDigest.getInstance("MD5");
+    mdEnc.reset();
+    mdEnc.update(buffer);
+    return new BigInteger(1, mdEnc.digest()).toString(16);
+  }
+}

Propchange: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/reflect/ByteBufferTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message