avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject svn commit: r1681906 [1/2] - in /avro/trunk: ./ lang/java/avro/src/main/java/org/apache/avro/ lang/java/avro/src/main/java/org/apache/avro/generic/ lang/java/avro/src/main/java/org/apache/avro/reflect/ lang/java/avro/src/test/java/org/apache/avro/ lang...
Date Wed, 27 May 2015 04:48:59 GMT
Author: blue
Date: Wed May 27 04:48:58 2015
New Revision: 1681906

URL: http://svn.apache.org/r1681906
Log:
AVRO-1497: Add LogicalType implementation.

This closes PR #29 and includes the following commits:

ca1d2b1 AVRO-1497: Add LogicalTypes and read-side implementation.
442a917 AVRO-1497: Add logical type support to schema reflection.
ec8d6d4 AVRO-1497: Add logical type writes to generic and reflect.
e6e9761 AVRO-1497: Clean up Conversion and LogicalType classes.
8fe954a AVRO-1497: Add Conversion and LogicalType javadoc.
3abf042 AVRO-1497: Fix ByteBuffer bug in DecimalConversion.
2293a18 AVRO-1497: Fix review items.
207afd3 AVRO-1497: Add logical type registration and record test.
d2377b2 AVRO-1497: Maven CLI and checkstyle fixes.
1e628b2 AVRO-1497: Fix test failures.
15ed857 AVRO-1497: Fix performance issues with logical types.
fb84364 AVRO-1497: Check logical type once per array.
7de6edc AVRO-1497: Remove unnecessary changes to Schema.

Added:
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Conversion.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Conversions.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/LogicalType.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java
    avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestLogicalType.java
    avro/trunk/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericLogicalTypes.java
    avro/trunk/lang/java/avro/src/test/java/org/apache/avro/reflect/TestReflectLogicalTypes.java
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Schema.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1681906&r1=1681905&r2=1681906&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Wed May 27 04:48:58 2015
@@ -42,6 +42,8 @@ Trunk (not yet released)
 
     AVRO-680. Java: Support non-string map keys. (Sachin Goyal via Ryan Blue).
 
+    AVRO-1497. Java: Add support for logical types. (blue)
+
   OPTIMIZATIONS
 
   IMPROVEMENTS

Added: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Conversion.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Conversion.java?rev=1681906&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Conversion.java (added)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Conversion.java Wed May 27 04:48:58 2015
@@ -0,0 +1,171 @@
+package org.apache.avro;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
+
+/**
+ * Conversion between generic and logical type instances.
+ * <p>
+ * Instances of this class are added to GenericData to convert a logical type
+ * to a particular representation.
+ * <p>
+ * Implementations must provide:
+ * * {@link #getConvertedType()}: get the Java class used for the logical type
+ * * {@link #getLogicalTypeName()}: get the logical type this implements
+ * <p>
+ * Subclasses must also override all of the conversion methods for Avro's base
+ * types that are valid for the logical type, or else risk causing
+ * {@code UnsupportedOperationException} at runtime.
+ * <p>
+ * Optionally, use {@link #getRecommendedSchema()} to provide a Schema that
+ * will be used when a Schema is generated for the class returned by
+ * {@code getConvertedType}.
+ *
+ * @param <T> a Java type that generic data is converted to
+ */
+public abstract class Conversion<T> {
+
+  /**
+   * Return the Java class representing the logical type.
+   *
+   * @return a Java class returned by from methods and accepted by to methods
+   */
+  public abstract Class<T> getConvertedType();
+
+  /**
+   * Return the logical type this class converts.
+   *
+   * @return a String logical type name
+   */
+  public abstract String getLogicalTypeName();
+
+  public Schema getRecommendedSchema() {
+    throw new UnsupportedOperationException(
+        "No recommended schema for " + getLogicalTypeName());
+  }
+
+  public T fromBoolean(Boolean value, Schema schema, LogicalType type) {
+    throw new UnsupportedOperationException(
+        "fromBoolean is not supported for " + type.getName());
+  }
+
+  public T fromInt(Integer value, Schema schema, LogicalType type) {
+    throw new UnsupportedOperationException(
+        "fromInt is not supported for " + type.getName());
+  }
+
+  public T fromLong(Long value, Schema schema, LogicalType type) {
+    throw new UnsupportedOperationException(
+        "fromLong is not supported for " + type.getName());
+  }
+
+  public T fromFloat(Float value, Schema schema, LogicalType type) {
+    throw new UnsupportedOperationException(
+        "fromFloat is not supported for " + type.getName());
+  }
+
+  public T fromDouble(Double value, Schema schema, LogicalType type) {
+    throw new UnsupportedOperationException(
+        "fromDouble is not supported for " + type.getName());
+  }
+
+  public T fromCharSequence(CharSequence value, Schema schema, LogicalType type) {
+    throw new UnsupportedOperationException(
+        "fromCharSequence is not supported for " + type.getName());
+  }
+
+  public T fromEnumSymbol(GenericEnumSymbol value, Schema schema, LogicalType type) {
+    throw new UnsupportedOperationException(
+        "fromEnumSymbol is not supported for " + type.getName());
+  }
+
+  public T fromFixed(GenericFixed value, Schema schema, LogicalType type) {
+    throw new UnsupportedOperationException(
+        "fromFixed is not supported for " + type.getName());
+  }
+
+  public T fromBytes(ByteBuffer value, Schema schema, LogicalType type)  {
+    throw new UnsupportedOperationException(
+        "fromBytes is not supported for " + type.getName());
+  }
+
+  public T fromArray(Collection<?> value, Schema schema, LogicalType type) {
+    throw new UnsupportedOperationException(
+        "fromArray is not supported for " + type.getName());
+  }
+
+  public T fromMap(Map<?, ?> value, Schema schema, LogicalType type) {
+    throw new UnsupportedOperationException(
+        "fromMap is not supported for " + type.getName());
+  }
+
+  public T fromRecord(IndexedRecord value, Schema schema, LogicalType type) {
+    throw new UnsupportedOperationException(
+        "fromRecord is not supported for " + type.getName());
+  }
+
+  public Boolean toBoolean(T value, Schema schema, LogicalType type) {
+    throw new UnsupportedOperationException(
+        "toBoolean is not supported for " + type.getName());
+  }
+
+  public Integer toInt(T value, Schema schema, LogicalType type) {
+    throw new UnsupportedOperationException(
+        "toInt is not supported for " + type.getName());
+  }
+
+  public Long toLong(T value, Schema schema, LogicalType type) {
+    throw new UnsupportedOperationException(
+        "toLong is not supported for " + type.getName());
+  }
+
+  public Float toFloat(T value, Schema schema, LogicalType type) {
+    throw new UnsupportedOperationException(
+        "toFloat is not supported for " + type.getName());
+  }
+
+  public Double toDouble(T value, Schema schema, LogicalType type) {
+    throw new UnsupportedOperationException(
+        "toDouble is not supported for " + type.getName());
+  }
+
+  public CharSequence toCharSequence(T value, Schema schema, LogicalType type) {
+    throw new UnsupportedOperationException(
+        "toCharSequence is not supported for " + type.getName());
+  }
+
+  public GenericEnumSymbol toEnumSymbol(T value, Schema schema, LogicalType type) {
+    throw new UnsupportedOperationException(
+        "toEnumSymbol is not supported for " + type.getName());
+  }
+
+  public GenericFixed toFixed(T value, Schema schema, LogicalType type) {
+    throw new UnsupportedOperationException(
+        "toFixed is not supported for " + type.getName());
+  }
+
+  public ByteBuffer toBytes(T value, Schema schema, LogicalType type)  {
+    throw new UnsupportedOperationException(
+        "toBytes is not supported for " + type.getName());
+  }
+
+  public Collection<?> toArray(T value, Schema schema, LogicalType type) {
+    throw new UnsupportedOperationException(
+        "toArray is not supported for " + type.getName());
+  }
+
+  public Map<?, ?> toMap(T value, Schema schema, LogicalType type) {
+    throw new UnsupportedOperationException(
+        "toMap is not supported for " + type.getName());
+  }
+
+  public IndexedRecord toRecord(T value, Schema schema, LogicalType type) {
+    throw new UnsupportedOperationException(
+        "toRecord is not supported for " + type.getName());
+  }
+
+}

Added: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Conversions.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Conversions.java?rev=1681906&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Conversions.java (added)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Conversions.java Wed May 27 04:48:58 2015
@@ -0,0 +1,105 @@
+package org.apache.avro;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+
+public class Conversions {
+
+  public static class UUIDConversion extends Conversion<UUID> {
+    @Override
+    public Class<UUID> getConvertedType() {
+      return UUID.class;
+    }
+
+    @Override
+    public Schema getRecommendedSchema() {
+      return LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING));
+    }
+
+    @Override
+    public String getLogicalTypeName() {
+      return "uuid";
+    }
+
+    @Override
+    public UUID fromCharSequence(CharSequence value, Schema schema, LogicalType type) {
+      return UUID.fromString(value.toString());
+    }
+
+    @Override
+    public CharSequence toCharSequence(UUID value, Schema schema, LogicalType type) {
+      return value.toString();
+    }
+  }
+
+  public static class DecimalConversion extends Conversion<BigDecimal> {
+    @Override
+    public Class<BigDecimal> getConvertedType() {
+      return BigDecimal.class;
+    }
+
+    @Override
+    public Schema getRecommendedSchema() {
+      throw new UnsupportedOperationException(
+          "No recommended schema for decimal (scale is required)");
+    }
+
+    @Override
+    public String getLogicalTypeName() {
+      return "decimal";
+    }
+
+    @Override
+    public BigDecimal fromBytes(ByteBuffer value, Schema schema, LogicalType type) {
+      int scale = ((LogicalTypes.Decimal) type).getScale();
+      // always copy the bytes out because BigInteger has no offset/length ctor
+      byte[] bytes = value.get(new byte[value.remaining()]).array();
+      return new BigDecimal(new BigInteger(bytes), scale);
+    }
+
+    @Override
+    public ByteBuffer toBytes(BigDecimal value, Schema schema, LogicalType type) {
+      int scale = ((LogicalTypes.Decimal) type).getScale();
+      if (scale != value.scale()) {
+        throw new AvroTypeException("Cannot encode decimal with scale " +
+            value.scale() + " as scale " + scale);
+      }
+      return ByteBuffer.wrap(value.unscaledValue().toByteArray());
+    }
+
+    @Override
+    public BigDecimal fromFixed(GenericFixed value, Schema schema, LogicalType type) {
+      int scale = ((LogicalTypes.Decimal) type).getScale();
+      return new BigDecimal(new BigInteger(value.bytes()), scale);
+    }
+
+    @Override
+    public GenericFixed toFixed(BigDecimal value, Schema schema, LogicalType type) {
+      int scale = ((LogicalTypes.Decimal) type).getScale();
+      if (scale != value.scale()) {
+        throw new AvroTypeException("Cannot encode decimal with scale " +
+            value.scale() + " as scale " + scale);
+      }
+
+      byte fillByte = (byte) (value.signum() < 0 ? 0xFF : 0x00);
+      byte[] unscaled = value.unscaledValue().toByteArray();
+      byte[] bytes = new byte[schema.getFixedSize()];
+      int offset = bytes.length - unscaled.length;
+
+      for (int i = 0; i < bytes.length; i += 1) {
+        if (i < offset) {
+          bytes[i] = fillByte;
+        } else {
+          bytes[i] = unscaled[i - offset];
+        }
+      }
+
+      return new GenericData.Fixed(schema, bytes);
+    }
+  }
+
+}

Added: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/LogicalType.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/LogicalType.java?rev=1681906&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/LogicalType.java (added)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/LogicalType.java Wed May 27 04:48:58 2015
@@ -0,0 +1,77 @@
+package org.apache.avro;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.specific.SpecificData;
+
+/**
+ * Logical types provides an opt-in way to extend Avro's types. Logical types
+ * specify a way of representing a high-level type as a base Avro type. For
+ * example, a date is specified as the number of days after the unix epoch (or
+ * before using a negative value). This enables extentions to Avro's type
+ * system without breaking binary compatibility. Older versions see the base
+ * type and ignore the logical type.
+ */
+public class LogicalType {
+
+  public static final String LOGICAL_TYPE_PROP = "logicalType";
+
+  private static final String[] INCOMPATIBLE_PROPS = new String[] {
+      GenericData.STRING_PROP, SpecificData.CLASS_PROP,
+      SpecificData.KEY_CLASS_PROP, SpecificData.ELEMENT_PROP
+  };
+
+  private final String name;
+
+  public LogicalType(String logicalTypeName) {
+    this.name = logicalTypeName.intern();
+  }
+
+  /**
+   * Get the name of this logical type.
+   * <p>
+   * This name is set as the Schema property "logicalType".
+   *
+   * @return the String name of the logical type
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Add this logical type to the given Schema.
+   * <p>
+   * The "logicalType" property will be set to this type's name, and other
+   * type-specific properties may be added. The Schema is first validated to
+   * ensure it is compatible.
+   *
+   * @param schema a Schema
+   * @return the modified Schema
+   * @throws IllegalArgumentException if the type and schema are incompatible
+   */
+  public Schema addToSchema(Schema schema) {
+    validate(schema);
+    schema.addProp(LOGICAL_TYPE_PROP, name);
+    schema.setLogicalType(this);
+    return schema;
+  }
+
+  /**
+   * Validate this logical type for the given Schema.
+   * <p>
+   * This will throw an exception if the Schema is incompatible with this type.
+   * For example, a date is stored as an int and is incompatible with a fixed
+   * Schema.
+   *
+   * @param schema a Schema
+   * @throws IllegalArgumentException if the type and schema are incompatible
+   */
+  public void validate(Schema schema) {
+    for (String incompatible : INCOMPATIBLE_PROPS) {
+      if (schema.getProp(incompatible) != null) {
+        throw new IllegalArgumentException(
+            LOGICAL_TYPE_PROP + " cannot be used with " + incompatible);
+      }
+    }
+  }
+
+}

Added: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java?rev=1681906&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java (added)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java Wed May 27 04:48:58 2015
@@ -0,0 +1,218 @@
+package org.apache.avro;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.avro.util.WeakIdentityHashMap;
+
+public class LogicalTypes {
+
+  private static final Map<Schema, LogicalType> CACHE =
+      new WeakIdentityHashMap<Schema, LogicalType>();
+
+  public interface LogicalTypeFactory {
+    LogicalType fromSchema(Schema schema);
+  }
+
+  private static final Map<String, LogicalTypeFactory> REGISTERED_TYPES =
+      new ConcurrentHashMap<String, LogicalTypeFactory>();
+
+  public static void register(String logicalTypeName, LogicalTypeFactory factory) {
+    if (logicalTypeName == null) {
+      throw new NullPointerException("Invalid logical type name: null");
+    }
+    if (factory == null) {
+      throw new NullPointerException("Invalid logical type factory: null");
+    }
+    REGISTERED_TYPES.put(logicalTypeName, factory);
+  }
+
+  /**
+   * Returns the {@link LogicalType} from the schema, if one is present.
+   * @param schema
+   * @return
+   */
+  public static LogicalType fromSchema(Schema schema) {
+    return fromSchemaImpl(schema, true);
+  }
+
+  public static LogicalType fromSchemaIgnoreInvalid(Schema schema) {
+    if (CACHE.containsKey(schema)) {
+      return CACHE.get(schema);
+    }
+
+    LogicalType logicalType = fromSchemaImpl(schema, false);
+
+    // add to the cache, even if it is null
+    CACHE.put(schema, logicalType);
+
+    return logicalType;
+  }
+
+  private static LogicalType fromSchemaImpl(Schema schema, boolean throwErrors) {
+    String typeName = schema.getProp(LogicalType.LOGICAL_TYPE_PROP);
+
+    LogicalType logicalType;
+    try {
+      if ("decimal".equals(typeName)) {
+        logicalType = new Decimal(schema);
+      } else if ("uuid".equals(typeName)) {
+        logicalType = UUID_TYPE;
+      } else if (REGISTERED_TYPES.containsKey(typeName)) {
+        logicalType = REGISTERED_TYPES.get(typeName).fromSchema(schema);
+      } else {
+        logicalType = null;
+      }
+
+      // make sure the type is valid before returning it
+      if (logicalType != null) {
+        logicalType.validate(schema);
+      }
+    } catch (RuntimeException e) {
+      if (throwErrors) {
+        throw e;
+      }
+      // ignore invalid types
+      logicalType = null;
+    }
+
+    return logicalType;
+  }
+
+  /** Create a Decimal LogicalType with the given precision and scale 0 */
+  public static Decimal decimal(int precision) {
+    return decimal(precision, 0);
+  }
+
+  /** Create a Decimal LogicalType with the given precision and scale */
+  public static Decimal decimal(int precision, int scale) {
+    return new Decimal(precision, scale);
+  }
+
+  private static final LogicalType UUID_TYPE = new LogicalType("uuid");
+
+  public static LogicalType uuid() {
+    return UUID_TYPE;
+  }
+
+  /** Decimal represents arbitrary-precision fixed-scale decimal numbers  */
+  public static class Decimal extends LogicalType {
+    private static final String PRECISION_PROP = "precision";
+    private static final String SCALE_PROP = "scale";
+
+    private final int precision;
+    private final int scale;
+
+    private Decimal(int precision, int scale) {
+      super("decimal");
+      this.precision = precision;
+      this.scale = scale;
+    }
+
+    private Decimal(Schema schema) {
+      super("decimal");
+      if (!hasProperty(schema, PRECISION_PROP)) {
+        throw new IllegalArgumentException(
+            "Invalid decimal: missing precision");
+      }
+
+      this.precision = getInt(schema, PRECISION_PROP);
+
+      if (hasProperty(schema, SCALE_PROP)) {
+        this.scale = getInt(schema, SCALE_PROP);
+      } else {
+        this.scale = 0;
+      }
+    }
+
+    @Override
+    public Schema addToSchema(Schema schema) {
+      super.addToSchema(schema);
+      schema.addProp(PRECISION_PROP, precision);
+      schema.addProp(SCALE_PROP, scale);
+      return schema;
+    }
+
+    public int getPrecision() {
+      return precision;
+    }
+
+    public int getScale() {
+      return scale;
+    }
+
+    @Override
+    public void validate(Schema schema) {
+      super.validate(schema);
+      // validate the type
+      if (schema.getType() != Schema.Type.FIXED &&
+          schema.getType() != Schema.Type.BYTES) {
+        throw new IllegalArgumentException(
+            "Logical type decimal must be backed by fixed or bytes");
+      }
+      if (precision <= 0) {
+        throw new IllegalArgumentException("Invalid decimal precision: " +
+            precision + " (must be positive)");
+      } else if (precision > maxPrecision(schema)) {
+        throw new IllegalArgumentException(
+            "fixed(" + schema.getFixedSize() + ") cannot store " +
+                precision + " digits (max " + maxPrecision(schema) + ")");
+      }
+      if (scale < 0) {
+        throw new IllegalArgumentException("Invalid decimal scale: " +
+            scale + " (must be positive)");
+      } else if (scale > precision) {
+        throw new IllegalArgumentException("Invalid decimal scale: " +
+            scale + " (greater than precision: " + precision + ")");
+      }
+    }
+
+    private long maxPrecision(Schema schema) {
+      if (schema.getType() == Schema.Type.BYTES) {
+        // not bounded
+        return Integer.MAX_VALUE;
+      } else if (schema.getType() == Schema.Type.FIXED) {
+        int size = schema.getFixedSize();
+        return Math.round(          // convert double to long
+            Math.floor(Math.log10(  // number of base-10 digits
+                Math.pow(2, 8 * size - 1) - 1)  // max value stored
+            ));
+      } else {
+        // not valid for any other type
+        return 0;
+      }
+    }
+
+    private boolean hasProperty(Schema schema, String name) {
+      return (schema.getObjectProp(name) != null);
+    }
+
+    private int getInt(Schema schema, String name) {
+      Object obj = schema.getObjectProp(name);
+      if (obj instanceof Integer) {
+        return (Integer) obj;
+      }
+      throw new IllegalArgumentException("Expected int " + name + ": " +
+          (obj == null ? "null" : obj + ":" + obj.getClass().getSimpleName()));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      Decimal decimal = (Decimal) o;
+
+      if (precision != decimal.precision) return false;
+      if (scale != decimal.scale) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = precision;
+      result = 31 * result + scale;
+      return result;
+    }
+  }
+}

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Schema.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Schema.java?rev=1681906&r1=1681905&r2=1681906&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Schema.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Schema.java Wed May 27 04:48:58 2015
@@ -98,6 +98,7 @@ public abstract class Schema extends Jso
   };
 
   private final Type type;
+  private LogicalType logicalType = null;
 
   Schema(Type type) {
     super(SCHEMA_RESERVED);
@@ -138,6 +139,14 @@ public abstract class Schema extends Jso
     hashCode = NO_HASHCODE;
   }
 
+  public LogicalType getLogicalType() {
+    return logicalType;
+  }
+
+  void setLogicalType(LogicalType logicalType) {
+    this.logicalType = logicalType;
+  }
+
   /** Create an anonymous record schema. */
   public static Schema createRecord(List<Field> fields) {
     Schema result = createRecord(null, null, null, false);
@@ -1310,6 +1319,8 @@ public abstract class Schema extends Jso
         if (!SCHEMA_RESERVED.contains(prop))      // ignore reserved
           result.addProp(prop, schema.get(prop));
       }
+      // parse logical type if present
+      result.logicalType = LogicalTypes.fromSchemaIgnoreInvalid(result);
       names.space(savedSpace);                  // restore space
       if (result instanceof NamedSchema) {
         Set<String> aliases = parseAliases(schema);

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java?rev=1681906&r1=1681905&r2=1681906&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java Wed May 27 04:48:58 2015
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.IdentityHashMap;
 import java.util.WeakHashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -32,6 +33,8 @@ import java.util.Map;
 
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.AvroTypeException;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
@@ -59,7 +62,7 @@ public class GenericData {
   /** Used to specify the Java type for a string schema. */
   public enum StringType { CharSequence, String, Utf8 };
 
-  protected static final String STRING_PROP = "avro.java.string";
+  public static final String STRING_PROP = "avro.java.string";
   protected static final String STRING_TYPE_STRING = "String";
 
   private final ClassLoader classLoader;
@@ -91,6 +94,47 @@ public class GenericData {
   /** Return the class loader that's used (by subclasses). */
   public ClassLoader getClassLoader() { return classLoader; }
 
+  public Map<String, Conversion<?>> conversions =
+      new HashMap<String, Conversion<?>>();
+
+  public Map<Class<?>, Conversion<?>> conversionsByClass =
+      new IdentityHashMap<Class<?>, Conversion<?>>();
+
+  public void addLogicalTypeConversion(Conversion<?> conversion) {
+    conversions.put(conversion.getLogicalTypeName(), conversion);
+    conversionsByClass.put(conversion.getConvertedType(), conversion);
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> Conversion<? super T> getConversionFrom(Class<T> datumClass,
+                                                     LogicalType logicalType) {
+    Conversion<?> conversion = conversionsByClass.get(datumClass);
+    if (conversion != null &&
+        conversion.getLogicalTypeName().equals(logicalType.getName())) {
+      return (Conversion<T>) conversion;
+    }
+    return null;
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> Conversion<? extends T> getConversionTo(Class<T> datumClass,
+                                                     LogicalType logicalType) {
+    Conversion<?> conversion = conversionsByClass.get(datumClass);
+    if (conversion != null &&
+        conversion.getLogicalTypeName().equals(logicalType.getName())) {
+      return (Conversion<T>) conversion;
+    }
+    return null;
+  }
+
+  @SuppressWarnings("unchecked")
+  public Conversion<Object> getConversionFor(LogicalType logicalType) {
+    if (logicalType == null) {
+      return null;
+    }
+    return (Conversion<Object>) conversions.get(logicalType.getName());
+  }
+
   /** Default implementation of {@link GenericRecord}. Note that this implementation
    * does not fill in default values for fields if they are not specified; use {@link
    * GenericRecordBuilder} in that case.
@@ -609,6 +653,24 @@ public class GenericData {
   /** Return the index for a datum within a union.  Implemented with {@link
    * Schema#getIndexNamed(String)} and {@link #getSchemaName(Object)}.*/
   public int resolveUnion(Schema union, Object datum) {
+    // if there is a logical type that works, use it first
+    // this allows logical type concrete classes to overlap with supported ones
+    // for example, a conversion could return a map
+    if (datum != null) {
+      Conversion<?> conversion = conversionsByClass.get(datum.getClass());
+      if (conversion != null) {
+        String logicalTypeName = conversion.getLogicalTypeName();
+        List<Schema> candidates = union.getTypes();
+        for (int i = 0; i < candidates.size(); i += 1) {
+          LogicalType candidateLogicalType = candidates.get(i).getLogicalType();
+          if (candidateLogicalType != null &&
+              logicalTypeName.equals(candidateLogicalType.getName())) {
+            return i;
+          }
+        }
+      }
+    }
+
     Integer i = union.getIndexNamed(getSchemaName(datum));
     if (i != null)
       return i;

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java?rev=1681906&r1=1681905&r2=1681906&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java Wed May 27 04:48:58 2015
@@ -27,6 +27,8 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 
 import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.io.DatumReader;
@@ -147,6 +149,27 @@ public class GenericDatumReader<D> imple
   /** Called to read data.*/
   protected Object read(Object old, Schema expected,
       ResolvingDecoder in) throws IOException {
+    Object datum = readWithoutConversion(old, expected, in);
+    LogicalType logicalType = expected.getLogicalType();
+    if (logicalType != null) {
+      Conversion<?> conversion = getData().getConversionFor(logicalType);
+      if (conversion != null) {
+        return convert(datum, expected, logicalType, conversion);
+      }
+    }
+    return datum;
+  }
+
+  protected Object readWithConversion(Object old, Schema expected,
+                                      LogicalType logicalType,
+                                      Conversion<?> conversion,
+                                      ResolvingDecoder in) throws IOException {
+    return convert(readWithoutConversion(old, expected, in),
+        expected, logicalType, conversion);
+  }
+
+  protected Object readWithoutConversion(Object old, Schema expected,
+      ResolvingDecoder in) throws IOException {
     switch (expected.getType()) {
     case RECORD:  return readRecord(old, expected, in);
     case ENUM:    return readEnum(expected, in);
@@ -165,7 +188,31 @@ public class GenericDatumReader<D> imple
     default: throw new AvroRuntimeException("Unknown type: " + expected);
     }
   }
-  
+
+  protected Object convert(Object datum, Schema schema, LogicalType type,
+                           Conversion<?> conversion) {
+    try {
+      switch (schema.getType()) {
+      case RECORD:  return conversion.fromRecord((IndexedRecord) datum, schema, type);
+      case ENUM:    return conversion.fromEnumSymbol((GenericEnumSymbol) datum, schema, type);
+      case ARRAY:   return conversion.fromArray(getData().getArrayAsCollection(datum), schema, type);
+      case MAP:     return conversion.fromMap((Map<?, ?>) datum, schema, type);
+      case FIXED:   return conversion.fromFixed((GenericFixed) datum, schema, type);
+      case STRING:  return conversion.fromCharSequence((CharSequence) datum, schema, type);
+      case BYTES:   return conversion.fromBytes((ByteBuffer) datum, schema, type);
+      case INT:     return conversion.fromInt((Integer) datum, schema, type);
+      case LONG:    return conversion.fromLong((Long) datum, schema, type);
+      case FLOAT:   return conversion.fromFloat((Float) datum, schema, type);
+      case DOUBLE:  return conversion.fromDouble((Double) datum, schema, type);
+      case BOOLEAN: return conversion.fromBoolean((Boolean) datum, schema, type);
+      }
+      return datum;
+    } catch (ClassCastException e) {
+      throw new AvroRuntimeException("Cannot convert " + datum + ":" +
+          datum.getClass().getSimpleName() + ": expected generic type", e);
+    }
+  }
+
   /** Called to read a record instance. May be overridden for alternate record
    * representations.*/
   protected Object readRecord(Object old, Schema expected, 
@@ -213,10 +260,20 @@ public class GenericDatumReader<D> imple
     long l = in.readArrayStart();
     long base = 0;
     if (l > 0) {
+      LogicalType logicalType = expectedType.getLogicalType();
+      Conversion<?> conversion = getData().getConversionFor(logicalType);
       Object array = newArray(old, (int) l, expected);
       do {
-        for (long i = 0; i < l; i++) {
-          addToArray(array, base + i, read(peekArray(array), expectedType, in));
+        if (logicalType != null && conversion != null) {
+          for (long i = 0; i < l; i++) {
+            addToArray(array, base + i, readWithConversion(
+                peekArray(array), expectedType, logicalType, conversion, in));
+          }
+        } else {
+          for (long i = 0; i < l; i++) {
+            addToArray(array, base + i, readWithoutConversion(
+                peekArray(array), expectedType, in));
+          }
         }
         base += l;
       } while ((l = in.arrayNext()) > 0);
@@ -249,11 +306,21 @@ public class GenericDatumReader<D> imple
       ResolvingDecoder in) throws IOException {
     Schema eValue = expected.getValueType();
     long l = in.readMapStart();
+    LogicalType logicalType = eValue.getLogicalType();
+    Conversion<?> conversion = getData().getConversionFor(logicalType);
     Object map = newMap(old, (int) l);
     if (l > 0) {
       do {
-        for (int i = 0; i < l; i++) {
-          addToMap(map, readMapKey(null, expected, in), read(null, eValue, in));
+        if (logicalType != null && conversion != null) {
+          for (int i = 0; i < l; i++) {
+            addToMap(map, readMapKey(null, expected, in),
+                readWithConversion(null, eValue, logicalType, conversion, in));
+          }
+        } else {
+          for (int i = 0; i < l; i++) {
+            addToMap(map, readMapKey(null, expected, in),
+                readWithoutConversion(null, eValue, in));
+          }
         }
       } while ((l = in.mapNext()) > 0);
     }

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java?rev=1681906&r1=1681905&r2=1681906&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java Wed May 27 04:48:58 2015
@@ -25,6 +25,8 @@ import java.util.Map;
 import java.util.Collection;
 
 import org.apache.avro.AvroTypeException;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.io.DatumWriter;
@@ -57,9 +59,46 @@ public class GenericDatumWriter<D> imple
   public void write(D datum, Encoder out) throws IOException {
     write(root, datum, out);
   }
-  
+
   /** Called to write data.*/
   protected void write(Schema schema, Object datum, Encoder out)
+      throws IOException {
+    LogicalType logicalType = schema.getLogicalType();
+    if (datum != null && logicalType != null) {
+      Conversion<?> conversion = getData()
+          .getConversionFrom(datum.getClass(), logicalType);
+      writeWithoutConversion(schema,
+          convert(schema, logicalType, conversion, datum), out);
+    } else {
+      writeWithoutConversion(schema, datum, out);
+    }
+  }
+
+  private <T> Object convert(Schema schema, LogicalType logicalType,
+                             Conversion<T> conversion, Object datum) {
+    if (conversion == null) {
+      return datum;
+    }
+    Class<T> fromClass = conversion.getConvertedType();
+    switch (schema.getType()) {
+    case RECORD:  return conversion.toRecord(fromClass.cast(datum), schema, logicalType);
+    case ENUM:    return conversion.toEnumSymbol(fromClass.cast(datum), schema, logicalType);
+    case ARRAY:   return conversion.toArray(fromClass.cast(datum), schema, logicalType);
+    case MAP:     return conversion.toMap(fromClass.cast(datum), schema, logicalType);
+    case FIXED:   return conversion.toFixed(fromClass.cast(datum), schema, logicalType);
+    case STRING:  return conversion.toCharSequence(fromClass.cast(datum), schema, logicalType);
+    case BYTES:   return conversion.toBytes(fromClass.cast(datum), schema, logicalType);
+    case INT:     return conversion.toInt(fromClass.cast(datum), schema, logicalType);
+    case LONG:    return conversion.toLong(fromClass.cast(datum), schema, logicalType);
+    case FLOAT:   return conversion.toFloat(fromClass.cast(datum), schema, logicalType);
+    case DOUBLE:  return conversion.toDouble(fromClass.cast(datum), schema, logicalType);
+    case BOOLEAN: return conversion.toBoolean(fromClass.cast(datum), schema, logicalType);
+    }
+    return datum;
+  }
+
+  /** Called to write data.*/
+  protected void writeWithoutConversion(Schema schema, Object datum, Encoder out)
     throws IOException {
     try {
       switch (schema.getType()) {

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java?rev=1681906&r1=1681905&r2=1681906&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java Wed May 27 04:48:58 2015
@@ -41,10 +41,13 @@ import java.util.concurrent.ConcurrentHa
 import org.apache.avro.AvroRemoteException;
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.AvroTypeException;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
 import org.apache.avro.Protocol;
 import org.apache.avro.Protocol.Message;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.BinaryData;
@@ -379,6 +382,12 @@ public class ReflectData extends Specifi
 
   @Override
   public Class getClass(Schema schema) {
+    // see if the element class will be converted and use that class
+    Conversion<?> conversion = getConversionFor(schema.getLogicalType());
+    if (conversion != null) {
+      return conversion.getConvertedType();
+    }
+
     switch (schema.getType()) {
     case ARRAY:
       Class collectionClass = getClassProp(schema, CLASS_PROP);
@@ -552,6 +561,11 @@ public class ReflectData extends Specifi
         return Schema.create(Schema.Type.BYTES);
       if (Collection.class.isAssignableFrom(c))              // array
         throw new AvroRuntimeException("Can't find element type of Collection");
+      for (Conversion<?> conversion : conversions.values()) {  // logical type
+        if (conversion.getConvertedType().isAssignableFrom(c)) {
+          return conversion.getRecommendedSchema();
+        }
+      }
       String fullName = c.getName();
       Schema schema = names.get(fullName);
       if (schema == null) {
@@ -859,5 +873,32 @@ public class ReflectData extends Specifi
       schema.addAlias(alias.alias(), space);
     }
   }
-  
+
+  @Override
+  public Object createFixed(Object old, Schema schema) {
+    // SpecificData will try to instantiate the type returned by getClass, but
+    // that is the converted class and can't be constructed.
+    LogicalType logicalType = schema.getLogicalType();
+    if (logicalType != null) {
+      Conversion<?> conversion = getConversionFor(schema.getLogicalType());
+      if (conversion != null) {
+        return new GenericData.Fixed(schema);
+      }
+    }
+    return super.createFixed(old, schema);
+  }
+
+  @Override
+  public Object newRecord(Object old, Schema schema) {
+    // SpecificData will try to instantiate the type returned by getClass, but
+    // that is the converted class and can't be constructed.
+    LogicalType logicalType = schema.getLogicalType();
+    if (logicalType != null) {
+      Conversion<?> conversion = getConversionFor(schema.getLogicalType());
+      if (conversion != null) {
+        return new GenericData.Record(schema);
+      }
+    }
+    return super.newRecord(old, schema);
+  }
 }

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java?rev=1681906&r1=1681905&r2=1681906&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java Wed May 27 04:48:58 2015
@@ -25,6 +25,8 @@ import java.util.Collection;
 import java.util.Map;
 
 import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.IndexedRecord;
@@ -75,6 +77,16 @@ public class ReflectDatumReader<T> exten
     Class<?> elementClass =
       ReflectData.getClassProp(schema, SpecificData.ELEMENT_PROP);
 
+    if (elementClass == null) {
+      // see if the element class will be converted and use that class
+      // logical types cannot conflict with java-element-class
+      Conversion<?> elementConversion = getData()
+          .getConversionFor(schema.getElementType().getLogicalType());
+      if (elementConversion != null) {
+        elementClass = elementConversion.getConvertedType();
+      }
+    }
+
     if (collectionClass == null && elementClass == null)
       return super.newArray(old, size, schema);   // use specific/generic
 
@@ -163,26 +175,52 @@ public class ReflectDatumReader<T> exten
 
   private Object readObjectArray(Object[] array, Schema expectedType, long l,
       ResolvingDecoder in) throws IOException {
+    LogicalType logicalType = expectedType.getLogicalType();
+    Conversion<?> conversion = getData().getConversionFor(logicalType);
     int index = 0;
-    do {
-      int limit = index + (int) l;
-      while (index < limit) {
-        Object element = read(null, expectedType, in);
-        array[index] = element;
-        index++;
-      }
-    } while ((l = in.arrayNext()) > 0);
+    if (logicalType != null && conversion != null) {
+      do {
+        int limit = index + (int) l;
+        while (index < limit) {
+          Object element = readWithConversion(
+              null, expectedType, logicalType, conversion, in);
+          array[index] = element;
+          index++;
+        }
+      } while ((l = in.arrayNext()) > 0);
+    } else {
+      do {
+        int limit = index + (int) l;
+        while (index < limit) {
+          Object element = readWithoutConversion(null, expectedType, in);
+          array[index] = element;
+          index++;
+        }
+      } while ((l = in.arrayNext()) > 0);
+    }
     return array;
   }
 
   private Object readCollection(Collection<Object> c, Schema expectedType,
       long l, ResolvingDecoder in) throws IOException {
-    do {
-      for (int i = 0; i < l; i++) {
-        Object element = read(null, expectedType, in);
-        c.add(element);
-      }
-    } while ((l = in.arrayNext()) > 0);
+    LogicalType logicalType = expectedType.getLogicalType();
+    Conversion<?> conversion = getData().getConversionFor(logicalType);
+    if (logicalType != null && conversion != null) {
+      do {
+        for (int i = 0; i < l; i++) {
+          Object element = readWithConversion(
+              null, expectedType, logicalType, conversion, in);
+          c.add(element);
+        }
+      } while ((l = in.arrayNext()) > 0);
+    } else {
+      do {
+        for (int i = 0; i < l; i++) {
+          Object element = readWithoutConversion(null, expectedType, in);
+          c.add(element);
+        }
+      } while ((l = in.arrayNext()) > 0);
+    }
     return c;
   }
 
@@ -245,6 +283,28 @@ public class ReflectDatumReader<T> exten
             throw new AvroRuntimeException("Failed to read Stringable", e);
           } 
         }
+        LogicalType logicalType = f.schema().getLogicalType();
+        if (logicalType != null) {
+          Conversion<?> conversion = getData().getConversionTo(
+              accessor.getField().getType(), logicalType);
+          if (conversion != null) {
+            try {
+              accessor.set(record, convert(
+                  readWithoutConversion(oldDatum, f.schema(), in),
+                  f.schema(), logicalType, conversion));
+            } catch (IllegalAccessException e) {
+              throw new AvroRuntimeException("Failed to set " + f);
+            }
+            return;
+          }
+        }
+        try {
+          accessor.set(record,
+              readWithoutConversion(oldDatum, f.schema(), in));
+          return;
+        } catch (IllegalAccessException e) {
+          throw new AvroRuntimeException("Failed to set " + f);
+        }
       }
     }
     super.readField(record, f, oldDatum, in, state);

Added: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestLogicalType.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestLogicalType.java?rev=1681906&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestLogicalType.java (added)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestLogicalType.java Wed May 27 04:48:58 2015
@@ -0,0 +1,279 @@
+package org.apache.avro;
+
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestLogicalType {
+
+  @Test
+  public void testDecimalFromSchema() {
+    Schema schema = Schema.createFixed("aFixed", null, null, 4);
+    schema.addProp("logicalType", "decimal");
+    schema.addProp("precision", 9);
+    schema.addProp("scale", 2);
+    LogicalType logicalType = LogicalTypes.fromSchemaIgnoreInvalid(schema);
+
+    Assert.assertTrue("Should be a Decimal",
+        logicalType instanceof LogicalTypes.Decimal);
+    LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
+    Assert.assertEquals("Should have correct precision",
+        9, decimal.getPrecision());
+    Assert.assertEquals("Should have correct scale",
+        2, decimal.getScale());
+  }
+
+  @Test
+  public void testInvalidLogicalTypeIgnored() {
+    final Schema schema = Schema.createFixed("aFixed", null, null, 2);
+    schema.addProp("logicalType", "decimal");
+    schema.addProp("precision", 9);
+    schema.addProp("scale", 2);
+
+    Assert.assertNull("Should ignore invalid logical type",
+        LogicalTypes.fromSchemaIgnoreInvalid(schema));
+  }
+
+  @Test
+  public void testDecimalWithNonByteArrayTypes() {
+    final LogicalType decimal = LogicalTypes.decimal(5, 2);
+    // test simple types
+    Schema[] nonBytes = new Schema[] {
+        Schema.createRecord("Record", null, null, false),
+        Schema.createArray(Schema.create(Schema.Type.BYTES)),
+        Schema.createMap(Schema.create(Schema.Type.BYTES)),
+        Schema.createEnum("Enum", null, null, Arrays.asList("a", "b")),
+        Schema.createUnion(Arrays.asList(
+            Schema.create(Schema.Type.BYTES),
+            Schema.createFixed("fixed", null, null, 4))),
+        Schema.create(Schema.Type.BOOLEAN), Schema.create(Schema.Type.INT),
+        Schema.create(Schema.Type.LONG), Schema.create(Schema.Type.FLOAT),
+        Schema.create(Schema.Type.DOUBLE), Schema.create(Schema.Type.NULL),
+        Schema.create(Schema.Type.STRING) };
+    for (final Schema schema : nonBytes) {
+      assertThrows("Should reject type: " + schema.getType(),
+          IllegalArgumentException.class,
+          "Logical type decimal must be backed by fixed or bytes", new Callable() {
+            @Override
+            public Object call() throws Exception {
+              decimal.addToSchema(schema);
+              return null;
+            }
+          });
+    }
+  }
+
+  @Test
+  public void testUnknownFromJsonNode() {
+    Schema schema = Schema.create(Schema.Type.STRING);
+    schema.addProp("logicalType", "unknown");
+    schema.addProp("someProperty", 34);
+    LogicalType logicalType = LogicalTypes.fromSchemaIgnoreInvalid(schema);
+    Assert.assertNull("Should not return a LogicalType instance", logicalType);
+  }
+
+  @Test
+  public void testDecimalBytesHasNoPrecisionLimit() {
+    Schema schema = Schema.create(Schema.Type.BYTES);
+    // precision is not limited for bytes
+    LogicalTypes.decimal(Integer.MAX_VALUE).addToSchema(schema);
+    Assert.assertEquals("Precision should be an Integer.MAX_VALUE",
+        Integer.MAX_VALUE,
+        ((LogicalTypes.Decimal) LogicalTypes.fromSchemaIgnoreInvalid(schema)).getPrecision());
+  }
+
+  @Test
+  public void testDecimalFixedPrecisionLimit() {
+    // 4 bytes can hold up to 9 digits of precision
+    final Schema schema = Schema.createFixed("aDecimal", null, null, 4);
+    assertThrows("Should reject precision", IllegalArgumentException.class,
+        "fixed(4) cannot store 10 digits (max 9)", new Callable() {
+          @Override
+          public Object call() throws Exception {
+            LogicalTypes.decimal(10).addToSchema(schema);
+            return null;
+          }
+        }
+    );
+    Assert.assertNull("Invalid logical type should not be set on schema",
+        LogicalTypes.fromSchemaIgnoreInvalid(schema));
+  }
+
+  @Test
+  public void testDecimalFailsWithZeroPrecision() {
+    final Schema schema = Schema.createFixed("aDecimal", null, null, 4);
+    assertThrows("Should reject precision", IllegalArgumentException.class,
+        "Invalid decimal precision: 0 (must be positive)", new Callable() {
+          @Override
+          public Object call() throws Exception {
+            LogicalTypes.decimal(0).addToSchema(schema);
+            return null;
+          }
+        });
+    Assert.assertNull("Invalid logical type should not be set on schema",
+        LogicalTypes.fromSchemaIgnoreInvalid(schema));
+  }
+
+  @Test
+  public void testDecimalFailsWithNegativePrecision() {
+    final Schema schema = Schema.createFixed("aDecimal", null, null, 4);
+    assertThrows("Should reject precision", IllegalArgumentException.class,
+        "Invalid decimal precision: -9 (must be positive)", new Callable() {
+          @Override
+          public Object call() throws Exception {
+            LogicalTypes.decimal(-9).addToSchema(schema);
+            return null;
+          }
+        });
+    Assert.assertNull("Invalid logical type should not be set on schema",
+        LogicalTypes.fromSchemaIgnoreInvalid(schema));
+  }
+
+  @Test
+  public void testDecimalScaleBoundedByPrecision() {
+    final Schema schema = Schema.createFixed("aDecimal", null, null, 4);
+    assertThrows("Should reject precision", IllegalArgumentException.class,
+        "Invalid decimal scale: 10 (greater than precision: 9)",
+        new Callable() {
+          @Override
+          public Object call() throws Exception {
+            LogicalTypes.decimal(9, 10).addToSchema(schema);
+            return null;
+          }
+        });
+    Assert.assertNull("Invalid logical type should not be set on schema",
+        LogicalTypes.fromSchemaIgnoreInvalid(schema));
+  }
+
+  @Test
+  public void testDecimalFailsWithNegativeScale() {
+    final Schema schema = Schema.createFixed("aDecimal", null, null, 4);
+    assertThrows("Should reject precision", IllegalArgumentException.class,
+        "Invalid decimal scale: -2 (must be positive)", new Callable() {
+          @Override
+          public Object call() throws Exception {
+            LogicalTypes.decimal(9, -2).addToSchema(schema);
+            return null;
+          }
+        });
+    Assert.assertNull("Invalid logical type should not be set on schema",
+        LogicalTypes.fromSchemaIgnoreInvalid(schema));
+  }
+
+  @Test
+  public void testSchemaRejectsSecondLogicalType() {
+    final Schema schema = Schema.createFixed("aDecimal", null, null, 4);
+    LogicalTypes.decimal(9).addToSchema(schema);
+    assertThrows("Should reject second logical type",
+        AvroRuntimeException.class,
+        "Can't overwrite property: scale", new Callable() {
+          @Override
+          public Object call() throws Exception {
+            LogicalTypes.decimal(9, 2).addToSchema(schema);
+            return null;
+          }
+        }
+    );
+    Assert.assertEquals("First logical type should still be set on schema",
+        LogicalTypes.decimal(9), LogicalTypes.fromSchemaIgnoreInvalid(schema));
+  }
+
+  @Test
+  public void testDecimalDefaultScale() {
+    Schema schema = Schema.createFixed("aDecimal", null, null, 4);
+    // 4 bytes can hold up to 9 digits of precision
+    LogicalTypes.decimal(9).addToSchema(schema);
+    Assert.assertEquals("Scale should be a 0",
+        0,
+        ((LogicalTypes.Decimal) LogicalTypes.fromSchemaIgnoreInvalid(schema)).getScale());
+  }
+
+  @Test
+  public void testFixedDecimalToFromJson() {
+    Schema schema = Schema.createFixed("aDecimal", null, null, 4);
+    LogicalTypes.decimal(9, 2).addToSchema(schema);
+    Schema parsed = new Schema.Parser().parse(schema.toString(true));
+    Assert.assertEquals("Constructed and parsed schemas should match",
+        schema, parsed);
+  }
+
+  @Test
+  public void testBytesDecimalToFromJson() {
+    Schema schema = Schema.create(Schema.Type.BYTES);
+    LogicalTypes.decimal(9, 2).addToSchema(schema);
+    Schema parsed = new Schema.Parser().parse(schema.toString(true));
+    Assert.assertEquals("Constructed and parsed schemas should match",
+        schema, parsed);
+  }
+
+  @Test
+  public void testLogicalTypeEquals() {
+    LogicalTypes.Decimal decimal90 = LogicalTypes.decimal(9);
+    LogicalTypes.Decimal decimal80 = LogicalTypes.decimal(8);
+    LogicalTypes.Decimal decimal92 = LogicalTypes.decimal(9, 2);
+
+    assertEqualsTrue("Same decimal", LogicalTypes.decimal(9, 0), decimal90);
+    assertEqualsTrue("Same decimal", LogicalTypes.decimal(8, 0), decimal80);
+    assertEqualsTrue("Same decimal", LogicalTypes.decimal(9, 2), decimal92);
+    assertEqualsFalse("Different logical type", LogicalTypes.uuid(), decimal90);
+    assertEqualsFalse("Different precision", decimal90, decimal80);
+    assertEqualsFalse("Different scale", decimal90, decimal92);
+  }
+
+  @Test
+  public void testLogicalTypeInSchemaEquals() {
+    Schema schema1 = Schema.createFixed("aDecimal", null, null, 4);
+    Schema schema2 = Schema.createFixed("aDecimal", null, null, 4);
+    Schema schema3 = Schema.createFixed("aDecimal", null, null, 4);
+    Assert.assertNotSame(schema1, schema2);
+    Assert.assertNotSame(schema1, schema3);
+    assertEqualsTrue("No logical types", schema1, schema2);
+    assertEqualsTrue("No logical types", schema1, schema3);
+
+    LogicalTypes.decimal(9).addToSchema(schema1);
+    assertEqualsFalse("Two has no logical type", schema1, schema2);
+
+    LogicalTypes.decimal(9).addToSchema(schema2);
+    assertEqualsTrue("Same logical types", schema1, schema2);
+
+    LogicalTypes.decimal(9, 2).addToSchema(schema3);
+    assertEqualsFalse("Different logical type", schema1, schema3);
+  }
+
+  public static void assertEqualsTrue(String message, Object o1, Object o2) {
+    Assert.assertTrue("Should be equal (forward): " + message, o1.equals(o2));
+    Assert.assertTrue("Should be equal (reverse): " + message, o2.equals(o1));
+  }
+
+  public static void assertEqualsFalse(String message, Object o1, Object o2) {
+    Assert.assertFalse("Should be equal (forward): " + message, o1.equals(o2));
+    Assert.assertFalse("Should be equal (reverse): " + message, o2.equals(o1));
+  }
+
+  /**
+   * A convenience method to avoid a large number of @Test(expected=...) tests
+   * @param message A String message to describe this assertion
+   * @param expected An Exception class that the Runnable should throw
+   * @param containedInMessage A String that should be contained by the thrown
+   *                           exception's message
+   * @param callable A Callable that is expected to throw the exception
+   */
+  public static void assertThrows(String message,
+                                  Class<? extends Exception> expected,
+                                  String containedInMessage,
+                                  Callable callable) {
+    try {
+      callable.call();
+      Assert.fail("No exception was thrown (" + message + "), expected: " +
+          expected.getName());
+    } catch (Exception actual) {
+      Assert.assertEquals(message, expected, actual.getClass());
+      Assert.assertTrue(
+          "Expected exception message (" + containedInMessage + ") missing: " +
+              actual.getMessage(),
+          actual.getMessage().contains(containedInMessage)
+      );
+    }
+  }
+}

Added: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericLogicalTypes.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericLogicalTypes.java?rev=1681906&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericLogicalTypes.java (added)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericLogicalTypes.java Wed May 27 04:48:58 2015
@@ -0,0 +1,217 @@
+package org.apache.avro.generic;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import org.apache.avro.Conversion;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestGenericLogicalTypes {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  public static final GenericData GENERIC = new GenericData();
+
+  @BeforeClass
+  public static void addDecimalAndUUID() {
+    GENERIC.addLogicalTypeConversion(new Conversions.DecimalConversion());
+    GENERIC.addLogicalTypeConversion(new Conversions.UUIDConversion());
+  }
+
+  @Test
+  public void testReadUUID() throws IOException {
+    Schema uuidSchema = Schema.create(Schema.Type.STRING);
+    LogicalTypes.uuid().addToSchema(uuidSchema);
+
+    UUID u1 = UUID.randomUUID();
+    UUID u2 = UUID.randomUUID();
+    List<UUID> expected = Arrays.asList(u1, u2);
+
+    File test = write(Schema.create(Schema.Type.STRING),
+        u1.toString(), u2.toString());
+    Assert.assertEquals("Should convert Strings to UUIDs",
+        expected, read(GENERIC.createDatumReader(uuidSchema), test));
+  }
+
+  @Test
+  public void testWriteUUID() throws IOException {
+    Schema stringSchema = Schema.create(Schema.Type.STRING);
+    stringSchema.addProp(GenericData.STRING_PROP, "String");
+    Schema uuidSchema = Schema.create(Schema.Type.STRING);
+    LogicalTypes.uuid().addToSchema(uuidSchema);
+
+    UUID u1 = UUID.randomUUID();
+    UUID u2 = UUID.randomUUID();
+    List<String> expected = Arrays.asList(u1.toString(), u2.toString());
+
+    File test = write(GENERIC, uuidSchema, u1, u2);
+    Assert.assertEquals("Should read UUIDs as Strings",
+        expected, read(GenericData.get().createDatumReader(stringSchema), test));
+  }
+
+  @Test
+  public void testWriteNullableUUID() throws IOException {
+    Schema stringSchema = Schema.create(Schema.Type.STRING);
+    stringSchema.addProp(GenericData.STRING_PROP, "String");
+    Schema nullableStringSchema = Schema.createUnion(
+        Schema.create(Schema.Type.NULL), stringSchema);
+
+    Schema uuidSchema = Schema.create(Schema.Type.STRING);
+    LogicalTypes.uuid().addToSchema(uuidSchema);
+    Schema nullableUuidSchema = Schema.createUnion(
+        Schema.create(Schema.Type.NULL), uuidSchema);
+
+    UUID u1 = UUID.randomUUID();
+    UUID u2 = UUID.randomUUID();
+    List<String> expected = Arrays.asList(u1.toString(), u2.toString());
+
+    File test = write(GENERIC, nullableUuidSchema, u1, u2);
+    Assert.assertEquals("Should read UUIDs as Strings",
+        expected,
+        read(GenericData.get().createDatumReader(nullableStringSchema), test));
+  }
+
+  @Test
+  public void testReadDecimalFixed() throws IOException {
+    LogicalType decimal = LogicalTypes.decimal(9, 2);
+    Schema fixedSchema = Schema.createFixed("aFixed", null, null, 4);
+    Schema decimalSchema = decimal.addToSchema(
+        Schema.createFixed("aFixed", null, null, 4));
+
+    BigDecimal d1 = new BigDecimal("-34.34");
+    BigDecimal d2 = new BigDecimal("117230.00");
+    List<BigDecimal> expected = Arrays.asList(d1, d2);
+
+    Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();
+
+    // use the conversion directly instead of relying on the write side
+    GenericFixed d1fixed = conversion.toFixed(d1, fixedSchema, decimal);
+    GenericFixed d2fixed = conversion.toFixed(d2, fixedSchema, decimal);
+
+    File test = write(fixedSchema, d1fixed, d2fixed);
+    Assert.assertEquals("Should convert fixed to BigDecimals",
+        expected, read(GENERIC.createDatumReader(decimalSchema), test));
+  }
+
+  @Test
+  public void testWriteDecimalFixed() throws IOException {
+    LogicalType decimal = LogicalTypes.decimal(9, 2);
+    Schema fixedSchema = Schema.createFixed("aFixed", null, null, 4);
+    Schema decimalSchema = decimal.addToSchema(
+        Schema.createFixed("aFixed", null, null, 4));
+
+    BigDecimal d1 = new BigDecimal("-34.34");
+    BigDecimal d2 = new BigDecimal("117230.00");
+
+    Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();
+
+    GenericFixed d1fixed = conversion.toFixed(d1, fixedSchema, decimal);
+    GenericFixed d2fixed = conversion.toFixed(d2, fixedSchema, decimal);
+    List<GenericFixed> expected = Arrays.asList(d1fixed, d2fixed);
+
+    File test = write(GENERIC, decimalSchema, d1, d2);
+    Assert.assertEquals("Should read BigDecimals as fixed",
+        expected, read(GenericData.get().createDatumReader(fixedSchema), test));
+  }
+
+  @Test
+  public void testReadDecimalBytes() throws IOException {
+    LogicalType decimal = LogicalTypes.decimal(9, 2);
+    Schema bytesSchema = Schema.create(Schema.Type.BYTES);
+    Schema decimalSchema = decimal.addToSchema(Schema.create(Schema.Type.BYTES));
+
+    BigDecimal d1 = new BigDecimal("-34.34");
+    BigDecimal d2 = new BigDecimal("117230.00");
+    List<BigDecimal> expected = Arrays.asList(d1, d2);
+
+    Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();
+
+    // use the conversion directly instead of relying on the write side
+    ByteBuffer d1bytes = conversion.toBytes(d1, bytesSchema, decimal);
+    ByteBuffer d2bytes = conversion.toBytes(d2, bytesSchema, decimal);
+
+    File test = write(bytesSchema, d1bytes, d2bytes);
+    Assert.assertEquals("Should convert bytes to BigDecimals",
+        expected, read(GENERIC.createDatumReader(decimalSchema), test));
+  }
+
+  @Test
+  public void testWriteDecimalBytes() throws IOException {
+    LogicalType decimal = LogicalTypes.decimal(9, 2);
+    Schema bytesSchema = Schema.create(Schema.Type.BYTES);
+    Schema decimalSchema = decimal.addToSchema(Schema.create(Schema.Type.BYTES));
+
+    BigDecimal d1 = new BigDecimal("-34.34");
+    BigDecimal d2 = new BigDecimal("117230.00");
+
+    Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();
+
+    // use the conversion directly instead of relying on the write side
+    ByteBuffer d1bytes = conversion.toBytes(d1, bytesSchema, decimal);
+    ByteBuffer d2bytes = conversion.toBytes(d2, bytesSchema, decimal);
+    List<ByteBuffer> expected = Arrays.asList(d1bytes, d2bytes);
+
+    File test = write(GENERIC, decimalSchema, d1bytes, d2bytes);
+    Assert.assertEquals("Should read BigDecimals as bytes",
+        expected, read(GenericData.get().createDatumReader(bytesSchema), test));
+  }
+
+  private <D> List<D> read(DatumReader<D> reader, File file) throws IOException {
+    List<D> data = new ArrayList<D>();
+    FileReader<D> fileReader = null;
+
+    try {
+      fileReader = new DataFileReader<D>(file, reader);
+      for (D datum : fileReader) {
+        data.add(datum);
+      }
+    } finally {
+      if (fileReader != null) {
+        fileReader.close();
+      }
+    }
+
+    return data;
+  }
+
+  private <D> File write(Schema schema, D... data) throws IOException {
+    return write(GenericData.get(), schema, data);
+  }
+
+  @SuppressWarnings("unchecked")
+  private <D> File write(GenericData model, Schema schema, D... data) throws IOException {
+    File file = temp.newFile();
+    DatumWriter<D> writer = model.createDatumWriter(schema);
+    DataFileWriter<D> fileWriter = new DataFileWriter<D>(writer);
+
+    try {
+      fileWriter.create(schema, file);
+      for (D datum : data) {
+        fileWriter.append(datum);
+      }
+    } finally {
+      fileWriter.close();
+    }
+
+    return file;
+  }
+}



Mime
View raw message