avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From scottca...@apache.org
Subject svn commit: r1074364 [2/2] - in /avro/trunk: ./ lang/java/avro/src/main/java/org/apache/avro/ lang/java/avro/src/main/java/org/apache/avro/file/ lang/java/avro/src/main/java/org/apache/avro/io/ lang/java/avro/src/main/java/org/apache/avro/io/parsing/ l...
Date Fri, 25 Feb 2011 00:36:41 GMT
Added: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/LegacyBinaryEncoder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/LegacyBinaryEncoder.java?rev=1074364&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/LegacyBinaryEncoder.java (added)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/LegacyBinaryEncoder.java Fri Feb 25 00:36:40 2011
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.util.Utf8;
+
+/**
+ * Unbuffered binary {@link Encoder} that writes every value straight to
+ * the wrapped {@link OutputStream}.
+ *
+ * It lives in the test tree: TestBinaryEncoderFidelity uses its output as
+ * the reference byte stream that the factory-created encoders must
+ * reproduce, and Perf can be switched to it for comparison runs.
+ *
+ * Ints, longs, enum ordinals, union indexes and length prefixes are
+ * written as zig-zag variable-length integers; floats and doubles are
+ * written as their raw bit patterns, least significant byte first.
+ *
+ * Arrays and maps are not buffered or blocked: {@link #setItemCount}
+ * writes a positive count as-is, {@link #startItem} is a no-op, and
+ * {@link #writeArrayEnd} / {@link #writeMapEnd} write a single zero.
+ *
+ *  @see Decoder
+ */
+public class LegacyBinaryEncoder extends Encoder {
+  protected OutputStream out;
+  /** Strategy for writing a length-prefixed byte buffer. */
+  private interface ByteWriter {
+    void write(ByteBuffer bytes) throws IOException;
+  }
+  /** Writes the remaining-byte count, then those bytes from the buffer's backing array. */
+  private static final class SimpleByteWriter implements ByteWriter {
+    private final OutputStream out;
+    public SimpleByteWriter(OutputStream out) {
+      this.out = out;
+    }
+
+    @Override
+    public void write(ByteBuffer bytes) throws IOException {
+      encodeLong(bytes.remaining(), out);
+      // position() is relative to arrayOffset(), which is non-zero for sliced buffers.
+      out.write(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining());
+    }
+  }
+  
+  private final ByteWriter byteWriter;
+
+  /** Create a writer that sends its output to the underlying stream
+   *  <code>out</code>. */
+  public LegacyBinaryEncoder(OutputStream out) {
+    this.out = out;
+    this.byteWriter = new SimpleByteWriter(out);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (out != null) {
+      out.flush();
+    }
+  }
+
+  @Override
+  public void writeNull() throws IOException { } // null occupies zero bytes
+  
+  @Override
+  public void writeBoolean(boolean b) throws IOException {
+    out.write(b ? 1 : 0); // one byte: 1 for true, 0 for false
+  }
+
+  @Override
+  public void writeInt(int n) throws IOException {
+    encodeLong(n, out); // widened to long; ints share the long encoding
+  }
+
+  @Override
+  public void writeLong(long n) throws IOException {
+    encodeLong(n, out);
+  }
+  
+  @Override
+  public void writeFloat(float f) throws IOException {
+    encodeFloat(f, out);
+  }
+
+  @Override
+  public void writeDouble(double d) throws IOException {
+    encodeDouble(d, out);
+  }
+
+  @Override
+  public void writeString(Utf8 utf8) throws IOException {
+    encodeString(utf8.getBytes(), 0, utf8.getByteLength());
+  }
+  
+  @Override
+  public void writeString(String string) throws IOException {
+    byte[] bytes = Utf8.getBytesFor(string);
+    encodeString(bytes, 0, bytes.length);
+  }
+  /** Writes the byte length as a varint, then the given slice of bytes. */
+  private void encodeString(byte[] bytes, int offset, int length) throws IOException {
+    encodeLong(length, out);
+    out.write(bytes, offset, length);
+  }
+  
+  @Override
+  public void writeBytes(ByteBuffer bytes) throws IOException {
+    byteWriter.write(bytes); // length prefix + remaining bytes; buffer position is not advanced
+  }
+  
+  @Override
+  public void writeBytes(byte[] bytes, int start, int len) throws IOException {
+    encodeLong(len, out); // length prefix
+    out.write(bytes, start, len);
+  }
+  
+  @Override
+  public void writeFixed(byte[] bytes, int start, int len) throws IOException {
+    out.write(bytes, start, len); // fixed data carries no length prefix
+  }
+
+  @Override
+  public void writeEnum(int e) throws IOException {
+    encodeLong(e, out);
+  }
+
+  @Override
+  public void writeArrayStart() throws IOException {
+  }
+
+  @Override
+  public void setItemCount(long itemCount) throws IOException {
+    if (itemCount > 0) { // a zero count writes nothing; the end call emits the terminating 0
+      writeLong(itemCount);
+    }
+  }
+  
+  @Override
+  public void startItem() throws IOException {
+  }
+
+  @Override
+  public void writeArrayEnd() throws IOException {
+    encodeLong(0, out); // zero marks the end of the array
+  }
+
+  @Override
+  public void writeMapStart() throws IOException {
+  }
+
+  @Override
+  public void writeMapEnd() throws IOException {
+    encodeLong(0, out); // zero marks the end of the map
+  }
+
+  @Override
+  public void writeIndex(int unionIndex) throws IOException {
+    encodeLong(unionIndex, out);
+  }
+  /** Writes n as a zig-zag varint: 7 bits per byte, low bits first, high bit set on all but the last byte. */
+  protected static void encodeLong(long n, OutputStream o) throws IOException {
+    n = (n << 1) ^ (n >> 63); // move sign to low-order bit
+    while ((n & ~0x7F) != 0) { // more than 7 significant bits remain
+      o.write((byte)((n & 0x7f) | 0x80));
+      n >>>= 7;
+    }
+    o.write((byte)n);
+  }
+  /** Writes the raw int bits of f as four bytes, least significant byte first. */
+  protected static void encodeFloat(float f, OutputStream o) throws IOException {
+    long bits = Float.floatToRawIntBits(f);
+    o.write((int)(bits      ) & 0xFF);
+    o.write((int)(bits >>  8) & 0xFF);
+    o.write((int)(bits >> 16) & 0xFF);
+    o.write((int)(bits >> 24) & 0xFF);
+  }
+  /** Writes the raw long bits of d as eight bytes, least significant byte first. */
+  protected static void encodeDouble(double d, OutputStream o) throws IOException {
+    long bits = Double.doubleToRawLongBits(d);
+    o.write((int)(bits      ) & 0xFF);
+    o.write((int)(bits >>  8) & 0xFF);
+    o.write((int)(bits >> 16) & 0xFF);
+    o.write((int)(bits >> 24) & 0xFF);
+    o.write((int)(bits >> 32) & 0xFF);
+    o.write((int)(bits >> 40) & 0xFF);
+    o.write((int)(bits >> 48) & 0xFF);
+    o.write((int)(bits >> 56) & 0xFF);
+  }
+
+}
+

Modified: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/Perf.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/Perf.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/Perf.java (original)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/Perf.java Fri Feb 25 00:36:40 2011
@@ -34,9 +34,6 @@ import org.apache.avro.generic.GenericDa
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.util.Utf8;
-import org.codehaus.jackson.JsonEncoding;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonGenerator;
 
 /**
  * Performance tests for various low level operations of
@@ -87,6 +84,8 @@ public class Perf {
     new TestDescriptor(BoolTest.class, "-b").add(BASIC);
     new TestDescriptor(BytesTest.class, "-by").add(BASIC);
     new TestDescriptor(StringTest.class, "-s").add(BASIC);
+    new TestDescriptor(ArrayTest.class, "-a").add(BASIC);
+    new TestDescriptor(MapTest.class, "-m").add(BASIC);
     BATCHES.put("-record", RECORD);
     new TestDescriptor(RecordTest.class, "-R").add(RECORD);
     new TestDescriptor(ValidatingRecord.class, "-Rv").add(RECORD);
@@ -97,6 +96,7 @@ public class Perf {
     BATCHES.put("-generic", GENERIC);
     new TestDescriptor(GenericTest.class, "-G").add(GENERIC);
     new TestDescriptor(GenericNested.class, "-Gn").add(GENERIC);
+    new TestDescriptor(GenericNestedFake.class, "-Gf").add(GENERIC);
     new TestDescriptor(GenericWithDefault.class, "-Gd").add(GENERIC);
     new TestDescriptor(GenericWithOutOfOrder.class, "-Go").add(GENERIC);
     new TestDescriptor(GenericWithPromotion.class, "-Gp").add(GENERIC);
@@ -171,10 +171,10 @@ public class Perf {
       try {
         // get everything to compile once
         t.init();
-        if (t.isReadTest() && readTests) {
+        if (t.isReadTest()) {
           t.readTest();
         }
-        if (t.isWriteTest() && writeTests) {
+        if (t.isWriteTest()) {
           t.writeTest();
         }
         t.reset();
@@ -191,12 +191,12 @@ public class Perf {
       // warmup JVM
       t.init();
       if (t.isReadTest() && readTests) {
-        for (int i = 0; i < t.cycles; i++) {
+        for (int i = 0; i < t.cycles/2; i++) {
           t.readTest();
         }
       }
       if (t.isWriteTest() && writeTests) {
-        for (int i = 0; i < t.cycles; i++) {
+        for (int i = 0; i < t.cycles/2; i++) {
           t.writeTest();
         }
       }
@@ -251,7 +251,8 @@ public class Perf {
     public long encodedSize = 0;
     protected boolean isReadTest = true;
     protected boolean isWriteTest = true;
-    static DecoderFactory factory = new DecoderFactory();
+    static DecoderFactory decoder_factory = new DecoderFactory();
+    static EncoderFactory encoder_factory = new EncoderFactory();
     
     public Test(String name, int cycles, int count) {
       this.name = name;
@@ -331,30 +332,39 @@ public class Perf {
     }
 
     protected Decoder newDecoder() {
-      return factory.createBinaryDecoder(data, null);
+      return decoder_factory.createBinaryDecoder(data, null);
     }
     
     protected Encoder newEncoder() {
       OutputStream out = new ByteArrayOutputStream((int)(encodedSize > 0 ? encodedSize : count));
-      return new BinaryEncoder(out);
+      return newEncoder(out);
+    }
+    // switch out what is returned to test different encoders.
+    private Encoder newEncoder(OutputStream out) {
+      Encoder e = encoder_factory.binaryEncoder(out, null);
+//      Encoder e = encoder_factory.directBinaryEncoder(out, null);
+//      Encoder e = encoder_factory.blockingBinaryEncoder(out, null);
+//      Encoder e = new LegacyBinaryEncoder(out);
+      return e;
     }
     
     @Override
     void init() throws IOException {
       genSourceData();
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      Encoder e = new BinaryEncoder(baos);
+      Encoder e = newEncoder(baos);
       writeInternal(e);
       e.flush();
       data = baos.toByteArray();
       encodedSize = data.length;
+      //System.out.println(this.getClass().getSimpleName() + " encodedSize=" + encodedSize);
     }
 
     abstract void genSourceData();
     abstract void readInternal(Decoder d) throws IOException;
     abstract void writeInternal(Encoder e) throws IOException;
   }
-    
+  
   static class IntTest extends BasicTest {
     protected int[] sourceData = null;
     public IntTest() throws IOException {
@@ -448,6 +458,10 @@ public class Perf {
         sourceData[i+2] = r.nextLong() % 0x3FFFFFFFFL; // half in <=5, half in 6
         sourceData[i+3] = r.nextLong() % 0x1FFFFFFFFFFFFL; // half in <=8, half in 9 
       }
+      // last 16, make full size
+      for (int i = sourceData.length - 16; i < sourceData.length; i ++) {
+        sourceData[i] = r.nextLong();
+      }
     }
    
     @Override
@@ -480,7 +494,10 @@ public class Perf {
   static class FloatTest extends BasicTest {
     float[] sourceData = null;
     public FloatTest() throws IOException {
-      super("Float", "{ \"type\": \"float\"} ");
+      this("Float", "{ \"type\": \"float\"} ");
+    }
+    public FloatTest(String name, String schema) throws IOException {
+      super(name, schema);
     }
 
     @Override
@@ -695,6 +712,100 @@ public class Perf {
     }
   }
   
+  static class ArrayTest extends FloatTest {
+    public ArrayTest() throws IOException {
+      super("Array",
+          "{ \"type\": \"array\", \"items\": " +
+          " { \"type\": \"record\", \"name\":\"Foo\", \"fields\": " +
+          "  [{\"name\":\"bar\", \"type\":" +
+          "    {\"type\": \"array\", \"items\": " +
+          "     { \"type\": \"record\", \"name\":\"Vals\", \"fields\": [" +
+          "      {\"name\":\"f1\", \"type\":\"float\"}," +
+          "      {\"name\":\"f2\", \"type\":\"float\"}," +
+          "      {\"name\":\"f3\", \"type\":\"float\"}," +
+          "      {\"name\":\"f4\", \"type\":\"float\"}]" +
+          "     }" +
+          "    }" +
+          "   }]}}");
+    }
+    // Reads the outer array header, then every block of four-float items of the inner array.
+    @Override
+    void readInternal(Decoder d) throws IOException {
+      d.readArrayStart(); // outer array; its count is ignored (writeInternal always writes one item)
+      for(long i = d.readArrayStart(); i != 0; i = d.arrayNext()) {
+        for (long j = 0; j < i; j++) {
+          d.readFloat();
+          d.readFloat();
+          d.readFloat();
+          d.readFloat();
+        }
+      }
+      d.arrayNext(); // step past the end of the outer array
+    }
+    // Writes one outer item whose inner array holds sourceData.length/4 items of four floats each.
+    @Override
+    void writeInternal(Encoder e) throws IOException {
+      int items = sourceData.length/4;
+      e.writeArrayStart();
+      e.setItemCount(1);
+      e.startItem();
+      e.writeArrayStart();
+      e.setItemCount(items);
+      for (int i = 0; i + 3 < sourceData.length; i += 4) { // complete groups of four only, so exactly `items` are written
+        e.startItem();
+        e.writeFloat(sourceData[i]);
+        e.writeFloat(sourceData[i+1]);
+        e.writeFloat(sourceData[i+2]);
+        e.writeFloat(sourceData[i+3]);
+      }
+      e.writeArrayEnd();
+      e.writeArrayEnd();
+    }
+  }
+  
+  static class MapTest extends FloatTest {
+    public MapTest() throws IOException {
+      super("Map", "{ \"type\": \"map\", \"values\": " +
+          "  { \"type\": \"record\", \"name\":\"Vals\", \"fields\": [" +
+          "   {\"name\":\"f1\", \"type\":\"float\"}," +
+          "   {\"name\":\"f2\", \"type\":\"float\"}," +
+          "   {\"name\":\"f3\", \"type\":\"float\"}," +
+          "   {\"name\":\"f4\", \"type\":\"float\"}]" +
+          "  }} ");
+    }
+    // Reads every map block: a string key followed by four floats per entry.
+    @Override
+    void readInternal(Decoder d) throws IOException {
+      Utf8 key = new Utf8(); // reused across entries to avoid allocating per key
+      for(long i = d.readMapStart(); i != 0; i = d.mapNext()) {
+        for (long j = 0; j < i; j++) {
+          key = d.readString(key);
+          d.readFloat();
+          d.readFloat();
+          d.readFloat();
+          d.readFloat();
+        }
+      }
+    }
+    // Writes sourceData.length/4 entries, all keyed "foo", each holding four consecutive floats.
+    @Override
+    void writeInternal(Encoder e) throws IOException {
+      int items = sourceData.length/4;
+      e.writeMapStart();
+      e.setItemCount(items);
+      Utf8 foo = new Utf8("foo");
+      for (int i = 0; i + 3 < sourceData.length; i += 4) { // complete groups of four only, so exactly `items` are written
+        e.startItem();
+        e.writeString(foo);
+        e.writeFloat(sourceData[i]);
+        e.writeFloat(sourceData[i+1]);
+        e.writeFloat(sourceData[i+2]);
+        e.writeFloat(sourceData[i+3]);
+      }
+      e.writeMapEnd();
+    }
+  }
+  
   private static final String RECORD_SCHEMA = 
     "{ \"type\": \"record\", \"name\": \"R\", \"fields\": [\n"
     + "{ \"name\": \"f1\", \"type\": \"double\" },\n"
@@ -791,7 +902,7 @@ public class Perf {
     }
     @Override
     protected Encoder getEncoder() throws IOException {
-      return new ValidatingEncoder(schema, super.getEncoder());  
+      return encoder_factory.validatingEncoder(schema, super.getEncoder());  
     }
   }
   
@@ -1020,27 +1131,86 @@ public class Perf {
     }
     @Override
     void genSourceData() {
-      Random r = newRandom();
-      sourceData = new GenericRecord[count];
+      sourceData = generateGenericNested(schema, count);
+    }
+  }
+  static GenericRecord[] generateGenericNested(Schema schema, int count) {
+    Random r = newRandom();
+    GenericRecord[] sourceData = new GenericRecord[count];
+    Schema doubleSchema = schema.getFields().get(0).schema();
+    for (int i = 0; i < sourceData.length; i++) {
+      GenericRecord rec = new GenericData.Record(schema);
+      GenericRecord inner;
+      inner = new GenericData.Record(doubleSchema);
+      inner.put(0, r.nextDouble());
+      rec.put(0, inner);
+      inner = new GenericData.Record(doubleSchema);
+      inner.put(0, r.nextDouble());
+      rec.put(1, inner);
+      inner = new GenericData.Record(doubleSchema);
+      inner.put(0, r.nextDouble());
+      rec.put(2, inner);
+      rec.put(3, r.nextInt());
+      rec.put(4, r.nextInt());
+      rec.put(5, r.nextInt());
+      sourceData[i] = rec; 
+    }
+    return sourceData;
+  }
+  
+  static class GenericNestedFake extends BasicTest {
+    //reads and writes generic data, but not using
+    //GenericDatumReader or GenericDatumWriter
+    GenericRecord[] sourceData = null;
+    public GenericNestedFake() throws IOException {
+      super("GenericNestedFake_", NESTED_RECORD_SCHEMA, 12);
+    }
+    @Override
+    void readInternal(Decoder d) throws IOException {
       Schema doubleSchema = schema.getFields().get(0).schema();
-      for (int i = 0; i < sourceData.length; i++) {
+      for (int i = 0; i < count; i++) {
         GenericRecord rec = new GenericData.Record(schema);
         GenericRecord inner;
         inner = new GenericData.Record(doubleSchema);
-        inner.put(0, r.nextDouble());
+        inner.put(0, d.readDouble());
         rec.put(0, inner);
         inner = new GenericData.Record(doubleSchema);
-        inner.put(0, r.nextDouble());
+        inner.put(0, d.readDouble());
         rec.put(1, inner);
         inner = new GenericData.Record(doubleSchema);
-        inner.put(0, r.nextDouble());
+        inner.put(0, d.readDouble());
         rec.put(2, inner);
-        rec.put(3, r.nextInt());
-        rec.put(4, r.nextInt());
-        rec.put(5, r.nextInt());
-        sourceData[i] = rec; 
+        rec.put(3, d.readInt());
+        rec.put(4, d.readInt());
+        rec.put(5, d.readInt());
+      }
+    }
+    @Override
+    void writeInternal(Encoder e) throws IOException {
+      for (int i = 0; i < sourceData.length; i++) {
+        GenericRecord rec = sourceData[i];
+        GenericRecord inner;
+        inner = (GenericRecord)rec.get(0);
+        e.writeDouble((Double)inner.get(0));
+        inner = (GenericRecord)rec.get(1);
+        e.writeDouble((Double)inner.get(0));
+        inner = (GenericRecord)rec.get(2);
+        e.writeDouble((Double)inner.get(0));
+        e.writeInt((Integer)rec.get(3));
+        e.writeInt((Integer)rec.get(4));
+        e.writeInt((Integer)rec.get(5));
       }
     }
+    @Override
+    void genSourceData() {
+      sourceData = generateGenericNested(schema, count);
+    }
+    @Override
+    void reset() {
+      data = null;
+      sourceData = null;
+    }
+    
   }
 
   private static abstract class GenericResolving extends GenericTest {

Modified: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryDecoder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryDecoder.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryDecoder.java (original)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryDecoder.java Fri Feb 25 00:36:40 2011
@@ -46,6 +46,7 @@ public class TestBinaryDecoder {
   // prime number buffer size so that looping tests hit the buffer edge
   // at different points in the loop.
   DecoderFactory factory;
+  static EncoderFactory e_factory = EncoderFactory.get();
   public TestBinaryDecoder(boolean useDirect) {
     factory = new DecoderFactory().configureDecoderBufferSize(521);
     factory.configureDirectDecoder(useDirect);
@@ -130,11 +131,11 @@ public class TestBinaryDecoder {
     ByteBufferOutputStream bbo2 = new ByteBufferOutputStream();
     byte[] b1 = new byte[] { 1, 2 };
     
-    BinaryEncoder e1 = new BinaryEncoder(bbo1);
+    BinaryEncoder e1 = e_factory.binaryEncoder(bbo1, null);
     e1.writeBytes(b1);
     e1.flush();
     
-    BinaryEncoder e2 = new BinaryEncoder(bbo2);
+    BinaryEncoder e2 = e_factory.binaryEncoder(bbo2, null);
     e2.writeBytes(b1);
     e2.flush();
     
@@ -173,12 +174,13 @@ public class TestBinaryDecoder {
     GenericDatumWriter<Object> writer = new GenericDatumWriter<Object>();
     writer.setSchema(schema);
     ByteArrayOutputStream baos = new ByteArrayOutputStream(8192);
-    BinaryEncoder encoder = new BinaryEncoder(baos);
+    BinaryEncoder encoder = e_factory.binaryEncoder(baos, null);
     
     for (Object datum : new RandomData(schema, count, seed)) {
       writer.write(datum, encoder);
       records.add(datum);
     }
+    encoder.flush();
     data = baos.toByteArray();
   }
 

Added: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryEncoderFidelity.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryEncoderFidelity.java?rev=1074364&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryEncoderFidelity.java (added)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryEncoderFidelity.java Fri Feb 25 00:36:40 2011
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.avro.util.Utf8;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestBinaryEncoderFidelity {
+  // Expected bytes, produced once by LegacyBinaryEncoder in generateLegacyData().
+  static byte[] legacydata;
+  static byte[] complexdata;
+  EncoderFactory factory = EncoderFactory.get();
+  public static void generateData(Encoder e) throws IOException {
+    // generate a bunch of data that should test the bounds of a BinaryEncoder
+    Random r = new Random(665321); // fixed seed: every encoder is fed the same values
+    e.writeNull();
+    e.writeBoolean(true);
+    e.writeBoolean(false);
+    byte[] bytes = new byte[10];
+    r.nextBytes(bytes);
+    e.writeBytes(bytes);
+    e.writeBytes(new byte[0]);
+    e.writeBytes(bytes, 3, 3);
+    e.writeBytes(new byte[0], 0, 0);
+    e.writeBytes(ByteBuffer.wrap(bytes, 2, 2)); // buffer with a non-zero position
+    e.writeDouble(0.0);
+    e.writeDouble(-0.0);
+    e.writeDouble(Double.NaN);
+    e.writeDouble(r.nextDouble());
+    e.writeDouble(Double.NEGATIVE_INFINITY);
+    e.writeEnum(65);
+    e.writeFixed(bytes);
+    e.writeFixed(bytes, 7, 2);
+    e.writeFloat(1.0f);
+    e.writeFloat(r.nextFloat());
+    e.writeFloat(Float.POSITIVE_INFINITY);
+    e.writeFloat(Float.MIN_NORMAL);
+    e.writeIndex(-2);
+    e.writeInt(0); // ints: small values, then growing magnitudes of both signs, then the extremes
+    e.writeInt(-1);
+    e.writeInt(1);
+    e.writeInt(0x40);
+    e.writeInt(-0x41);
+    e.writeInt(0x2000);
+    e.writeInt(-0x2001);
+    e.writeInt(0x80000);
+    e.writeInt(-0x80001);
+    e.writeInt(0x4000000);
+    e.writeInt(-0x4000001);
+    e.writeInt(r.nextInt());
+    e.writeInt(r.nextInt());
+    e.writeInt(Integer.MAX_VALUE);
+    e.writeInt(Integer.MIN_VALUE);
+    e.writeLong(0); // longs: same pattern as the ints, extended to 64-bit magnitudes
+    e.writeLong(-1);
+    e.writeLong(1);
+    e.writeLong(0x40);
+    e.writeLong(-0x41);
+    e.writeLong(0x2000);
+    e.writeLong(-0x2001);
+    e.writeLong(0x80000);
+    e.writeLong(-0x80001);
+    e.writeLong(0x4000000);
+    e.writeLong(-0x4000001);
+    e.writeLong(0x200000000L);
+    e.writeLong(-0x200000001L);
+    e.writeLong(0x10000000000L);
+    e.writeLong(-0x10000000001L);
+    e.writeLong(0x800000000000L);
+    e.writeLong(-0x800000000001L);
+    e.writeLong(0x40000000000000L);
+    e.writeLong(-0x40000000000001L);
+    e.writeLong(0x2000000000000000L);
+    e.writeLong(-0x2000000000000001L);
+    e.writeLong(r.nextLong());
+    e.writeLong(r.nextLong());
+    e.writeLong(Long.MAX_VALUE);
+    e.writeLong(Long.MIN_VALUE);
+    e.writeString(new StringBuilder("StringBuilder\u00A2")); // U+00A2 is 2 bytes in UTF-8
+    e.writeString("String\u20AC"); // U+20AC is 3 bytes in UTF-8
+    e.writeString("");
+    e.writeString(new Utf8("Utf8\uD834\uDD1E")); // surrogate pair (U+1D11E), 4 bytes in UTF-8
+    if (e instanceof BinaryEncoder) {
+      int count = ((BinaryEncoder)e).bytesBuffered(); // sampled before flush(); printed for information only
+      System.out.println(e.getClass().getSimpleName() + " buffered: " + count);
+    }
+    e.flush();
+  }
+  // Writes a one-item int array followed by a two-entry map, then flushes.
+  static void generateComplexData(Encoder e) throws IOException {
+    e.writeArrayStart();
+    e.setItemCount(1);
+    e.startItem();
+    e.writeInt(1);
+    e.writeArrayEnd();
+    e.writeMapStart();
+    e.setItemCount(2);
+    e.startItem();
+    e.writeString("foo");
+    e.writeInt(-1);
+    e.writeDouble(33.3);
+    e.startItem();
+    e.writeString("bar");
+    e.writeInt(1);
+    e.writeDouble(-33.3);
+    e.writeMapEnd();
+    e.flush();
+  }
+  // Captures the legacy encoder's output for both data sets as the expected bytes.
+  @BeforeClass
+  public static void generateLegacyData() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    Encoder e = new LegacyBinaryEncoder(baos);
+    generateData(e);
+    legacydata = baos.toByteArray();
+    baos.reset(); // same stream and encoder are reused for the second data set
+    generateComplexData(e);
+    complexdata = baos.toByteArray();
+  }
+  // The encoder from factory.binaryEncoder must reproduce the legacy bytes exactly.
+  @Test
+  public void testBinaryEncoder() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    BinaryEncoder e = factory.binaryEncoder(baos, null);
+    generateData(e);
+    byte[] result = baos.toByteArray();
+    Assert.assertEquals(legacydata.length, result.length);
+    Assert.assertArrayEquals(legacydata, result);
+    baos.reset();
+    generateComplexData(e);
+    byte[] result2 = baos.toByteArray();
+    Assert.assertEquals(complexdata.length, result2.length);
+    Assert.assertArrayEquals(complexdata, result2);
+  }
+  // The encoder from factory.directBinaryEncoder must reproduce the legacy bytes exactly.
+  @Test
+  public void testDirectBinaryEncoder() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    BinaryEncoder e = factory.directBinaryEncoder(baos, null);
+    generateData(e);
+    byte[] result = baos.toByteArray();
+    Assert.assertEquals(legacydata.length, result.length);
+    Assert.assertArrayEquals(legacydata, result);
+    baos.reset();
+    generateComplexData(e);
+    byte[] result2 = baos.toByteArray();
+    Assert.assertEquals(complexdata.length, result2.length);
+    Assert.assertArrayEquals(complexdata, result2);
+  }
+
+  // The blocking encoder must match on the flat data; only length and first byte are checked for the array/map data.
+  @Test
+  public void testBlockingBinaryEncoder() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    BinaryEncoder e = factory.blockingBinaryEncoder(baos, null);
+    generateData(e);
+    byte[] result = baos.toByteArray();
+    Assert.assertEquals(legacydata.length, result.length);
+    Assert.assertArrayEquals(legacydata, result);
+    baos.reset();
+    generateComplexData(e);
+    byte[] result2 = baos.toByteArray();
+    // blocking will cause different length, should be two bytes larger
+    Assert.assertEquals(complexdata.length + 2, result2.length);
+    // the first byte is the array start, with the count of items negative
+    Assert.assertEquals(complexdata[0] >>> 1, result2[0]);
+  }
+}

Modified: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestBlockingIO.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestBlockingIO.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestBlockingIO.java (original)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestBlockingIO.java Fri Feb 25 00:36:40 2011
@@ -62,7 +62,9 @@ public class TestBlockingIO {
           new ByteArrayInputStream(input.getBytes("UTF-8")));
       
       ByteArrayOutputStream os = new ByteArrayOutputStream();
-      Encoder cos = new BlockingBinaryEncoder(os, bufferSize);
+      EncoderFactory factory = new EncoderFactory()
+          .configureBlockSize(bufferSize);
+      Encoder cos = factory.blockingBinaryEncoder(os, null);
       serialize(cos, p, os);
       cos.flush();
       

Modified: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestBlockingIO2.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestBlockingIO2.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestBlockingIO2.java (original)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestBlockingIO2.java Fri Feb 25 00:36:40 2011
@@ -43,7 +43,9 @@ public class TestBlockingIO2 {
     throws IOException {
 
     ByteArrayOutputStream os = new ByteArrayOutputStream();
-    Encoder encoder = new BlockingBinaryEncoder(os, bufferSize);
+    EncoderFactory factory = new EncoderFactory()
+        .configureBlockSize(bufferSize);
+    Encoder encoder = factory.blockingBinaryEncoder(os, null);
     this.values = TestValidatingIO.randomValues(calls);
 
     TestValidatingIO.generate(encoder, calls, values);

Modified: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java (original)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java Fri Feb 25 00:36:40 2011
@@ -22,34 +22,80 @@ import java.io.IOException;
 import java.io.OutputStream;
 
 import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestEncoders {
+  private static EncoderFactory factory = EncoderFactory.get();
+
   @Test
   public void testBinaryEncoderInit() throws IOException {
-    OutputStream out = null;
-    new BinaryEncoder(out).init(new ByteArrayOutputStream());
+    OutputStream out = new ByteArrayOutputStream();
+    BinaryEncoder enc = factory.binaryEncoder(out, null);
+    Assert.assertTrue(enc == factory.binaryEncoder(out, enc));
+  }
+  
+  @Test(expected=NullPointerException.class)
+  public void testBadBinaryEncoderInit() {
+    factory.binaryEncoder(null, null);
   }
 
   @Test
   public void testBlockingBinaryEncoderInit() throws IOException {
-    OutputStream out = null;
-    new BlockingBinaryEncoder(out).init(new ByteArrayOutputStream());
+    OutputStream out = new ByteArrayOutputStream();
+    BinaryEncoder reuse = null;
+    reuse = factory.blockingBinaryEncoder(out, reuse);
+    Assert.assertTrue(reuse == factory.blockingBinaryEncoder(out, reuse));
+    // a reused encoder must be handed back as the same instance
+  }
+  
+  @Test(expected=NullPointerException.class)
+  public void testBadBlockintBinaryEncoderInit() {
+    factory.blockingBinaryEncoder(null, null); // null stream through the blocking factory path
+  }
+  
+  @Test
+  public void testDirectBinaryEncoderInit() throws IOException {
+    OutputStream out = new ByteArrayOutputStream();
+    BinaryEncoder enc = factory.directBinaryEncoder(out, null);
+    Assert.assertTrue(enc ==  factory.directBinaryEncoder(out, enc));
+  }
+  
+  @Test(expected=NullPointerException.class)
+  public void testBadDirectBinaryEncoderInit() {
+    factory.directBinaryEncoder(null, null);
   }
 
   @Test
   public void testJsonEncoderInit() throws IOException {
     Schema s = Schema.parse("\"int\"");
-    OutputStream out = null;
-    new JsonEncoder(s, out).init(new ByteArrayOutputStream());
+    OutputStream out = new ByteArrayOutputStream();
+    factory.jsonEncoder(s, out);
+    JsonEncoder enc = factory.jsonEncoder(s,
+        new JsonFactory().createJsonGenerator(out, JsonEncoding.UTF8));
+    enc.configure(out);
+  }
+  
+  @Test(expected=NullPointerException.class)
+  public void testBadJsonEncoderInitOS() throws IOException {
+    factory.jsonEncoder(Schema.create(Type.INT), (OutputStream)null);
+  }
+  
+  @Test(expected=NullPointerException.class)
+  public void testBadJsonEncoderInit() throws IOException {
+    factory.jsonEncoder(Schema.create(Type.INT), (JsonGenerator)null);
   }
 
   @Test
   public void testValidatingEncoderInit() throws IOException {
     Schema s = Schema.parse("\"int\"");
-    OutputStream out = null;
-    Encoder e = new BinaryEncoder(out);
-    new ValidatingEncoder(s, e).init(new ByteArrayOutputStream());
+    OutputStream out = new ByteArrayOutputStream();
+    Encoder e = factory.directBinaryEncoder(out, null);
+    factory.validatingEncoder(s, e).configure(e);
   }
 
 }

Modified: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestValidatingIO.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestValidatingIO.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestValidatingIO.java (original)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestValidatingIO.java Fri Feb 25 00:36:40 2011
@@ -77,21 +77,22 @@ public class TestValidatingIO {
 
   public static byte[] make(Schema sc, String calls,
       Object[] values, Encoding encoding) throws IOException {
+    EncoderFactory factory = EncoderFactory.get();
     ByteArrayOutputStream ba = new ByteArrayOutputStream();
     Encoder bvo = null;
     switch (encoding) {
     case BINARY:
-      bvo = new BinaryEncoder(ba);
+      bvo = factory.binaryEncoder(ba, null);
       break;
     case BLOCKING_BINARY:
-      bvo = new BlockingBinaryEncoder(ba);
+      bvo = factory.blockingBinaryEncoder(ba, null);
       break;
     case JSON:
-      bvo = new JsonEncoder(sc, ba);
+      bvo = factory.jsonEncoder(sc, ba);
       break;
     }
         
-    Encoder vo = new ValidatingEncoder(sc, bvo);
+    Encoder vo = factory.validatingEncoder(sc, bvo);
     generate(vo, calls, values);
     vo.flush();
     return ba.toByteArray();

Modified: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/parsing/TestResolvingGrammarGenerator.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/parsing/TestResolvingGrammarGenerator.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/parsing/TestResolvingGrammarGenerator.java (original)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/parsing/TestResolvingGrammarGenerator.java Fri Feb 25 00:36:40 2011
@@ -24,9 +24,8 @@ import java.util.Arrays;
 import java.util.Collection;
 
 import org.apache.avro.Schema;
-import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.Encoder;
-import org.apache.avro.io.ValidatingEncoder;
+import org.apache.avro.io.EncoderFactory;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -51,9 +50,12 @@ public class TestResolvingGrammarGenerat
   @Test
   public void test() throws IOException {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    Encoder e = new ValidatingEncoder(schema, new BinaryEncoder(baos));
+    EncoderFactory factory = EncoderFactory.get();
+    Encoder e = factory.validatingEncoder(schema, 
+        factory.binaryEncoder(baos, null));
     
     ResolvingGrammarGenerator.encode(e, schema, data);
+    e.flush();
   }
   
   @Parameterized.Parameters

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java Fri Feb 25 00:36:40 2011
@@ -37,6 +37,7 @@ import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.ByteBufferInputStream;
@@ -83,6 +84,10 @@ public abstract class Requestor {
     rpcMetaPlugins.add(plugin);
   }
 
+  private static final EncoderFactory ENCODER_FACTORY = new EncoderFactory();
+  private BinaryEncoder encoder = 
+    ENCODER_FACTORY.binaryEncoder(new ByteBufferOutputStream(), null);
+  
   /** Writes a request message and reads a response or error message. */
   public synchronized Object request(String messageName, Object request)
     throws Exception {
@@ -92,7 +97,8 @@ public abstract class Requestor {
     RPCContext context = new RPCContext();
     do {
       ByteBufferOutputStream bbo = new ByteBufferOutputStream();
-      Encoder out = new BinaryEncoder(bbo);
+      //safe to use encoder because this is synchronized
+      BinaryEncoder out = ENCODER_FACTORY.binaryEncoder(bbo, encoder);
       
       // use local protocol to write request
       m = getLocal().getMessages().get(messageName);
@@ -101,6 +107,8 @@ public abstract class Requestor {
       context.setMessage(m);
     
       writeRequest(m.getRequest(), request, out); // write request payload
+      
+      out.flush();
       List<ByteBuffer> payload = bbo.getBufferList();
       
       writeHandshake(out);                       // prepend handshake if needed
@@ -113,6 +121,7 @@ public abstract class Requestor {
 
       out.writeString(m.getName());               // write message name
 
+      out.flush();
       bbo.append(payload);
       
       List<ByteBuffer> requestBytes = bbo.getBufferList();
@@ -247,9 +256,10 @@ public abstract class Requestor {
     if (remote != null) return remote;            // already cached
     // force handshake
     ByteBufferOutputStream bbo = new ByteBufferOutputStream();
-    Encoder out = new BinaryEncoder(bbo);
+    // direct because the payload is tiny.
+    Encoder out = ENCODER_FACTORY.directBinaryEncoder(bbo, null);
     writeHandshake(out);
-    out.writeLong(0);                             // empty metadata
+    out.writeInt(0);                              // empty metadata
     out.writeString("");                          // bogus message name
     List<ByteBuffer> response =
       getTransceiver().transceive(bbo.getBufferList());

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java Fri Feb 25 00:36:40 2011
@@ -44,6 +44,7 @@ import org.apache.avro.io.DecoderFactory
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 
@@ -98,7 +99,7 @@ public abstract class Responder {
   public List<ByteBuffer> respond(List<ByteBuffer> buffers) throws IOException {
     return respond(buffers, null);
   }
-
+  
   /** Called by a server to deserialize a request, compute and serialize a
    * response or error.  Transciever is used by connection-based servers to
    * track handshake status of connection. */
@@ -107,7 +108,7 @@ public abstract class Responder {
     Decoder in = DecoderFactory.defaultFactory().createBinaryDecoder(
         new ByteBufferInputStream(buffers), null);
     ByteBufferOutputStream bbo = new ByteBufferOutputStream();
-    BinaryEncoder out = new BinaryEncoder(bbo);
+    BinaryEncoder out = EncoderFactory.get().binaryEncoder(bbo, null);
     Exception error = null;
     RPCContext context = new RPCContext();
     List<ByteBuffer> payload = null;
@@ -115,6 +116,7 @@ public abstract class Responder {
     boolean wasConnected = connection != null && connection.isConnected();
     try {
       Protocol remote = handshake(in, out, connection);
+      out.flush();
       if (remote == null)                        // handshake failed
         return bbo.getBufferList();
       handshake = bbo.getBufferList();
@@ -171,11 +173,14 @@ public abstract class Responder {
       LOG.warn("system error", e);
       context.setError(e);
       bbo = new ByteBufferOutputStream();
-      out = new BinaryEncoder(bbo);
+      out = EncoderFactory.get().binaryEncoder(bbo, null);
       out.writeBoolean(true);
       writeError(Protocol.SYSTEM_ERRORS, new Utf8(e.toString()), out);
+      if (null == handshake) {
+        handshake = new ByteBufferOutputStream().getBufferList();
+      }
     }
-
+    out.flush();
     payload = bbo.getBufferList();
     
     // Grab meta-data from plugins
@@ -184,7 +189,7 @@ public abstract class Responder {
       plugin.serverSendResponse(context);
     }
     META_WRITER.write(context.responseCallMeta(), out);
-    
+    out.flush();
     // Prepend handshake and append payload
     bbo.prepend(handshake);
     bbo.append(payload);

Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestCompare.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestCompare.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestCompare.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestCompare.java Fri Feb 25 00:36:40 2011
@@ -31,7 +31,8 @@ import org.apache.avro.specific.Specific
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.io.BinaryData;
 import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.util.Utf8;
 
 import org.apache.avro.test.TestRecord;
@@ -244,7 +245,7 @@ public class TestCompare {
   private static int compare(Object o1, Object o2, Schema schema,
                              boolean comparable, GenericData comparator) {
     return comparable
-      ? ((Comparable)o1).compareTo(o2)
+      ? ((Comparable<Object>)o1).compareTo(o2)
       : comparator.compare(o1, o2, schema);
   }
 
@@ -253,7 +254,9 @@ public class TestCompare {
     throws IOException {
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     writer.setSchema(schema);
-    writer.write(datum, new BinaryEncoder(out));
+    Encoder enc = new EncoderFactory().directBinaryEncoder(out, null);
+    writer.write(datum, enc);
+    enc.flush();
     return out.toByteArray();
   }
 }

Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java Fri Feb 25 00:36:40 2011
@@ -44,9 +44,8 @@ import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.Encoder;
-import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.io.JsonDecoder;
-import org.apache.avro.io.JsonEncoder;
 import org.apache.avro.compiler.specific.TestSpecificCompiler;
 import org.apache.avro.util.Utf8;
 
@@ -558,6 +557,12 @@ public class TestSchema {
       checkBinary(schema, datum,
                   new GenericDatumWriter<Object>(),
                   new GenericDatumReader<Object>());
+      checkDirectBinary(schema, datum,
+                  new GenericDatumWriter<Object>(),
+                  new GenericDatumReader<Object>());
+      checkBlockingBinary(schema, datum,
+                  new GenericDatumWriter<Object>(),
+                  new GenericDatumReader<Object>());
       checkJson(schema, datum,
                   new GenericDatumWriter<Object>(),
                   new GenericDatumReader<Object>());
@@ -579,14 +584,16 @@ public class TestSchema {
     assertEquals(s1, s2);
     assertFalse(s0.equals(s2));
   }
-
+  
   public static void checkBinary(Schema schema, Object datum,
                                  DatumWriter<Object> writer,
                                  DatumReader<Object> reader)
     throws IOException {
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     writer.setSchema(schema);
-    writer.write(datum, new BinaryEncoder(out));
+    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+    writer.write(datum, encoder);
+    encoder.flush();
     byte[] data = out.toByteArray();
 
     reader.setSchema(schema);
@@ -598,12 +605,48 @@ public class TestSchema {
     assertEquals("Decoded data does not match.", datum, decoded);
   }
 
+  public static void checkDirectBinary(Schema schema, Object datum,
+      DatumWriter<Object> writer, DatumReader<Object> reader)
+      throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    writer.setSchema(schema);
+    Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
+    writer.write(datum, encoder);
+    // no flush for direct
+    byte[] data = out.toByteArray();
+
+    reader.setSchema(schema);
+
+    Object decoded = reader.read(null, DecoderFactory.defaultFactory()
+        .createBinaryDecoder(data, null));
+
+    assertEquals("Decoded data does not match.", datum, decoded);
+  }
+
+  public static void checkBlockingBinary(Schema schema, Object datum,
+      DatumWriter<Object> writer, DatumReader<Object> reader)
+      throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    writer.setSchema(schema);
+    Encoder encoder = EncoderFactory.get().blockingBinaryEncoder(out, null);
+    writer.write(datum, encoder);
+    encoder.flush();
+    byte[] data = out.toByteArray();
+
+    reader.setSchema(schema);
+
+    Object decoded = reader.read(null, DecoderFactory.defaultFactory()
+        .createBinaryDecoder(data, null));
+
+    assertEquals("Decoded data does not match.", datum, decoded);
+  }
+
   private static void checkJson(Schema schema, Object datum,
                                 DatumWriter<Object> writer,
                                 DatumReader<Object> reader)
     throws IOException {
     ByteArrayOutputStream out = new ByteArrayOutputStream();
-    Encoder encoder = new JsonEncoder(schema, out);
+    Encoder encoder = EncoderFactory.get().jsonEncoder(schema, out);
     writer.setSchema(schema);
     writer.write(datum, encoder);
     writer.write(datum, encoder);
@@ -622,7 +665,7 @@ public class TestSchema {
   private static void checkJson(Schema schema, Object datum,
                                 String json) throws Exception {
     ByteArrayOutputStream out = new ByteArrayOutputStream();
-    Encoder encoder = new JsonEncoder(schema, out);
+    Encoder encoder = EncoderFactory.get().jsonEncoder(schema, out);
     DatumWriter<Object> writer = new GenericDatumWriter<Object>();
     writer.setSchema(schema);
     writer.write(datum, encoder);
@@ -643,7 +686,6 @@ public class TestSchema {
   private static final Schema ACTUAL =            // an empty record schema
     Schema.parse("{\"type\":\"record\", \"name\":\"Foo\", \"fields\":[]}");
 
-  @SuppressWarnings(value="unchecked")
   private static void checkDefault(String schemaJson, String defaultJson,
                                    Object defaultValue) throws Exception {
     String recordJson =
@@ -651,7 +693,7 @@ public class TestSchema {
     +"\"type\":"+schemaJson+", "
     +"\"default\":"+defaultJson+"}]}";
     Schema expected = Schema.parse(recordJson);
-    DatumReader in = new GenericDatumReader(ACTUAL, expected);
+    DatumReader<Object> in = new GenericDatumReader<Object>(ACTUAL, expected);
     GenericData.Record record = (GenericData.Record)
       in.read(null, DecoderFactory.defaultFactory().createBinaryDecoder(
           new byte[0], null));
@@ -677,9 +719,10 @@ public class TestSchema {
       ("{\"type\":\"enum\",\"name\":\"E\",\"symbols\":[\"Y\",\"Z\"]}");
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     DatumWriter<Object> writer = new GenericDatumWriter<Object>(actual);
-    Encoder encoder = new BinaryEncoder(out);
+    Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
     writer.write(new GenericData.EnumSymbol(actual, "Y"), encoder);
     writer.write(new GenericData.EnumSymbol(actual, "X"), encoder);
+    encoder.flush();
     byte[] data = out.toByteArray();
     Decoder decoder = DecoderFactory.defaultFactory().createBinaryDecoder(
         data, null);

Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificData.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificData.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificData.java Fri Feb 25 00:36:40 2011
@@ -70,7 +70,14 @@ public class TestSpecificData {
                            new SpecificDatumWriter<Object>(),
                            new SpecificDatumReader<Object>());
 
-  }
+    TestSchema.checkDirectBinary(schema, record,
+        new SpecificDatumWriter<Object>(),
+        new SpecificDatumReader<Object>());
+
+    TestSchema.checkBlockingBinary(schema, record,
+        new SpecificDatumWriter<Object>(),
+        new SpecificDatumReader<Object>());
+}
 
 
 

Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificDatumWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificDatumWriter.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificDatumWriter.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificDatumWriter.java Fri Feb 25 00:36:40 2011
@@ -23,6 +23,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
 import org.apache.avro.Schema;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.io.JsonEncoder;
 import org.apache.avro.test.Kind;
 import org.apache.avro.test.TestRecordWithUnion;
@@ -34,7 +35,7 @@ public class TestSpecificDatumWriter {
     final SpecificDatumWriter<TestRecordWithUnion> writer = new SpecificDatumWriter<TestRecordWithUnion>();
     Schema schema = TestRecordWithUnion.SCHEMA$;
     ByteArrayOutputStream out = new ByteArrayOutputStream();
-    JsonEncoder encoder = new JsonEncoder(schema, out);
+    JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, out);
 
     writer.setSchema(schema);
 

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroSerialization.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroSerialization.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroSerialization.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroSerialization.java Fri Feb 25 00:36:40 2011
@@ -33,6 +33,7 @@ import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
@@ -123,11 +124,16 @@ public class AvroSerialization<T> extend
 
     public void open(OutputStream out) {
       this.out = out;
-      this.encoder = new BinaryEncoder(out);
+      this.encoder = new EncoderFactory().configureBlockSize(512)
+          .binaryEncoder(out, null);
     }
 
     public void serialize(AvroWrapper<T> wrapper) throws IOException {
       writer.write(wrapper.datum(), encoder);
+      // would be a lot faster if the Serializer interface had a flush()
+      // method and the Hadoop framework called it when needed rather
+      // than for every record.
+      encoder.flush();
     }
 
     public void close() throws IOException {

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java Fri Feb 25 00:36:40 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configured
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
 
 /** A {@link Serialization} for {@link TetherData}. */
 class TetherKeySerialization
@@ -75,7 +76,7 @@ class TetherKeySerialization
     
     public void open(OutputStream out) {
       this.out = out;
-      this.encoder = new BinaryEncoder(out);
+      this.encoder = EncoderFactory.get().binaryEncoder(out, null);
     }
 
     public void serialize(TetherData datum) throws IOException {
@@ -83,6 +84,7 @@ class TetherKeySerialization
     }
 
     public void close() throws IOException {
+      encoder.flush();
       out.close();
     }
 

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestReflectJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestReflectJob.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestReflectJob.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestReflectJob.java Fri Feb 25 00:36:40 2011
@@ -99,7 +99,7 @@ public class TestReflectJob {
   @SuppressWarnings("deprecation")
   public void testJob() throws Exception {
     JobConf job = new JobConf();
-    String dir = System.getProperty("test.dir", ".") + "/mapred-reflect";
+    String dir = System.getProperty("test.dir", ".") + "/target/testReflectJob";
     Path inputPath = new Path(dir + "/in");
     Path outputPath = new Path(dir + "/out");
 

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTask.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTask.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTask.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTask.java Fri Feb 25 00:36:40 2011
@@ -34,6 +34,7 @@ import org.apache.avro.ipc.specific.Spec
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 
@@ -71,7 +72,8 @@ public abstract class TetherTask<IN,MID,
   public class Collector<T> {
     private SpecificDatumWriter<T> writer;
     private Buffer buffer = new Buffer();
-    private BinaryEncoder encoder = new BinaryEncoder(buffer);
+    private BinaryEncoder encoder = new EncoderFactory()
+        .configureBlockSize(512).binaryEncoder(buffer, null);
     
     private Collector(Schema schema) {
       this.writer = new SpecificDatumWriter<T>(schema);
@@ -81,6 +83,7 @@ public abstract class TetherTask<IN,MID,
     public void collect(T record) throws IOException {
       buffer.reset();
       writer.write(record, encoder);
+      encoder.flush();
       outputClient.output(buffer.data());
     }
     
@@ -88,6 +91,7 @@ public abstract class TetherTask<IN,MID,
     public void collect(T record, int partition) throws IOException {
       buffer.reset();
       writer.write(record, encoder);
+      encoder.flush();
       outputClient.outputPartitioned(partition, buffer.data());
     }
   }

Modified: avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/BinaryFragmentToJsonTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/BinaryFragmentToJsonTool.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/BinaryFragmentToJsonTool.java (original)
+++ avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/BinaryFragmentToJsonTool.java Fri Feb 25 00:36:40 2011
@@ -26,9 +26,9 @@ import org.apache.avro.Schema;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.io.JsonEncoder;
 import org.codehaus.jackson.JsonEncoding;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonGenerator;
@@ -61,7 +61,7 @@ public class BinaryFragmentToJsonTool im
       JsonGenerator g =
         new JsonFactory().createJsonGenerator(out, JsonEncoding.UTF8);
       g.useDefaultPrettyPrinter();
-      writer.write(datum, new JsonEncoder(schema, g));
+      writer.write(datum, EncoderFactory.get().jsonEncoder(schema, g));
       g.flush();
       out.println();
       out.flush();

Modified: avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileReadTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileReadTool.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileReadTool.java (original)
+++ avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileReadTool.java Fri Feb 25 00:36:40 2011
@@ -28,9 +28,8 @@ import org.apache.avro.file.DataFileRead
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.io.JsonEncoder;
-import org.codehaus.jackson.JsonGenerator;
 
 /** Reads a data file and dumps to JSON */
 public class DataFileReadTool implements Tool {
@@ -61,10 +60,9 @@ public class DataFileReadTool implements
     try {
       Schema schema = fileReader.getSchema();
       DatumWriter<Object> writer = new GenericDatumWriter<Object>(schema);
-      Encoder encoder = new JsonEncoder(schema, (JsonGenerator)null);
+      JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, out);
       for (Object datum : fileReader) {
-        // init() recreates the internal Jackson JsonGenerator
-        encoder.init(out);
+        encoder.configure(out); //reinitializes state
         writer.write(datum, encoder);
         encoder.flush();
         out.println();

Modified: avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/JsonToBinaryFragmentTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/JsonToBinaryFragmentTool.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/JsonToBinaryFragmentTool.java (original)
+++ avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/JsonToBinaryFragmentTool.java Fri Feb 25 00:36:40 2011
@@ -25,7 +25,8 @@ import java.util.List;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.io.JsonDecoder;
 
 /** Tool to convert JSON data into the binary form. */
@@ -56,8 +57,9 @@ public class JsonToBinaryFragmentTool im
     
     GenericDatumWriter<Object> writer = 
         new GenericDatumWriter<Object>(schema);
-    writer.write(datum, new BinaryEncoder(out));
-    out.flush();
+    Encoder e = EncoderFactory.get().binaryEncoder(out, null);
+    writer.write(datum, e);
+    e.flush();
     } finally {
       if (needsClosing) {
         input.close();

Modified: avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/RpcReceiveTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/RpcReceiveTool.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/RpcReceiveTool.java (original)
+++ avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/RpcReceiveTool.java Fri Feb 25 00:36:40 2011
@@ -33,6 +33,7 @@ import org.apache.avro.AvroRemoteExcepti
 import org.apache.avro.Protocol;
 import org.apache.avro.Protocol.Message;
 import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.io.JsonEncoder;
 import org.apache.avro.ipc.Ipc;
 import org.apache.avro.ipc.Server;
@@ -83,7 +84,7 @@ public class RpcReceiveTool implements T
       try {
         JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(
             out, JsonEncoding.UTF8);
-        JsonEncoder jsonEncoder = new JsonEncoder(message.getRequest(), jsonGenerator);
+        JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(message.getRequest(), jsonGenerator);
 
         GenericDatumWriter<Object> writer = new GenericDatumWriter<Object>(
             message.getRequest());

Modified: avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/RpcSendTool.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/RpcSendTool.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/RpcSendTool.java (original)
+++ avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/RpcSendTool.java Fri Feb 25 00:36:40 2011
@@ -33,7 +33,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.Protocol.Message;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.JsonEncoder;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.ipc.Ipc;
 import org.apache.avro.ipc.generic.GenericRequestor;
 
@@ -109,7 +109,7 @@ public class RpcSendTool implements Tool
     JsonGenerator g =
       new JsonFactory().createJsonGenerator(out, JsonEncoding.UTF8);
     g.useDefaultPrettyPrinter();
-    writer.write(datum, new JsonEncoder(schema, g));
+    writer.write(datum, EncoderFactory.get().jsonEncoder(schema, g));
     g.flush();
     out.println();
     out.flush();



Mime
View raw message