avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From scottca...@apache.org
Subject svn commit: r1074364 [1/2] - in /avro/trunk: ./ lang/java/avro/src/main/java/org/apache/avro/ lang/java/avro/src/main/java/org/apache/avro/file/ lang/java/avro/src/main/java/org/apache/avro/io/ lang/java/avro/src/main/java/org/apache/avro/io/parsing/ l...
Date Fri, 25 Feb 2011 00:36:41 GMT
Author: scottcarey
Date: Fri Feb 25 00:36:40 2011
New Revision: 1074364

URL: http://svn.apache.org/viewvc?rev=1074364&view=rev
Log:
AVRO-753. Java: Improve BinaryEncoder Performance.

Added:
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryEncoder.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java
    avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/LegacyBinaryEncoder.java
    avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryEncoderFidelity.java
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Schema.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryData.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BlockingBinaryEncoder.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Decoder.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/DecoderFactory.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingEncoder.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/util/Utf8.java
    avro/trunk/lang/java/avro/src/test/java/org/apache/avro/GenerateBlockingData.java
    avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestReflect.java
    avro/trunk/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericData.java
    avro/trunk/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericDatumWriter.java
    avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/Perf.java
    avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryDecoder.java
    avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestBlockingIO.java
    avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestBlockingIO2.java
    avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java
    avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/TestValidatingIO.java
    avro/trunk/lang/java/avro/src/test/java/org/apache/avro/io/parsing/TestResolvingGrammarGenerator.java
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestCompare.java
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificData.java
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/specific/TestSpecificDatumWriter.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroSerialization.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestReflectJob.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TetherTask.java
    avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/BinaryFragmentToJsonTool.java
    avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/DataFileReadTool.java
    avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/JsonToBinaryFragmentTool.java
    avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/RpcReceiveTool.java
    avro/trunk/lang/java/tools/src/main/java/org/apache/avro/tool/RpcSendTool.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Feb 25 00:36:40 2011
@@ -53,6 +53,16 @@ Avro 1.5.0 (unreleased)
       org.apache.avro.ipc.specific
     (scottcarey)
 
+    AVRO-753. Java: Improve BinaryEncoder Performance.
+    The Encoder API has several resulting changes:
+    * Construction and configuration are handled by EncoderFactory.  All
+      constructors are hidden, and Encoder.init(OutputStream) is removed.
+    * Some Encoders previously did not buffer output.  Users must call
+      Encoder.flush() to ensure output is written unless the EncoderFactory
+      method used to construct an instance explicitly states that the Encoder
+      does not buffer output. 
+    (scottcarey)
+
     AVRO-670. Allow DataFileWriteTool to accept schema files as input with new
     --schema-file and --schema command-line flags. (Ron Bodkin via philz)
 

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Schema.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Schema.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Schema.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Schema.java Fri Feb 25 00:36:40 2011
@@ -419,6 +419,11 @@ public abstract class Schema {
         props.equals(that.props);
     }
     public int hashCode() { return name.hashCode() + schema.hashCode(); }
+    
+    @Override
+    public String toString() {
+      return name + " type:" + schema.type + " pos:" + position;
+    }
   }
 
   private static class Name {

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java Fri Feb 25 00:36:40 2011
@@ -178,10 +178,12 @@ public class DataFileStream<D> implement
   /** Returns an iterator over entries in this file.  Note that this iterator
    * is shared with other users of the file: it does not contain a separate
    * pointer into the file. */
+  @Override
   public Iterator<D> iterator() { return this; }
 
   private DataBlock block = null;
   /** True if more entries remain in this file. */
+  @Override
   public boolean hasNext() {
     try {
       if (blockRemaining == 0) {
@@ -212,6 +214,7 @@ public class DataFileStream<D> implement
   /** Read the next datum in the file.
    * @throws NoSuchElementException if no more remain in the file.
    */
+  @Override
   public D next() {
     try {
       return next(null);
@@ -293,9 +296,11 @@ public class DataFileStream<D> implement
   }
 
   /** Not supported. */
+  @Override
   public void remove() { throw new UnsupportedOperationException(); }
 
   /** Close this reader. */
+  @Override
   public void close() throws IOException {
     vin.inputStream().close();
   }
@@ -351,6 +356,7 @@ public class DataFileStream<D> implement
       e.writeLong(this.blockSize);
       e.writeFixed(this.data, offset, this.blockSize);
       e.writeFixed(sync);
+      e.flush();
     }   
     
   }

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java Fri Feb 25 00:36:40 2011
@@ -43,7 +43,7 @@ import org.apache.avro.file.DataFileStre
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
 
 /** Stores in a file a sequence of data conforming to a schema.  The schema is
  * stored in the file with the data.  Each datum in a file is of the same
@@ -65,7 +65,7 @@ public class DataFileWriter<D> implement
   private long blockCount;                       // # entries in current block
 
   private NonCopyingByteArrayOutputStream buffer;
-  private Encoder bufOut;
+  private BinaryEncoder bufOut;
 
   private byte[] sync;                          // 16 random bytes
   private int syncInterval = DataFileConstants.DEFAULT_SYNC_INTERVAL;
@@ -132,7 +132,7 @@ public class DataFileWriter<D> implement
 
     init(outs);
 
-    out.write(DataFileConstants.MAGIC);           // write magic
+    vout.writeFixed(DataFileConstants.MAGIC);           // write magic
 
     vout.writeMapStart();                         // write metadata
     vout.setItemCount(meta.size());
@@ -142,10 +142,8 @@ public class DataFileWriter<D> implement
       vout.writeBytes(entry.getValue());
     }
     vout.writeMapEnd();
+    vout.writeFixed(sync);                       // write initial sync
     vout.flush(); //vout may be buffered, flush before writing to out
-
-    out.write(sync);                              // write initial sync
-
     return this;
   }
 
@@ -178,11 +176,12 @@ public class DataFileWriter<D> implement
 
   private void init(OutputStream outs) throws IOException {
     this.out = new BufferedFileOutputStream(outs);
-    this.vout = new BinaryEncoder(out);
+    EncoderFactory efactory = new EncoderFactory();
+    this.vout = efactory.binaryEncoder(out, null);
     dout.setSchema(schema);
     buffer = new NonCopyingByteArrayOutputStream(
         Math.min((int)(syncInterval * 1.25), Integer.MAX_VALUE/2 -1));
-    this.bufOut = new BinaryEncoder(buffer);
+    this.bufOut = efactory.binaryEncoder(buffer, null);
     if (this.codec == null) {
       this.codec = CodecFactory.nullCodec().createInstance();
     }
@@ -244,8 +243,7 @@ public class DataFileWriter<D> implement
     assertOpen();
     dout.write(datum, bufOut);
     blockCount++;
-    if (buffer.size() >= syncInterval)
-      writeBlock();
+    writeIfBlockFull();
   }
 
   /** Expert: Append a pre-encoded datum to the file.  No validation is
@@ -254,9 +252,13 @@ public class DataFileWriter<D> implement
   public void appendEncoded(ByteBuffer datum) throws IOException {
     assertOpen();
     int start = datum.position();
-    buffer.write(datum.array(), start, datum.limit()-start);
+    bufOut.writeFixed(datum.array(), start, datum.limit()-start);
     blockCount++;
-    if (buffer.size() >= syncInterval)
+    writeIfBlockFull();
+  }
+
+  private void writeIfBlockFull() throws IOException {
+    if ((buffer.size() + bufOut.bytesBuffered()) >= syncInterval)
       writeBlock();
   }
 
@@ -307,6 +309,7 @@ public class DataFileWriter<D> implement
   
   private void writeBlock() throws IOException {
     if (blockCount > 0) {
+      bufOut.flush();
       ByteBuffer uncompressed = buffer.getByteArrayAsByteBuffer();
       DataBlock block = new DataBlock(uncompressed, blockCount);
       block.compressUsing(codec);
@@ -326,13 +329,14 @@ public class DataFileWriter<D> implement
   }
 
   /** Flush the current state of the file. */
+  @Override
   public void flush() throws IOException {
     sync();
     vout.flush();
-    out.flush();
   }
 
   /** Close the file. */
+  @Override
   public void close() throws IOException {
     flush();
     out.close();
@@ -344,6 +348,7 @@ public class DataFileWriter<D> implement
 
     private class PositionFilter extends FilterOutputStream {
       public PositionFilter(OutputStream out) throws IOException { super(out); }
+      @Override
       public void write(byte[] b, int off, int len) throws IOException {
         out.write(b, off, len);
         position += len;                           // update on write

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryData.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryData.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryData.java Fri Feb 25 00:36:40 2011
@@ -300,4 +300,129 @@ public class BinaryData {
     return i;
   }
 
+  /** Encode a boolean to the byte array at the given position. Will throw
+   * IndexOutOfBounds if the position is not valid.
+   * @return The number of bytes written to the buffer, 1.
+   */
+  public static int encodeBoolean(boolean b, byte[] buf, int pos) {
+    buf[pos] = b ? (byte) 1 : (byte) 0;
+    return 1;
+  }
+
+  /** Encode an integer to the byte array at the given position. Will throw
+   * IndexOutOfBounds if it overflows. Users should ensure that there are at
+   * least 5 bytes left in the buffer before calling this method.
+   * @return The number of bytes written to the buffer, between 1 and 5.
+   */
+  public static int encodeInt(int n, byte[] buf, int pos) {
+  // move sign to low-order bit, and flip others if negative
+    n = (n << 1) ^ (n >> 31);
+    int start = pos;
+    if ((n & ~0x7F) != 0) {
+      buf[pos++] = (byte)((n | 0x80) & 0xFF);
+      n >>>= 7;
+      if (n > 0x7F) {
+        buf[pos++] = (byte)((n | 0x80) & 0xFF);
+        n >>>= 7;
+        if (n > 0x7F) {
+          buf[pos++] = (byte)((n | 0x80) & 0xFF);
+          n >>>= 7;
+          if (n > 0x7F) {
+            buf[pos++] = (byte)((n | 0x80) & 0xFF);
+            n >>>= 7;
+          }
+        }
+      }
+    } 
+    buf[pos++] = (byte) n;
+    return pos - start;
+  }
+
+  /** Encode a long to the byte array at the given position. Will throw
+   * IndexOutOfBounds if it overflows. Users should ensure that there are at
+   * least 10 bytes left in the buffer before calling this method.
+   * @return The number of bytes written to the buffer, between 1 and 10.
+   */
+  public static int encodeLong(long n, byte[] buf, int pos) {
+    // move sign to low-order bit, and flip others if negative
+    n = (n << 1) ^ (n >> 63);
+    int start = pos;
+    if ((n & ~0x7FL) != 0) {
+      buf[pos++] = (byte)((n | 0x80) & 0xFF);
+      n >>>= 7;
+      if (n > 0x7F) {
+        buf[pos++] = (byte)((n | 0x80) & 0xFF);
+        n >>>= 7;
+        if (n > 0x7F) {
+          buf[pos++] = (byte)((n | 0x80) & 0xFF);
+          n >>>= 7;
+          if (n > 0x7F) {
+            buf[pos++] = (byte)((n | 0x80) & 0xFF);
+            n >>>= 7;
+            if (n > 0x7F) {
+              buf[pos++] = (byte)((n | 0x80) & 0xFF);
+              n >>>= 7;
+              if (n > 0x7F) {
+                buf[pos++] = (byte)((n | 0x80) & 0xFF);
+                n >>>= 7;
+                if (n > 0x7F) {
+                  buf[pos++] = (byte)((n | 0x80) & 0xFF);
+                  n >>>= 7;
+                  if (n > 0x7F) {
+                    buf[pos++] = (byte)((n | 0x80) & 0xFF);
+                    n >>>= 7;
+                    if (n > 0x7F) {
+                      buf[pos++] = (byte)((n | 0x80) & 0xFF);
+                      n >>>= 7;
+                    }
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+    buf[pos++] = (byte) n;
+    return pos - start;
+  }
+
+  /** Encode a float to the byte array at the given position. Will throw
+   * IndexOutOfBounds if it overflows. Users should ensure that there are at
+   * least 4 bytes left in the buffer before calling this method.
+   * @return Returns the number of bytes written to the buffer, 4.
+   */
+  public static int encodeFloat(float f, byte[] buf, int pos) {
+    int len = 1;
+    int bits = Float.floatToRawIntBits(f);
+    // hotspot compiler works well with this variant 
+    buf[pos]         = (byte)((bits       ) & 0xFF);
+    buf[pos + len++] = (byte)((bits >>>  8) & 0xFF);
+    buf[pos + len++] = (byte)((bits >>> 16) & 0xFF);
+    buf[pos + len++] = (byte)((bits >>> 24) & 0xFF);
+    return 4;
+  }
+
+  /** Encode a double to the byte array at the given position. Will throw
+   * IndexOutOfBounds if it overflows. Users should ensure that there are at
+   * least 8 bytes left in the buffer before calling this method.
+   * @return Returns the number of bytes written to the buffer, 8.
+   */
+  public static int encodeDouble(double d, byte[] buf, int pos) {
+    long bits = Double.doubleToRawLongBits(d);
+    int first = (int)(bits & 0xFFFFFFFF);
+    int second = (int)((bits >>> 32) & 0xFFFFFFFF);
+    // the compiler seems to execute this order the best, likely due to
+    // register allocation -- the lifetime of constants is minimized.
+    buf[pos]     = (byte)((first        ) & 0xFF);
+    buf[pos + 4] = (byte)((second       ) & 0xFF);
+    buf[pos + 5] = (byte)((second >>>  8) & 0xFF);
+    buf[pos + 1] = (byte)((first >>>   8) & 0xFF);
+    buf[pos + 2] = (byte)((first >>>  16) & 0xFF);
+    buf[pos + 6] = (byte)((second >>> 16) & 0xFF);
+    buf[pos + 7] = (byte)((second >>> 24) & 0xFF);
+    buf[pos + 3] = (byte)((first >>>  24) & 0xFF);
+    return 8;
+  }
+
 }

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java Fri Feb 25 00:36:40 2011
@@ -18,246 +18,107 @@
 package org.apache.avro.io;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
-import org.apache.avro.util.ByteBufferOutputStream;
 import org.apache.avro.util.Utf8;
 
 /**
- * Low-level support for serializing Avro values.
- *
- * This class has two types of methods.  One type of methods support
- * the writing of leaf values (for example, {@link #writeLong} and
- * {@link #writeString}).  These methods have analogs in {@link
- * Decoder}.
- *
- * The other type of methods support the writing of maps and arrays.
- * These methods are {@link #writeArrayStart}, {@link
- * #startItem}, and {@link #writeArrayEnd} (and similar methods for
- * maps).  Some implementations of {@link Encoder} handle the
- * buffering required to break large maps and arrays into blocks,
- * which is necessary for applications that want to do streaming.
- * (See {@link #writeArrayStart} for details on these methods.)
- *
- *  @see Decoder
+ * An abstract {@link Encoder} for Avro's binary encoding.
+ * <p/>
+ * To construct and configure instances, use {@link EncoderFactory}
+ * 
+ * @see EncoderFactory
+ * @see BufferedBinaryEncoder
+ * @see DirectBinaryEncoder
+ * @see BlockingBinaryEncoder
+ * @see Encoder
+ * @see Decoder
  */
-public class BinaryEncoder extends Encoder {
-  protected OutputStream out;
+public abstract class BinaryEncoder extends Encoder {
   
-  private interface ByteWriter {
-    void write(ByteBuffer bytes) throws IOException;
-  }
-  
-  private static final class SimpleByteWriter implements ByteWriter {
-    private final OutputStream out;
-
-    public SimpleByteWriter(OutputStream out) {
-      this.out = out;
-    }
-
-    @Override
-    public void write(ByteBuffer bytes) throws IOException {
-      encodeLong(bytes.remaining(), out);
-      out.write(bytes.array(), bytes.position(), bytes.remaining());
-    }
-  }
-
-  private static final class ReuseByteWriter implements ByteWriter {
-    private final ByteBufferOutputStream bbout;
-    public ReuseByteWriter(ByteBufferOutputStream bbout) {
-      this.bbout = bbout;
-    }
-
-    @Override
-    public void write(ByteBuffer bytes) throws IOException {
-      encodeLong(bytes.remaining(), bbout);
-      bbout.writeBuffer(bytes);
-    }
-  }
-  
-  private final ByteWriter byteWriter;
-
-  /** Create a writer that sends its output to the underlying stream
-   *  <code>out</code>. */
-  public BinaryEncoder(OutputStream out) {
-    this.out = out;
-    this.byteWriter = (out instanceof ByteBufferOutputStream) ?
-        new ReuseByteWriter((ByteBufferOutputStream) out) :
-          new SimpleByteWriter(out);
-  }
-
-  @Override
-  public void init(OutputStream out) throws IOException {
-    flush();
-    this.out = out;
-  }
-
-  @Override
-  public void flush() throws IOException {
-    if (out != null) {
-      out.flush();
-    }
-  }
-
-  @Override
-  public void writeNull() throws IOException { }
-  
-  @Override
-  public void writeBoolean(boolean b) throws IOException {
-    out.write(b ? 1 : 0);
-  }
-
-  @Override
-  public void writeInt(int n) throws IOException {
-    encodeLong(n, out);
-  }
-
   @Override
-  public void writeLong(long n) throws IOException {
-    encodeLong(n, out);
-  }
+  public void writeNull() throws IOException {}
   
   @Override
-  public void writeFloat(float f) throws IOException {
-    encodeFloat(f, out);
-  }
-
-  @Override
-  public void writeDouble(double d) throws IOException {
-    encodeDouble(d, out);
-  }
-
-  @Override
   public void writeString(Utf8 utf8) throws IOException {
-    encodeString(utf8.getBytes(), 0, utf8.getByteLength());
+    this.writeBytes(utf8.getBytes(), 0, utf8.getByteLength());
   }
   
   @Override
   public void writeString(String string) throws IOException {
-    byte[] bytes = Utf8.getBytesFor(string);
-    encodeString(bytes, 0, bytes.length);
-  }
-  
-  private void encodeString(byte[] bytes, int offset, int length) throws IOException {
-    encodeLong(length, out);
-    out.write(bytes, offset, length);
+    if (0 == string.length()) {
+      writeZero();
+      return;
+    }
+    byte[] bytes = string.getBytes("UTF-8");
+    writeInt(bytes.length);
+    writeFixed(bytes, 0, bytes.length);
   }
-  
+
   @Override
   public void writeBytes(ByteBuffer bytes) throws IOException {
-    byteWriter.write(bytes);
+    int pos = bytes.position();
+    int start = bytes.arrayOffset() + pos;
+    int len = bytes.limit() - pos;
+    writeBytes(bytes.array(), start, len);
   }
   
   @Override
   public void writeBytes(byte[] bytes, int start, int len) throws IOException {
-    encodeLong(len, out);
-    out.write(bytes, start, len);
+    if (0 == len) {
+      writeZero();
+      return;
+    }
+    this.writeInt(len);
+    this.writeFixed(bytes, start, len);
   }
   
   @Override
-  public void writeFixed(byte[] bytes, int start, int len) throws IOException {
-    out.write(bytes, start, len);
-  }
-
-  @Override
   public void writeEnum(int e) throws IOException {
-    encodeLong(e, out);
+    this.writeInt(e);
   }
 
   @Override
-  public void writeArrayStart() throws IOException {
-  }
+  public void writeArrayStart() throws IOException {}
 
   @Override
   public void setItemCount(long itemCount) throws IOException {
     if (itemCount > 0) {
-      writeLong(itemCount);
+      this.writeLong(itemCount);
     }
   }
   
   @Override
-  public void startItem() throws IOException {
-  }
+  public void startItem() throws IOException {}
 
   @Override
   public void writeArrayEnd() throws IOException {
-    encodeLong(0, out);
+    writeZero();
   }
 
   @Override
-  public void writeMapStart() throws IOException {
-  }
+  public void writeMapStart() throws IOException {}
 
   @Override
   public void writeMapEnd() throws IOException {
-    encodeLong(0, out);
+    writeZero();
   }
 
   @Override
   public void writeIndex(int unionIndex) throws IOException {
-    encodeLong(unionIndex, out);
+    writeInt(unionIndex);
   }
   
-  protected static void encodeLong(long n, OutputStream o) throws IOException {
-    n = (n << 1) ^ (n >> 63); // move sign to low-order bit
-    while ((n & ~0x7F) != 0) {
-      o.write((byte)((n & 0x7f) | 0x80));
-      n >>>= 7;
-    }
-    o.write((byte)n);
-  }
-
-  protected static int encodeLong(long n, byte[] b, int pos) {
-    n = (n << 1) ^ (n >> 63); // move sign to low-order bit
-    while ((n & ~0x7F) != 0) {
-      b[pos++] = (byte)((n & 0x7f) | 0x80);
-      n >>>= 7;
-    }
-    b[pos++] = (byte) n;
-    return pos;
-  }
-
-  protected static void encodeFloat(float f, OutputStream o) throws IOException {
-    long bits = Float.floatToRawIntBits(f);
-    o.write((int)(bits      ) & 0xFF);
-    o.write((int)(bits >>  8) & 0xFF);
-    o.write((int)(bits >> 16) & 0xFF);
-    o.write((int)(bits >> 24) & 0xFF);
-  }
-
-  protected static int encodeFloat(float f, byte[] b, int pos) {
-    long bits = Float.floatToRawIntBits(f);
-    b[pos++] = (byte)((bits      ) & 0xFF);
-    b[pos++] = (byte)((bits >>  8) & 0xFF);
-    b[pos++] = (byte)((bits >> 16) & 0xFF);
-    b[pos++] = (byte)((bits >> 24) & 0xFF);
-    return pos;
-  }
-
-  protected static void encodeDouble(double d, OutputStream o) throws IOException {
-    long bits = Double.doubleToRawLongBits(d);
-    o.write((int)(bits      ) & 0xFF);
-    o.write((int)(bits >>  8) & 0xFF);
-    o.write((int)(bits >> 16) & 0xFF);
-    o.write((int)(bits >> 24) & 0xFF);
-    o.write((int)(bits >> 32) & 0xFF);
-    o.write((int)(bits >> 40) & 0xFF);
-    o.write((int)(bits >> 48) & 0xFF);
-    o.write((int)(bits >> 56) & 0xFF);
-  }
-
-  protected static int encodeDouble(double d, byte[] b, int pos) {
-    long bits = Double.doubleToRawLongBits(d);
-    b[pos++] = (byte)((bits      ) & 0xFF);
-    b[pos++] = (byte)((bits >>  8) & 0xFF);
-    b[pos++] = (byte)((bits >> 16) & 0xFF);
-    b[pos++] = (byte)((bits >> 24) & 0xFF);
-    b[pos++] = (byte)((bits >> 32) & 0xFF);
-    b[pos++] = (byte)((bits >> 40) & 0xFF);
-    b[pos++] = (byte)((bits >> 48) & 0xFF);
-    b[pos++] = (byte)((bits >> 56) & 0xFF);
-    return pos;
-  }
+  /** Write a zero byte to the underlying output. **/
+  protected abstract void writeZero() throws IOException;
+  
+  /**
+   * Returns the number of bytes currently buffered by this encoder. If this
+   * Encoder does not buffer, this will always return zero.
+   * <p/>
+   * Call {@link #flush()} to empty the buffer to the underlying output.
+   */
+  public abstract int bytesBuffered();
+  
 }
 

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BlockingBinaryEncoder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BlockingBinaryEncoder.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BlockingBinaryEncoder.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BlockingBinaryEncoder.java Fri Feb 25 00:36:40 2011
@@ -17,24 +17,32 @@
  */
 package org.apache.avro.io;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 
 import org.apache.avro.AvroTypeException;
 import org.apache.avro.Schema;
-import org.apache.avro.util.Utf8;
 
-/** A {@link Encoder} that writes large arrays and maps as a sequence of
- * blocks.  So long as individual primitive values fit in memory, arbitrarily
- * long arrays and maps may be written and subsequently read without exhausting
- * memory.  Values are buffered until the specified block size would be
- * exceeded, minimizing block overhead.
+/** A {@link BinaryEncoder} implementation that writes large arrays and maps as a
+ * sequence of blocks. So long as individual primitive values fit in memory,
+ * arbitrarily long arrays and maps may be written and subsequently read without
+ * exhausting memory. Values are buffered until the specified block size would
+ * be exceeded, minimizing block overhead.
+ * <p/>
+ * Use {@link EncoderFactory#blockingBinaryEncoder(OutputStream, BinaryEncoder)}
+ * to construct and configure.
+ * <p/>
+ * BlockingBinaryEncoder buffers writes; data may not appear on the output until
+ * {@link #flush()} is called.
+ * <p/>
+ * BlockingBinaryEncoder is not thread-safe.
+ * 
+ * @see BinaryEncoder
+ * @see EncoderFactory
  * @see Encoder
  */
-public class BlockingBinaryEncoder extends BinaryEncoder {
+public class BlockingBinaryEncoder extends BufferedBinaryEncoder {
 
  /* Implementation note:
   *
@@ -172,22 +180,12 @@ public class BlockingBinaryEncoder exten
   private int stackTop = -1;
   private static final int STACK_STEP = 10;
 
-  private static final class EncoderBuffer extends ByteArrayOutputStream {
-    public byte[] buffer() {
-      return buf;
-    }
-    
-    public int length() {
-      return count;
-    }
-  }
-  
-  private EncoderBuffer encoderBuffer = new EncoderBuffer();
+  //buffer large enough for up to two ints for a block header
+  //rounded up to a multiple of 4 bytes.
+  private byte[] headerBuffer = new byte[12];
 
   private boolean check() {
-    assert out != null;
     assert buf != null;
-    assert MIN_BUFFER_SIZE <= buf.length;
     assert 0 <= pos;
     assert pos <= buf.length : pos + " " + buf.length;
 
@@ -201,19 +199,10 @@ public class BlockingBinaryEncoder exten
     return true;
   }
 
-  private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
-  private static final int MIN_BUFFER_SIZE = 64;
-
-  public BlockingBinaryEncoder(OutputStream out) {
-    this(out, DEFAULT_BUFFER_SIZE);
-  }
-
-  public BlockingBinaryEncoder(OutputStream out, int bufferSize) {
-    super(out);
-    if (bufferSize < MIN_BUFFER_SIZE) {
-      throw new IllegalArgumentException("Buffer size too smll.");
-    }
-    this.buf = new byte[bufferSize];
+  BlockingBinaryEncoder(OutputStream out,
+      int blockBufferSize, int binaryEncoderBufferSize) {
+    super(out, binaryEncoderBufferSize);
+    this.buf = new byte[blockBufferSize];
     this.pos = 0;
     blockStack = new BlockedValue[0];
     expandStack();
@@ -235,123 +224,74 @@ public class BlockingBinaryEncoder exten
     }
   }
 
-  /** Redirect output (and reset the parser state if we're checking). */
-  @Override
-  public void init(OutputStream out) throws IOException {
-    super.init(out);
-    this.pos = 0;
-    this.stackTop = 0;
-
+  BlockingBinaryEncoder configure(OutputStream out, int blockBufferSize,
+      int binaryEncoderBufferSize) {
+    super.configure(out, binaryEncoderBufferSize);
+    pos = 0;
+    stackTop = 0;
+    if (null == buf || buf.length != blockBufferSize) {
+      buf = new byte[blockBufferSize];
+    }
+    
     assert check();
+    return this;
   }
-
+  
   @Override
   public void flush() throws IOException {
-    if (out != null) {
       BlockedValue bv = blockStack[stackTop];
       if (bv.state == BlockedValue.State.ROOT) {
-        out.write(buf, 0, pos);
+        super.writeFixed(buf, 0, pos);
         pos = 0;
       } else {
         while (bv.state != BlockedValue.State.OVERFLOW) {
           compact();
         }
       }
-      out.flush();
-    }
+      super.flush();
 
     assert check();
   }
 
   @Override
   public void writeBoolean(boolean b) throws IOException {
-    if (buf.length < (pos + 1)) ensure(1);
-    buf[pos++] = (byte)(b ? 1 : 0);
-
-    assert check();
+    ensureBounds(1);
+    pos += BinaryData.encodeBoolean(b, buf, pos);
   }
 
   @Override
   public void writeInt(int n) throws IOException {
-    if (pos + 5 > buf.length) {
-      ensure(5);
-    }
-    pos = encodeLong(n, buf, pos);
-
-    assert check();
+    ensureBounds(5);
+    pos += BinaryData.encodeInt(n, buf, pos);
   }
 
   @Override
   public void writeLong(long n) throws IOException {
-    if (pos + 10 > buf.length) {
-      ensure(10);
-    }
-    pos = encodeLong(n, buf, pos);
-
-    assert check();
+    ensureBounds(10);
+    pos += BinaryData.encodeLong(n, buf, pos);
   }
     
   @Override
   public void writeFloat(float f) throws IOException {
-    if (pos + 4 > buf.length) {
-      ensure(4);
-    }
-    pos = encodeFloat(f, buf, pos);
-
-    assert check();
+    ensureBounds(4);
+    pos += BinaryData.encodeFloat(f, buf, pos);
   }
 
   @Override
   public void writeDouble(double d) throws IOException {
-    if (pos + 8 > buf.length) {
-      ensure(8);
-    }
-    pos = encodeDouble(d, buf, pos);
-
-    assert check();
+    ensureBounds(8);
+    pos += BinaryData.encodeDouble(d, buf, pos);
   }
 
   @Override
-  public void writeString(Utf8 utf8) throws IOException {
-    writeBytes(utf8.getBytes(), 0, utf8.getByteLength());
-    // assert called in writeBytes
-  }
-  
-  @Override
-  public void writeString(String str) throws IOException {
-    byte[] utf8bytes = Utf8.getBytesFor(str);
-    writeBytes(utf8bytes, 0, utf8bytes.length);
-    // assert called in writeBytes
-  }
-
-  @Override
-  public void writeBytes(ByteBuffer bytes) throws IOException {
-    writeBytes(bytes.array(), bytes.position(), bytes.remaining());
-
-    assert check();
-  }
-  
-  @Override
   public void writeFixed(byte[] bytes, int start, int len) throws IOException {
     doWriteBytes(bytes, start, len);
-
-    assert check();
   }
   
   @Override
-  public void writeEnum(int e) throws IOException {
-    writeInt(e);
-  }
-
-  @Override
-  public void writeBytes(byte[] bytes, int start, int len) throws IOException {
-    if (pos + 5 > buf.length) {
-      ensure(5);
-    }
-    pos = encodeLong(len, buf, pos);
-    doWriteBytes(bytes, start, len);
-
-    assert check();
+  protected void writeZero() throws IOException {
+    ensureBounds(1);
+    buf[pos++] = (byte) 0;
   }
 
   @Override
@@ -437,14 +377,15 @@ public class BlockingBinaryEncoder exten
 
   @Override
   public void writeIndex(int unionIndex) throws IOException {
-    if (pos + 5 > buf.length) {
-      ensure(5);
-    }
-    pos = encodeLong(unionIndex, buf, pos);
-
-    assert check();
+    ensureBounds(5);
+    pos += BinaryData.encodeInt(unionIndex, buf, pos);
   }
 
+  @Override
+  public int bytesBuffered() {
+    return pos + super.bytesBuffered();
+  }
+  
   private void endBlockedValue() throws IOException {
     for (; ;) {
       assert check();
@@ -459,27 +400,25 @@ public class BlockingBinaryEncoder exten
         if (t.start == 0 &&
           blockStack[stackTop - 1].state
             != BlockedValue.State.REGULAR) { // Lucky us -- don't have to move
-          encodeLong(-t.items, out);
-          encodeLong(byteCount, out);
+          super.writeInt(-t.items);
+          super.writeInt(byteCount);
         } else {
-          encodeLong(-t.items, encoderBuffer);
-          encodeLong(byteCount, encoderBuffer);
-          final int headerSize = encoderBuffer.length();
+          int headerSize = 0;
+          headerSize += BinaryData.encodeInt(-t.items, headerBuffer, headerSize);
+          headerSize += BinaryData.encodeInt(byteCount, headerBuffer, headerSize);
           if (buf.length >= pos + headerSize) {
             pos += headerSize;
             final int m = t.start;
             System.arraycopy(buf, m, buf, m + headerSize, byteCount);
-            System.arraycopy(encoderBuffer.buffer(), 0, buf, m, headerSize);
-            encoderBuffer.reset();
+            System.arraycopy(headerBuffer, 0, buf, m, headerSize);
           } else {
-            encoderBuffer.reset();
             compact();
             continue;
           }
         }
       }
       stackTop--;
-      if (buf.length < (pos + 1)) ensure(1);
+      ensureBounds(1);
       buf[pos++] = 0;   // Sentinel for last block in a blocked value
       assert check();
       if (blockStack[stackTop].state == BlockedValue.State.ROOT) {
@@ -503,7 +442,7 @@ public class BlockingBinaryEncoder exten
     assert check();
 
     // Flush any remaining data for this block
-    out.write(buf, 0, pos);
+    super.writeFixed(buf, 0, pos);
     pos = 0;
 
     // Reset top of stack to be in REGULAR mode
@@ -513,15 +452,12 @@ public class BlockingBinaryEncoder exten
     assert check();
   }
 
-  private void ensure(int l) throws IOException {
-    if (buf.length < l) {
-      throw new IllegalArgumentException("Too big: " + l);
-    }
+  private void ensureBounds(int l) throws IOException {
     while (buf.length < (pos + l)) {
       if (blockStack[stackTop].state == BlockedValue.State.REGULAR) {
         compact();
       } else {
-        out.write(buf, 0, pos);
+        super.writeFixed(buf, 0, pos);
         pos = 0;
       }
     }
@@ -530,39 +466,38 @@ public class BlockingBinaryEncoder exten
   private void doWriteBytes(byte[] bytes, int start, int len)
     throws IOException {
     if (len < buf.length) {
-      ensure(len);
+      ensureBounds(len);
       System.arraycopy(bytes, start, buf, pos, len);
       pos += len;
     } else {
-      ensure(buf.length);
+      ensureBounds(buf.length);
       assert blockStack[stackTop].state == BlockedValue.State.ROOT ||
         blockStack[stackTop].state == BlockedValue.State.OVERFLOW;
       write(bytes, start, len);
     }
-    assert check();
   }
 
   private void write(byte[] b, int off, int len) throws IOException {
     if (blockStack[stackTop].state == BlockedValue.State.ROOT) {
-      out.write(b, off, len);
+      super.writeFixed(b, off, len);
     } else {
       assert check();
       while (buf.length < (pos + len)) {
         if (blockStack[stackTop].state == BlockedValue.State.REGULAR) {
           compact();
         } else {
-          out.write(buf, 0, pos);
+          super.writeFixed(buf, 0, pos);
           pos = 0;
           if (buf.length <= len) {
-            out.write(b, off, len);
+            super.writeFixed(b, off, len);
             len = 0;
           }
         }
       }
       System.arraycopy(b, off, buf, pos, len);
       pos += len;
-      assert check();
     }
+    assert check();
   }
 
   /** Only call if you're there are REGULAR-state values on the stack. */
@@ -587,26 +522,26 @@ public class BlockingBinaryEncoder exten
     // blocked values).
 
     // Flush any bytes prios to "s"
-    out.write(buf, 0, s.start);
+    super.writeFixed(buf, 0, s.start);
 
     // Write any full items of "s"
     if (1 < s.items) {
-      encodeLong(-(s.items - 1), out);
-      encodeLong(s.lastFullItem - s.start, out);
-      out.write(buf, s.start, s.lastFullItem - s.start);
+      super.writeInt(-(s.items - 1));
+      super.writeInt(s.lastFullItem - s.start);
+      super.writeFixed(buf, s.start, s.lastFullItem - s.start);
       s.start = s.lastFullItem;
       s.items = 1;
     }
 
     // Start an overflow block for s
-    encodeLong(1, out);
+    super.writeInt(1);
 
     // Write any remaining bytes for "s", up to the next-most
     // deeply-nested value
     BlockedValue n = ((i + 1) <= stackTop ?
         blockStack[i + 1] : null);
     int end = (n == null ? pos : n.start);
-    out.write(buf, s.lastFullItem, end - s.lastFullItem);
+    super.writeFixed(buf, s.lastFullItem, end - s.lastFullItem);
 
     // Move over any bytes that remain (and adjust indices)
     System.arraycopy(buf, end, buf, 0, pos - end);
@@ -623,5 +558,5 @@ public class BlockingBinaryEncoder exten
 
     assert check();
   }
-}
 
+}

Added: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java?rev=1074364&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java (added)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java Fri Feb 25 00:36:40 2011
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.avro.AvroRuntimeException;
+
+/**
+ * An {@link Encoder} for Avro's binary encoding.
+ * <p/>
+ * This implementation buffers output to enhance performance.
+ * Output may not appear on the underlying output until flush() is called.
+ * <p/>
+ * {@link DirectBinaryEncoder} can be used in place of this implementation if
+ * the buffering semantics are not desired, and the performance difference
+ * is acceptable.
+ * <p/>
+ * To construct or reconfigure, use
+ * {@link EncoderFactory#binaryEncoder(OutputStream, BinaryEncoder)}.
+ * <p/>
+ * To change the buffer size, configure the factory instance used to 
+ * create instances with {@link EncoderFactory#configureBufferSize(int)}
+ *  @see Encoder
+ *  @see EncoderFactory
+ *  @see BlockingBinaryEncoder
+ *  @see DirectBinaryEncoder
+ */
+public class BufferedBinaryEncoder extends BinaryEncoder {
+  private byte[] buf; // staging buffer; emptied into sink by flushBuffer()
+  private int pos; // number of bytes currently held in buf
+  private ByteSink sink; // destination for flushed and bulk-written bytes
+  private int bulkLimit; // writeFixed() payloads longer than this bypass buf
+
+  BufferedBinaryEncoder(OutputStream out, int bufferSize) {
+    configure(out, bufferSize);
+  }
+  
+  BufferedBinaryEncoder configure(OutputStream out, int bufferSize) {
+    if (null == out)
+      throw new NullPointerException("OutputStream cannot be null!");
+    if (null != this.sink) { // reconfiguring: push pending bytes to the old sink
+      if ( pos > 0) {
+        try {
+          flushBuffer();
+        } catch (IOException e) {
+          throw new AvroRuntimeException("Failure flushing old output", e);
+        }
+      }
+    }
+    this.sink = new OutputStreamSink(out);
+    pos = 0;
+    if (null == buf || buf.length != bufferSize) { // keep buf when the size matches
+      buf = new byte[bufferSize];
+    }
+    bulkLimit = buf.length >>> 1; // half the buffer, capped at 512 just below
+    if (bulkLimit > 512) {
+      bulkLimit = 512;
+    }
+    return this;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    flushBuffer();
+    sink.innerFlush();
+  }
+
+  /** Flushes the internal buffer to the underlying output. 
+   * Does not flush the underlying output.
+   */
+  private void flushBuffer() throws IOException {
+    if (pos > 0) {
+      sink.innerWrite(buf, 0, pos);
+      pos = 0;
+    }
+  }
+
+  /** Ensures that the buffer has at least num bytes free to write to between its
+   * current position and the end. The buffer is never grown: when fewer than
+   * num bytes remain, the buffered bytes are flushed to the sink so that the
+   * whole buffer is free again. A num larger than buf.length is not handled here.
+   * @param num the number of free bytes required
+   * @throws IOException if flushing the buffer to the sink fails
+   */
+  private void ensureBounds(int num) throws IOException {
+    int remaining = buf.length - pos;
+    if (remaining < num) {
+      flushBuffer();
+    }
+  }
+
+  @Override
+  public void writeBoolean(boolean b) throws IOException {
+    // inlined, shorter version of ensureBounds(1): flush only when buf is full
+    if (buf.length == pos) {
+      flushBuffer();
+    }
+    pos += BinaryData.encodeBoolean(b, buf, pos);
+  }
+
+  @Override
+  public void writeInt(int n) throws IOException {
+    ensureBounds(5);
+    pos += BinaryData.encodeInt(n, buf, pos);
+  }
+
+  @Override
+  public void writeLong(long n) throws IOException {
+    ensureBounds(10);
+    pos += BinaryData.encodeLong(n, buf, pos);
+  }
+
+  @Override
+  public void writeFloat(float f) throws IOException {
+    ensureBounds(4);
+    pos += BinaryData.encodeFloat(f, buf, pos);
+  }
+
+  @Override
+  public void writeDouble(double d) throws IOException {
+    ensureBounds(8);
+    pos += BinaryData.encodeDouble(d, buf, pos);
+  }
+
+  @Override
+  public void writeFixed(byte[] bytes, int start, int len) throws IOException {
+    if (len > bulkLimit) {
+      // over bulkLimit: flush what is buffered, then write straight to the sink
+      flushBuffer();
+      sink.innerWrite(bytes, start, len);
+      return;
+    }
+    ensureBounds(len);
+    System.arraycopy(bytes, start, buf, pos, len);
+    pos += len;
+  }
+
+  @Override
+  protected void writeZero() throws IOException {
+    writeByte(0);
+  }
+  
+  private void writeByte(int b) throws IOException {
+    if (pos == buf.length) { // buffer full: make room before storing the byte
+      flushBuffer();
+    }
+    buf[pos++] = (byte) (b & 0xFF);
+  }
+
+  @Override
+  public int bytesBuffered() {
+    return pos;
+  }
+
+  /**
+   * ByteSink abstracts the destination of written data from the core workings
+   * of BinaryEncoder.
+   * <p/>
+   * Currently the only destination option is an OutputStream, but we may later
+   * want to handle other constructs or specialize for certain OutputStream
+   * Implementations such as ByteBufferOutputStream.
+   * <p/>
+   */
+  private abstract static class ByteSink {
+    protected ByteSink() {}
+    /** Write data from bytes, starting at off, for len bytes **/
+    protected abstract void innerWrite(byte[] bytes, int off, int len) throws IOException;
+    /** Flush the underlying output, if supported **/
+    protected abstract void innerFlush() throws IOException;
+  }
+  
+  static class OutputStreamSink extends ByteSink { // forwards to an OutputStream
+    private final OutputStream out;
+    private OutputStreamSink(OutputStream out) {
+      super();
+      this.out = out;
+    }
+    @Override
+    protected void innerWrite(byte[] bytes, int off, int len)
+        throws IOException {
+      out.write(bytes, off, len);
+    }
+    @Override
+    protected void innerFlush() throws IOException {
+      out.flush();
+    }
+  }
+}

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Decoder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Decoder.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Decoder.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Decoder.java Fri Feb 25 00:36:40 2011
@@ -190,7 +190,7 @@ public abstract class Decoder {
   public abstract long readArrayStart() throws IOException;
 
   /**
-   * Processes the next block of an array andreturns the number of items in
+   * Processes the next block of an array and returns the number of items in
    * the block and let's the caller
    * read those items.
    * @throws AvroTypeException When called outside of an

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/DecoderFactory.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/DecoderFactory.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/DecoderFactory.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/DecoderFactory.java Fri Feb 25 00:36:40 2011
@@ -56,7 +56,7 @@ public class DecoderFactory {
    * 32000 bytes.
    * 
    * @param size The preferred buffer size. Valid values are in the range [32,
-   *          16*1024*1024]. Values outide this range are rounded to the nearest
+   *          16*1024*1024]. Values outside this range are rounded to the nearest
    *          value in the range. Values less than 512 or greater than 1024*1024
    *          are not recommended.
    * @return This factory, to enable method chaining:

Added: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryEncoder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryEncoder.java?rev=1074364&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryEncoder.java (added)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryEncoder.java Fri Feb 25 00:36:40 2011
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An {@link Encoder} for Avro's binary encoding that does not buffer output.
+ * <p/>
+ * This encoder does not buffer writes, and as a result is slower than
+ * {@link BufferedBinaryEncoder}. However, it is lighter-weight and useful when the
+ * buffering in BufferedBinaryEncoder is not desired and/or the Encoder is
+ * very short lived.
+ * <p/>
+ * To construct, use
+ * {@link EncoderFactory#directBinaryEncoder(OutputStream, BinaryEncoder)}
+ *  <p/>
+ * DirectBinaryEncoder is not thread-safe
+ * @see BinaryEncoder
+ * @see EncoderFactory
+ * @see Encoder
+ * @see Decoder
+ */
+public class DirectBinaryEncoder extends BinaryEncoder {
+  private OutputStream out;
+  // the buffer is used for writing floats, doubles, and large longs.
+  private final byte[] buf = new byte[12];
+
+  /** Create a writer that sends its output to the underlying stream
+   *  <code>out</code>. 
+   **/
+  DirectBinaryEncoder(OutputStream out) {
+    configure(out);
+  }
+
+  DirectBinaryEncoder configure(OutputStream out) {
+    if (null == out) throw new NullPointerException("OutputStream cannot be null!");
+    this.out = out;
+    return this;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    out.flush();
+  }
+
+  @Override
+  public void writeBoolean(boolean b) throws IOException {
+    out.write(b ? 1 : 0);
+  }
+
+  /* buffering is slower for ints that encode to just 1 or
+   * two bytes, and and faster for large ones.
+   * (Sun JRE 1.6u22, x64 -server) */
+  @Override
+  public void writeInt(int n) throws IOException {
+    int val = (n << 1) ^ (n >> 31);
+    if ((val & ~0x7F) == 0) {
+      out.write(val);
+      return;
+    } else if ((val & ~0x3FFF) == 0) {
+      out.write(0x80 | val);
+      out.write(val >>> 7);
+      return;
+    }
+    int len = BinaryData.encodeInt(n, buf, 0);
+    out.write(buf, 0, len);
+  }
+
+  /* buffering is slower for writeLong when the number is small enough to
+   * fit in an int. 
+   * (Sun JRE 1.6u22, x64 -server) */
+  @Override
+  public void writeLong(long n) throws IOException {
+    long val = (n << 1) ^ (n >> 63); // move sign to low-order bit
+    if ((val & ~0x7FFFFFFFL) == 0) {
+      int i = (int) val;
+      while ((i & ~0x7F) != 0) {
+        out.write((byte)((0x80 | i) & 0xFF));
+        i >>>= 7;
+      }
+      out.write((byte)i);
+      return;
+    }
+    int len = BinaryData.encodeLong(n, buf, 0);
+    out.write(buf, 0, len);
+  }
+  
+  @Override
+  public void writeFloat(float f) throws IOException {
+    int len = BinaryData.encodeFloat(f, buf, 0);
+    out.write(buf, 0, len);
+  }
+
+  @Override
+  public void writeDouble(double d) throws IOException {
+    byte[] buf = new byte[8];
+    int len = BinaryData.encodeDouble(d, buf, 0);
+    out.write(buf, 0, len);
+  }
+
+  @Override
+  public void writeFixed(byte[] bytes, int start, int len) throws IOException {
+    out.write(bytes, start, len);
+  }
+
+  @Override
+  protected void writeZero() throws IOException {
+    out.write(0);
+  }
+  
+  @Override
+  public int bytesBuffered() {
+    return 0;
+  }
+
+}

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java Fri Feb 25 00:36:40 2011
@@ -19,7 +19,6 @@ package org.apache.avro.io;
 
 import java.io.Flushable;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 import org.apache.avro.util.Utf8;
@@ -44,9 +43,6 @@ import org.apache.avro.util.Utf8;
  */
 public abstract class Encoder implements Flushable {
 
-  /** Redirect output (and reset the parser state if we're checking). */
-  public abstract void init(OutputStream out) throws IOException;
-
   /**
    * "Writes" a null value.  (Doesn't actually write anything, but
    * advances the state of the parser if this class is stateful.)

Added: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java?rev=1074364&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java (added)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java Fri Feb 25 00:36:40 2011
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.codehaus.jackson.JsonGenerator;
+
+/**
+ * A factory for creating and configuring {@link Encoder}s.
+ * <p/>
+ * Factory methods that create Encoder instances are thread-safe.
+ * Multiple instances with different configurations can be cached
+ * by an application.
+ * 
+ * @see Encoder
+ * @see BinaryEncoder
+ * @see JsonEncoder
+ * @see ValidatingEncoder
+ * @see BufferedBinaryEncoder
+ * @see BlockingBinaryEncoder
+ * @see DirectBinaryEncoder
+ */
+
+public class EncoderFactory {
+  private static final int DEFAULT_BUFFER_SIZE = 2048; // initial binaryBufferSize
+  private static final int DEFAULT_BLOCK_BUFFER_SIZE = 64 * 1024; // initial binaryBlockSize
+  private static final int MIN_BLOCK_BUFFER_SIZE = 64; // lower clamp in configureBlockSize(int)
+  private static final int MAX_BLOCK_BUFFER_SIZE = 1024 * 1024 * 1024; // upper clamp
+
+  private static final EncoderFactory DEFAULT_FACTORY = 
+    new DefaultEncoderFactory(); // the instance handed out by get()
+  
+  protected int binaryBufferSize = DEFAULT_BUFFER_SIZE; // passed to BufferedBinaryEncoder
+  protected int binaryBlockSize = DEFAULT_BLOCK_BUFFER_SIZE; // returned by getBlockSize()
+
+  /**
+   * Returns an immutable static EncoderFactory with default configuration.
+   * All configuration methods throw AvroRuntimeExceptions if called.
+   */
+  public static EncoderFactory get() {
+    return DEFAULT_FACTORY; // always the same shared static instance
+  }
+  
+  /**
+   * Configures this factory to use the specified buffer size when creating
+   * Encoder instances that buffer their output. The default buffer size is 2048
+   * bytes.
+   * 
+   * @param size
+   *          The buffer size to configure new instances with. Valid values are
+   *          in the range [32, 16*1024*1024]. Values outside this range are set
+   *          to the nearest value in the range. Values less than 256 will limit
+   *          performance but will consume less memory if the BinaryEncoder is
+   *          short-lived, values greater than 8*1024 are not likely to improve
+   *          performance but may be useful for the downstream OutputStream.
+   * @return This factory, to enable method chaining:
+   * <pre>
+   * EncoderFactory factory = new EncoderFactory().configureBufferSize(4096);
+   * </pre>
+   * @see #binaryEncoder(OutputStream, BinaryEncoder)
+   */
+  public EncoderFactory configureBufferSize(int size) {
+    // out-of-range requests are clamped into [32, 16*1024*1024], not rejected
+    final int smallest = 32;
+    final int largest = 16 * 1024 * 1024;
+    final int clamped = Math.max(smallest, Math.min(size, largest));
+    this.binaryBufferSize = clamped;
+    return this;
+  }
+  
+  /**
+   * Returns this factory's configured default buffer size.  Used when creating
+   * Encoder instances that buffer writes.
+   * @see #configureBufferSize(int)
+   * @return The preferred buffer size, in bytes.
+   */
+  public int getBufferSize() {
+    return this.binaryBufferSize; // DEFAULT_BUFFER_SIZE unless reconfigured
+  }
+
+  /**
+   * Configures this factory to construct blocking BinaryEncoders with the
+   * specified block buffer size. The default buffer size is 64 * 1024 bytes.
+   * 
+   * @param size
+   *          The preferred block size for array blocking. Arrays larger than
+   *          this size will be segmented into blocks according to the Avro
+   *          spec. Valid values are in the range [64, 1024*1024*1024]. Values
+   *          outside this range are set to the nearest value in the range. The
+   *          encoder will require at least this amount of memory.
+   * @return This factory, to enable method chaining:
+   * <pre>
+   * EncoderFactory factory = new EncoderFactory().configureBlockSize(8000);
+   * </pre>
+   * @see #blockingBinaryEncoder(OutputStream, BinaryEncoder)
+   */
+  public EncoderFactory configureBlockSize(int size) {
+    // clamp to [MIN_BLOCK_BUFFER_SIZE, MAX_BLOCK_BUFFER_SIZE], then store it in
+    // binaryBlockSize (the field getBlockSize() reads), not binaryBufferSize
+    size = Math.max(MIN_BLOCK_BUFFER_SIZE, size);
+    size = Math.min(MAX_BLOCK_BUFFER_SIZE, size);
+    this.binaryBlockSize = size;
+    return this;
+  }
+
+  /**
+   * Returns this factory's configured default block buffer size.  
+   * {@link BlockingBinaryEncoder} instances created by this factory will
+   * have block buffers of this size.
+   * <p/>
+   * @see #configureBlockSize(int)
+   * @see BlockingBinaryEncoder
+   * @return The preferred buffer size, in bytes.
+   */
+  public int getBlockSize() {
+    return this.binaryBlockSize; // initialised to DEFAULT_BLOCK_BUFFER_SIZE
+  }
+  
+  /**
+   * Creates or reinitializes a {@link BinaryEncoder} to use the OutputStream
+   * provided as the destination for written data. If <i>reuse</i> is provided,
+   * an attempt will be made to reconfigure <i>reuse</i> rather than construct a
+   * new instance, but this is not guaranteed, a new instance may be returned.
+   * <p/>
+   * The {@link BinaryEncoder} implementation returned may buffer its output.
+   * Data may not appear on the underlying OutputStream until
+   * {@link Encoder#flush()} is called.  The buffer size is configured with
+   * {@link #configureBufferSize(int)}.
+   * <p/>  If buffering is not desired, and lower performance is acceptable, use
+   * {@link #directBinaryEncoder(OutputStream, BinaryEncoder)}
+   * <p/>
+   * {@link BinaryEncoder} instances returned by this method are not thread-safe
+   * 
+   * @param out
+   *          The OutputStream to write to.  Cannot be null.
+   * @param reuse
+   *          The BinaryEncoder to <i>attempt</i> to reuse given the factory
+   *          configuration. A BinaryEncoder implementation may not be
+   *          compatible with reuse, causing a new instance to be returned.
+   *          If null, a new instance is returned.
+   * @return A BinaryEncoder that uses <i>out</i> as its data output. If
+   *         <i>reuse</i> is null, this will be a new instance. If <i>reuse</i>
+   *         is not null, then the returned instance may be a new instance or
+   *         <i>reuse</i> reconfigured to use <i>out</i>.
+   * @throws IOException 
+   * @see BufferedBinaryEncoder
+   * @see Encoder
+   */
+  public BinaryEncoder binaryEncoder(OutputStream out, BinaryEncoder reuse) {
+    // only an exact BufferedBinaryEncoder is recycled; anything else is replaced
+    if (reuse != null && BufferedBinaryEncoder.class.equals(reuse.getClass())) {
+      return ((BufferedBinaryEncoder) reuse).configure(out, this.binaryBufferSize);
+    }
+    return new BufferedBinaryEncoder(out, this.binaryBufferSize);
+  }
+
+  /**
+   * Creates or reinitializes a {@link BinaryEncoder} with the OutputStream
+   * provided as the destination for written data. If <i>reuse</i> is provided,
+   * an attempt will be made to reconfigure <i>reuse</i> rather than construct a
+   * new instance, but this is not guaranteed, a new instance may be returned.
+   * <p/>
+   * The {@link BinaryEncoder} implementation returned does not buffer its
+   * output, calling {@link Encoder#flush()} will simply cause the wrapped
+   * OutputStream to be flushed.
+   * <p/>
+   * Performance of unbuffered writes can be significantly slower than buffered
+   * writes.  {@link #binaryEncoder(OutputStream, BinaryEncoder)} returns
+   * BinaryEncoder instances that are tuned for performance but may buffer output.
+   * The unbuffered, 'direct' encoder may be desired when buffering semantics are
+   * problematic, or if the lifetime of the encoder is so short that the buffer
+   * would not be useful.
+   * <p/>
+   * {@link BinaryEncoder} instances returned by this method are not thread-safe.
+   * 
+   * @param out
+   *          The OutputStream to initialize to. Cannot be null.
+   * @param reuse
+   *          The BinaryEncoder to <i>attempt</i> to reuse given the factory
+   *          configuration. A BinaryEncoder implementation may not be
+   *          compatible with reuse, causing a new instance to be returned. If
+   *          null, a new instance is returned.
+   * @return A BinaryEncoder that uses <i>out</i> as its data output. If
+   *         <i>reuse</i> is null, this will be a new instance. If <i>reuse</i>
+   *         is not null, then the returned instance may be a new instance or
+   *         <i>reuse</i> reconfigured to use <i>out</i>.
+   * @see DirectBinaryEncoder
+   * @see Encoder
+   */
+  public BinaryEncoder directBinaryEncoder(OutputStream out, BinaryEncoder reuse) {
+    if (null == reuse || !reuse.getClass().equals(DirectBinaryEncoder.class)) {
+      return new DirectBinaryEncoder(out);
+    } else {
+      return ((DirectBinaryEncoder)reuse).configure(out);
+    }
+  }
+  
+  /**
+   * Creates or reinitializes a {@link BinaryEncoder} with the OutputStream
+   * provided as the destination for written data. If <i>reuse</i> is provided,
+   * an attempt will be made to reconfigure <i>reuse</i> rather than construct a
+   * new instance, but this is not guaranteed; a new instance may be returned.
+   * <p/>
+   * The {@link BinaryEncoder} implementation returned buffers its output,
+   * calling {@link Encoder#flush()} is required for output to appear on the underlying
+   * OutputStream.
+   * <p/>
+   * The returned BinaryEncoder implements the Avro binary encoding using blocks
+   * delimited with byte sizes for Arrays and Maps.  This allows for some decoders
+   * to skip over large Arrays or Maps without decoding the contents, but adds
+   * some overhead.  The default block size is configured with
+   * {@link #configureBlockSize(int)}.
+   * <p/>
+   * {@link BinaryEncoder} instances returned by this method are not thread-safe.
+   * 
+   * @param out
+   *          The OutputStream to initialize to. Cannot be null.
+   * @param reuse
+   *          The BinaryEncoder to <i>attempt</i> to reuse given the factory
+   *          configuration. A BinaryEncoder implementation may not be
+   *          compatible with reuse, causing a new instance to be returned. If
+   *          null, a new instance is returned.
+   * @return A BinaryEncoder that uses <i>out</i> as its data output. If
+   *         <i>reuse</i> is null, this will be a new instance. If <i>reuse</i>
+   *         is not null, then the returned instance may be a new instance or
+   *         <i>reuse</i> reconfigured to use <i>out</i>.
+   * @throws IOException
+   * @see BlockingBinaryEncoder
+   * @see Encoder
+   */
+  public BinaryEncoder blockingBinaryEncoder(OutputStream out, BinaryEncoder reuse) {
+    if (null == reuse || !reuse.getClass().equals(BlockingBinaryEncoder.class)) {
+      return new BlockingBinaryEncoder(out, this.binaryBlockSize, 32);
+    } else {
+      return ((BlockingBinaryEncoder)reuse).configure(out, this.binaryBlockSize, 32);
+    }
+  }
+
+  /**
+   * Creates a {@link JsonEncoder} using the OutputStream provided for writing
+   * data conforming to the Schema provided.
+   * <p/>
+   * {@link JsonEncoder} buffers its output. Data may not appear on the
+   * underlying OutputStream until {@link Encoder#flush()} is called.
+   * <p/>
+   * {@link JsonEncoder} is not thread-safe.
+   * 
+   * @param schema
+   *          The Schema for data written to this JsonEncoder. Cannot be null.
+   * @param out
+   *          The OutputStream to write to. Cannot be null.
+   * @return A JsonEncoder configured with <i>out</i> and <i>schema</i>
+   * @throws IOException
+   */
+  public JsonEncoder jsonEncoder(Schema schema, OutputStream out)
+      throws IOException {
+    return new JsonEncoder(schema, out);
+  }
+
+  /**
+   * Creates a {@link JsonEncoder} using the {@link JsonGenerator} provided for
+   * output of data conforming to the Schema provided.
+   * <p/>
+   * {@link JsonEncoder} buffers its output. Data may not appear on the
+   * underlying output until {@link Encoder#flush()} is called.
+   * <p/>
+   * {@link JsonEncoder} is not thread-safe.
+   * 
+   * @param schema
+   *          The Schema for data written to this JsonEncoder. Cannot be null.
+   * @param gen
+   *          The JsonGenerator to write with. Cannot be null.
+   * @return A JsonEncoder configured with <i>gen</i> and <i>schema</i>
+   * @throws IOException
+   */
+  public JsonEncoder jsonEncoder(Schema schema, JsonGenerator gen)
+      throws IOException {
+    return new JsonEncoder(schema, gen);
+  }
+  
+  /**
+   * Creates a {@link ValidatingEncoder} that wraps the Encoder provided.
+   * This ValidatingEncoder will ensure that operations against it conform
+   * to the schema provided.
+   * <p/>
+   * Many {@link Encoder}s buffer their output. Data may not appear on the
+   * underlying output until {@link Encoder#flush()} is called.
+   * <p/>
+   * {@link ValidatingEncoder} is not thread-safe.
+   * 
+   * @param schema
+   *          The Schema to validate operations against. Cannot be null.
+   * @param encoder
+   *          The Encoder to wrap.  Cannot be null.
+   * @return A ValidatingEncoder configured to wrap <i>encoder</i> and validate
+   *  against <i>schema</i>
+   * @throws IOException
+   */
+  public ValidatingEncoder validatingEncoder(Schema schema, Encoder encoder)
+      throws IOException {
+    return new ValidatingEncoder(schema, encoder);
+  }
+  
+  // default encoder is not mutable
+  private static class DefaultEncoderFactory extends EncoderFactory {
+    @Override
+    public EncoderFactory configureBlockSize(int size) {
+      throw new AvroRuntimeException("Default EncoderFactory cannot be configured");
+    }
+    @Override
+    public EncoderFactory configureBufferSize(int size) {
+      throw new AvroRuntimeException("Default EncoderFactory cannot be configured");
+    }
+  }
+}

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java Fri Feb 25 00:36:40 2011
@@ -32,7 +32,15 @@ import org.codehaus.jackson.JsonEncoding
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonGenerator;
 
-/** An {@link Encoder} for Avro's JSON data encoding. */
+/** An {@link Encoder} for Avro's JSON data encoding. 
+ * <p/>
+ * Construct using {@link EncoderFactory}.
+ * <p/>
+ * JsonEncoder buffers output, and data may not appear on the output
+ * until {@link Encoder#flush()} is called.
+ * <p/>
+ * JsonEncoder is not thread-safe.
+ * */
 public class JsonEncoder extends ParsingEncoder implements Parser.ActionHandler {
   final Parser parser;
   private JsonGenerator out;
@@ -41,12 +49,12 @@ public class JsonEncoder extends Parsing
    */
   protected BitSet isEmpty = new BitSet();
 
-  public JsonEncoder(Schema sc, OutputStream out) throws IOException {
+  JsonEncoder(Schema sc, OutputStream out) throws IOException {
     this(sc, getJsonGenerator(out));
   }
 
-  public JsonEncoder(Schema sc, JsonGenerator out) throws IOException {
-    this.out = out;
+  JsonEncoder(Schema sc, JsonGenerator out) throws IOException {
+    configure(out);
     this.parser =
       new Parser(new JsonGrammarGenerator().generate(sc), this);
   }
@@ -59,16 +67,51 @@ public class JsonEncoder extends Parsing
     }
   }
 
-  @Override
-  public void init(OutputStream out) throws IOException {
-    flush();
-    this.out = getJsonGenerator(out);
-  }
-
   private static JsonGenerator getJsonGenerator(OutputStream out)
-    throws IOException {
-    return out == null ? null :
-      new JsonFactory().createJsonGenerator(out, JsonEncoding.UTF8);
+      throws IOException {
+    if (null == out)
+      throw new NullPointerException("OutputStream cannot be null"); 
+    return new JsonFactory().createJsonGenerator(out, JsonEncoding.UTF8);
+  }
+  
+  /**
+   * Reconfigures this JsonEncoder to use the output stream provided.
+   * <p/>
+   * If the OutputStream provided is null, a NullPointerException is thrown.
+   * <p/>
+   * Otherwise, this JsonEncoder will flush its current output and then
+   * reconfigure its output to use a default UTF8 JsonGenerator that writes
+   * to the provided OutputStream.
+   * 
+   * @param out
+   *          The OutputStream to direct output to. Cannot be null.
+   * @throws IOException
+   * @return this JsonEncoder, reconfigured to write to <i>out</i>
+   */
+  public JsonEncoder configure(OutputStream out) throws IOException {
+    this.configure(getJsonGenerator(out));
+    return this;
+  }
+  
+  /**
+   * Reconfigures this JsonEncoder to output to the JsonGenerator provided.
+   * <p/>
+   * If the JsonGenerator provided is null, a NullPointerException is thrown.
+   * <p/>
+   * Otherwise, this JsonEncoder will flush its current output and then
+   * reconfigure its output to use the provided JsonGenerator.
+   * 
+   * @param generator
+   *          The JsonGenerator to direct output to. Cannot be null.
+   * @throws IOException
+   */
+  public void configure(JsonGenerator generator) throws IOException {
+    if (null == generator)
+      throw new NullPointerException("JsonGenerator cannot be null");
+    if (null != parser) {
+      flush();
+    }
+    this.out = generator;
   }
 
   @Override

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingEncoder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingEncoder.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingEncoder.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingEncoder.java Fri Feb 25 00:36:40 2011
@@ -19,9 +19,7 @@
 package org.apache.avro.io;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.ByteBuffer;
-import java.util.BitSet;
 
 import org.apache.avro.AvroTypeException;
 import org.apache.avro.Schema;
@@ -31,24 +29,27 @@ import org.apache.avro.io.parsing.Symbol
 import org.apache.avro.util.Utf8;
 
 /**
- * An implementation of {@link Encoder} that ensures that the sequence
- * of operations conforms to a schema.
+ * An implementation of {@link Encoder} that wraps another Encoder and
+ * ensures that the sequence of operations conforms to the provided schema.
+ * <p/>
+ * Use {@link EncoderFactory#validatingEncoder(Schema, Encoder)} to construct
+ * and configure.
+ * <p/>
+ * ValidatingEncoder is not thread-safe.
+ * @see Encoder
+ * @see EncoderFactory
  */
 public class ValidatingEncoder extends ParsingEncoder 
   implements Parser.ActionHandler {
-  protected final Encoder out;
+  protected Encoder out;
   protected final Parser parser;
-  /**
-   * Has anything been written into the collections?
-   */
-  protected BitSet isEmpty = new BitSet();
 
   ValidatingEncoder(Symbol root, Encoder out) throws IOException {
     this.out = out;
     this.parser = new Parser(root, this);
   }
 
-  public ValidatingEncoder(Schema schema, Encoder in) throws IOException {
+  ValidatingEncoder(Schema schema, Encoder in) throws IOException {
     this(new ValidatingGrammarGenerator().generate(schema), in);
   }
 
@@ -57,13 +58,16 @@ public class ValidatingEncoder extends P
     out.flush();
   }
 
-  @Override
-  public void init(OutputStream out) throws IOException {
-    flush();
-    parser.reset();
-    this.out.init(out);
+  /**
+   * Reconfigures this ValidatingEncoder to wrap the encoder provided.
+   * @param encoder
+   *   The Encoder to wrap for validation.
+   */
+  public void configure(Encoder encoder) {
+    this.parser.reset();
+    this.out = encoder;
   }
-
+  
   @Override
   public void writeNull() throws IOException {
     parser.advance(Symbol.NULL);
@@ -207,9 +211,5 @@ public class ValidatingEncoder extends P
     return null;
   }
 
-  /** Have we written at least one item into the current collection? */
-  protected final boolean isTopEmpty() {
-    return isEmpty.get(pos);
-  }
 }
 

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java Fri Feb 25 00:36:40 2011
@@ -28,8 +28,8 @@ import java.util.Map;
 import org.apache.avro.AvroTypeException;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
-import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
 import org.codehaus.jackson.JsonNode;
 
 /**
@@ -278,6 +278,7 @@ public class ResolvingGrammarGenerator e
     return result;
   }
 
+  private static EncoderFactory factory = new EncoderFactory().configureBufferSize(32);
   /**
    * Returns the Avro binary encoded version of <tt>n</tt> according to
    * the schema <tt>s</tt>.
@@ -288,8 +289,9 @@ public class ResolvingGrammarGenerator e
    */
   private static byte[] getBinary(Schema s, JsonNode n) throws IOException {
     ByteArrayOutputStream out = new ByteArrayOutputStream();
-    Encoder e = new BinaryEncoder(out);
+    Encoder e = factory.binaryEncoder(out, null);
     encode(e, s, n);
+    e.flush();
     return out.toByteArray();
   }
   

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/util/Utf8.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/util/Utf8.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/util/Utf8.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/util/Utf8.java Fri Feb 25 00:36:40 2011
@@ -18,7 +18,6 @@
 package org.apache.avro.util;
 
 import java.io.UnsupportedEncodingException;
-import java.nio.charset.Charset;
 
 import org.apache.avro.io.BinaryData;
 
@@ -28,10 +27,6 @@ import org.apache.avro.io.BinaryData;
 public class Utf8 implements Comparable<Utf8>, CharSequence {
   private static final byte[] EMPTY = new byte[0];
 
-  private static final Charset UTF8_CS;
-  static {
-    UTF8_CS = Charset.forName("UTF-8");
-  }
   private byte[] bytes = EMPTY;
   private int length;
   private String string;
@@ -39,7 +34,7 @@ public class Utf8 implements Comparable<
   public Utf8() {}
 
   public Utf8(String string) {
-    this.bytes = string.getBytes(UTF8_CS);
+    this.bytes = getBytesFor(string);
     this.length = bytes.length;
     this.string = string;
   }
@@ -112,6 +107,7 @@ public class Utf8 implements Comparable<
     return hash;
   }
 
+  @Override
   public int compareTo(Utf8 that) {
     return BinaryData.compareBytes(this.bytes, 0, this.length,
                                    that.bytes, 0, that.length);
@@ -125,8 +121,12 @@ public class Utf8 implements Comparable<
   }
 
   /** Gets the UTF-8 bytes for a String */
-  public static byte[] getBytesFor(String str) {
-    return str.getBytes(UTF8_CS);
+  public static final byte[] getBytesFor(String str) {
+    try {
+      return str.getBytes("UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
   }
 
 }

Modified: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/GenerateBlockingData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/GenerateBlockingData.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/GenerateBlockingData.java (original)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/GenerateBlockingData.java Fri Feb 25 00:36:40 2011
@@ -18,10 +18,10 @@
 package org.apache.avro;
 
 import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.BlockingBinaryEncoder;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -40,7 +40,9 @@ public class GenerateBlockingData {
   private static ByteArrayOutputStream buffer =
                       new ByteArrayOutputStream(2*SYNC_INTERVAL);
   
-  private static Encoder bufOut = new BlockingBinaryEncoder(buffer);
+  private static EncoderFactory factory = EncoderFactory.get();
+  private static Encoder bufOut = EncoderFactory.get().blockingBinaryEncoder(
+      buffer, null);
   private static int blockCount;
 
   private static void writeBlock(Encoder vout, FileOutputStream out)
@@ -66,7 +68,7 @@ public class GenerateBlockingData {
     FileOutputStream out = new FileOutputStream(outputFile, false);
     DatumWriter<Object> dout = new GenericDatumWriter<Object>();
     dout.setSchema(sch);
-    Encoder vout = new BinaryEncoder(out);
+    Encoder vout = factory.directBinaryEncoder(out, null);
     vout.writeLong(numObjects); // metadata:the count of objects in the file
     
     for (Object datum : new RandomData(sch, numObjects)) {

Modified: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestReflect.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestReflect.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestReflect.java (original)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestReflect.java Fri Feb 25 00:36:40 2011
@@ -35,8 +35,9 @@ import java.util.Map;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.TestReflect.SampleRecord.AnotherSampleRecord;
 import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.Decoder;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
@@ -48,7 +49,9 @@ import org.apache.avro.reflect.Union;
 import org.junit.Test;
 
 public class TestReflect {
-
+  
+  EncoderFactory factory = new EncoderFactory();
+  
   // test primitive type inference
   @Test public void testVoid() {
     check(Void.TYPE, "\"null\"");
@@ -355,7 +358,7 @@ public class TestReflect {
   void checkReadWrite(Object object, Schema s) throws Exception {
     ReflectDatumWriter<Object> writer = new ReflectDatumWriter<Object>(s);
     ByteArrayOutputStream out = new ByteArrayOutputStream();
-    writer.write(object, new BinaryEncoder(out));
+    writer.write(object, factory.directBinaryEncoder(out, null));
     ReflectDatumReader<Object> reader = new ReflectDatumReader<Object>(s);
     Object after =
       reader.read(null, DecoderFactory.defaultFactory().createBinaryDecoder(
@@ -394,7 +397,7 @@ public class TestReflect {
     SampleRecord record = new SampleRecord();
     record.x = 5;
     record.y = 10;
-    writer.write(record, new BinaryEncoder(out));
+    writer.write(record, factory.directBinaryEncoder(out, null));
     ReflectDatumReader<SampleRecord> reader = 
       new ReflectDatumReader<SampleRecord>(schm);
     SampleRecord decoded =
@@ -411,7 +414,7 @@ public class TestReflect {
       new ReflectDatumWriter<AnotherSampleRecord>(schm);
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     // keep record.a null and see if that works
-    BinaryEncoder e = new BinaryEncoder(out);
+    Encoder e = factory.directBinaryEncoder(out, null);
     AnotherSampleRecord a = new AnotherSampleRecord();
     writer.write(a, e);
     AnotherSampleRecord b = new AnotherSampleRecord(10);
@@ -551,7 +554,7 @@ public class TestReflect {
     throws IOException {
     ReflectDatumWriter<Object> writer = new ReflectDatumWriter<Object>(schema);
     ByteArrayOutputStream out = new ByteArrayOutputStream();
-    writer.write(datum, new BinaryEncoder(out));
+    writer.write(datum, EncoderFactory.get().directBinaryEncoder(out, null));
     byte[] data = out.toByteArray();
 
     ReflectDatumReader<Object> reader = new ReflectDatumReader<Object>(schema);

Modified: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericData.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericData.java (original)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericData.java Fri Feb 25 00:36:40 2011
@@ -34,6 +34,7 @@ import org.apache.avro.AvroRuntimeExcept
 import org.apache.avro.Schema.Type;
 import org.apache.avro.io.BinaryData;
 import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.util.Utf8;
 import org.codehaus.jackson.JsonFactory;
@@ -240,8 +241,8 @@ public class TestGenericData {
     
     ByteArrayOutputStream b1 = new ByteArrayOutputStream(5);
     ByteArrayOutputStream b2 = new ByteArrayOutputStream(5);
-    BinaryEncoder b1Enc = new BinaryEncoder(b1);
-    BinaryEncoder b2Enc = new BinaryEncoder(b2);
+    BinaryEncoder b1Enc = EncoderFactory.get().binaryEncoder(b1, null);
+    BinaryEncoder b2Enc = EncoderFactory.get().binaryEncoder(b2, null);
     // Prepare two different datums
     Record testDatum1 = new Record(record);
     testDatum1.put(0, 1);

Modified: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericDatumWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericDatumWriter.java?rev=1074364&r1=1074363&r2=1074364&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericDatumWriter.java (original)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericDatumWriter.java Fri Feb 25 00:36:40 2011
@@ -25,8 +25,8 @@ import java.io.IOException;
 
 import org.apache.avro.Schema;
 import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.io.JsonDecoder;
-import org.apache.avro.io.JsonEncoder;
 import org.junit.Test;
 
 public class TestGenericDatumWriter {
@@ -41,7 +41,7 @@ public class TestGenericDatumWriter {
     ByteArrayOutputStream bao = new ByteArrayOutputStream();
     GenericDatumWriter<GenericRecord> w =
       new GenericDatumWriter<GenericRecord>(s);
-    Encoder e = new JsonEncoder(s, bao);
+    Encoder e = EncoderFactory.get().jsonEncoder(s, bao);
     w.write(r, e);
     e.flush();
     



Mime
View raw message