avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1557225 [2/3] - in /avro/trunk: ./ lang/py3/ lang/py3/avro/ lang/py3/avro/tests/ lang/py3/scripts/ share/test/schemas/
Date Fri, 10 Jan 2014 19:11:43 GMT
Added: avro/trunk/lang/py3/avro/protocol.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/protocol.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/protocol.py (added)
+++ avro/trunk/lang/py3/avro/protocol.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,402 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Protocol implementation.
+"""
+
+
+import hashlib
+import json
+import logging
+
+from avro import schema
+
# Convenient alias for the immutable dictionary implementation provided by
# avro.schema; used below for the protocol's type and message maps.
ImmutableDict = schema.ImmutableDict

# ------------------------------------------------------------------------------
# Constants

# Allowed top-level schemas in a protocol:
VALID_TYPE_SCHEMA_TYPES = frozenset(['enum', 'record', 'error', 'fixed'])
+
+
+# ------------------------------------------------------------------------------
+# Exceptions
+
+
class ProtocolParseException(schema.AvroException):
  """Raised when a JSON protocol descriptor cannot be parsed."""
+
+
+# ------------------------------------------------------------------------------
+# Base Classes
+
+
class Protocol(object):
  """An application protocol.

  A protocol bundles a collection of named types together with a collection
  of messages that clients and servers agree upon.
  """

  @staticmethod
  def _ParseTypeDesc(type_desc, names):
    """Parses a descriptor of a type declared in a protocol.

    Args:
      type_desc: JSON descriptor of the type.
      names: Tracker of the named Avro schemas.
    Returns:
      The parsed type schema.
    Raises:
      ProtocolParseException: if the type is not one a protocol may declare.
    """
    type_schema = schema.SchemaFromJSONData(type_desc, names=names)
    if type_schema.type not in VALID_TYPE_SCHEMA_TYPES:
      # Fixed: the original formatted this message with an undefined name
      # ('avro_name'), which turned the intended error into a NameError.
      raise ProtocolParseException(
          'Invalid type %r in protocol: '
          'protocols can only declare types %s.'
          % (type_schema, ','.join(VALID_TYPE_SCHEMA_TYPES)))
    return type_schema

  @staticmethod
  def _ParseMessageDesc(name, message_desc, names):
    """Parses a protocol message descriptor.

    Args:
      name: Name of the message.
      message_desc: Descriptor of the message.
      names: Tracker of the named Avro schema.
    Returns:
      The parsed protocol message.
    Raises:
      ProtocolParseException: if the descriptor is invalid.
    """
    request_desc = message_desc.get('request')
    if request_desc is None:
      raise ProtocolParseException(
          'Invalid message descriptor with no "request": %r.' % message_desc)
    request_schema = Message._ParseRequestFromJSONDesc(
        request_desc=request_desc,
        names=names,
    )

    response_desc = message_desc.get('response')
    if response_desc is None:
      raise ProtocolParseException(
          'Invalid message descriptor with no "response": %r.' % message_desc)
    response_schema = Message._ParseResponseFromJSONDesc(
        response_desc=response_desc,
        names=names,
    )

    # Errors are optional:
    errors_desc = message_desc.get('errors', tuple())
    error_union_schema = Message._ParseErrorsFromJSONDesc(
        errors_desc=errors_desc,
        names=names,
    )

    return Message(
        name=name,
        request=request_schema,
        response=response_schema,
        errors=error_union_schema,
    )

  @staticmethod
  def _ParseMessageDescMap(message_desc_map, names):
    """Parses a map of protocol message descriptors.

    Args:
      message_desc_map: Map: message name -> message descriptor.
      names: Tracker of the named Avro schemas.
    Yields:
      The parsed protocol messages.
    """
    for name, message_desc in message_desc_map.items():
      yield Protocol._ParseMessageDesc(
          name=name,
          message_desc=message_desc,
          names=names,
      )

  def __init__(
      self,
      name,
      namespace=None,
      types=tuple(),
      messages=tuple(),
  ):
    """Initializes a new protocol object.

    Args:
      name: Protocol name (absolute or relative).
      namespace: Optional explicit namespace (if name is relative).
      types: Collection of types in the protocol.
      messages: Collection of messages in the protocol.
    Raises:
      ProtocolParseException: if two messages share the same name.
    """
    self._avro_name = schema.Name(name=name, namespace=namespace)
    self._fullname = self._avro_name.fullname
    self._name = self._avro_name.simple_name
    self._namespace = self._avro_name.namespace

    self._props = {}
    self._props['name'] = self._name
    if self._namespace:
      self._props['namespace'] = self._namespace

    self._names = schema.Names(default_namespace=self._namespace)

    self._types = tuple(types)
    # Map: type full name -> type schema
    self._type_map = (
        ImmutableDict((type.fullname, type) for type in self._types))
    # This assertion cannot fail unless we don't track named schemas properly:
    assert (len(self._types) == len(self._type_map)), (
        'Type list %r does not match type map: %r'
        % (self._types, self._type_map))
    # TODO: set props['types']

    self._messages = tuple(messages)

    # Map: message name -> Message
    # Note that message names are simple names unique within the protocol.
    self._message_map = ImmutableDict(
        items=((message.name, message) for message in self._messages))
    if len(self._messages) != len(self._message_map):
      raise ProtocolParseException(
          'Invalid protocol %s with duplicate message name: %r'
          % (self._avro_name, self._messages))
    # TODO: set props['messages']

    # Fingerprint of the protocol's JSON representation, used for handshakes.
    self._md5 = hashlib.md5(str(self).encode('utf-8')).digest()

  @property
  def name(self):
    """Returns: the simple name of the protocol."""
    return self._name

  @property
  def namespace(self):
    """Returns: the namespace this protocol belongs to."""
    return self._namespace

  @property
  def fullname(self):
    """Returns: the fully qualified name of this protocol."""
    return self._fullname

  @property
  def types(self):
    """Returns: the collection of types declared in this protocol."""
    return self._types

  @property
  def type_map(self):
    """Returns: the map of types in this protocol, indexed by their full name."""
    return self._type_map

  @property
  def messages(self):
    """Returns: the collection of messages declared in this protocol."""
    return self._messages

  @property
  def message_map(self):
    """Returns: the map of messages in this protocol, indexed by their name."""
    return self._message_map

  @property
  def md5(self):
    """Returns: the MD5 digest of this protocol's JSON representation."""
    return self._md5

  @property
  def props(self):
    """Returns: the map of properties of this protocol."""
    return self._props

  def to_json(self):
    """Converts this protocol into its JSON descriptor representation."""
    to_dump = {}
    to_dump['protocol'] = self.name
    names = schema.Names(default_namespace=self.namespace)
    if self.namespace:
      to_dump['namespace'] = self.namespace
    if self.types:
      to_dump['types'] = [ t.to_json(names) for t in self.types ]
    if self.messages:
      messages_dict = {}
      for name, body in self.message_map.items():
        messages_dict[name] = body.to_json(names)
      to_dump['messages'] = messages_dict
    return to_dump

  def __str__(self):
    return json.dumps(self.to_json())

  def __eq__(self, that):
    # Compares the JSON representations of both protocols.
    to_cmp = json.loads(str(self))
    return to_cmp == json.loads(str(that))
+
+
+# ------------------------------------------------------------------------------
+
+
class Message(object):
  """A Protocol message."""

  @staticmethod
  def _ParseRequestFromJSONDesc(request_desc, names):
    """Parses the request descriptor of a protocol message.

    Args:
      request_desc: Descriptor of the message request.
          This is a list of fields that defines an unnamed record.
      names: Tracker for named Avro schemas.
    Returns:
      The parsed request schema, as an unnamed record.
    """
    fields = schema.RecordSchema._MakeFieldList(request_desc, names=names)
    return schema.RecordSchema(
        name=None,
        namespace=None,
        fields=fields,
        names=names,
        record_type=schema.REQUEST,
    )

  @staticmethod
  def _ParseResponseFromJSONDesc(response_desc, names):
    """Parses the response descriptor of a protocol message.

    Args:
      response_desc: Descriptor of the message response.
          This is an arbitrary Avro schema descriptor.
      names: Tracker for named Avro schemas.
    Returns:
      The parsed response schema.
    """
    return schema.SchemaFromJSONData(response_desc, names=names)

  @staticmethod
  def _ParseErrorsFromJSONDesc(errors_desc, names):
    """Parses the errors descriptor of a protocol message.

    Args:
      errors_desc: Descriptor of the errors thrown by the protocol message.
          This is a list of error types understood as an implicit union.
          Each error type is an arbitrary Avro schema.
      names: Tracker for named Avro schemas.
    Returns:
      The parsed ErrorUnionSchema.
    """
    error_union_desc = {
        'type': schema.ERROR_UNION,
        'declared_errors': errors_desc,
    }
    return schema.SchemaFromJSONData(error_union_desc, names=names)

  def __init__(self, name, request, response, errors=None):
    """Initializes a new message object.

    Args:
      name: Simple name of the message, unique within its protocol.
      request: Avro record schema describing the message request.
      response: Avro schema describing the message response.
      errors: Optional union schema of the errors the message may raise.
    """
    self._name = name

    self._props = {}
    # TODO: set properties
    self._request = request
    self._response = response
    self._errors = errors

  @property
  def name(self):
    """Returns: the simple name of this message."""
    return self._name

  @property
  def request(self):
    """Returns: the request schema of this message."""
    return self._request

  @property
  def response(self):
    """Returns: the response schema of this message."""
    return self._response

  @property
  def errors(self):
    """Returns: the error union schema of this message, if any, or None."""
    return self._errors

  @property
  def props(self):
    """Returns: the map of properties of this message."""
    # Fixed: this was a plain method, so __eq__ compared bound methods
    # instead of the property maps; also inconsistent with Protocol.props.
    return self._props

  def __str__(self):
    return json.dumps(self.to_json())

  def to_json(self, names=None):
    """Converts this message into its JSON descriptor representation."""
    if names is None:
      names = schema.Names()
    to_dump = {}
    to_dump['request'] = self.request.to_json(names)
    to_dump['response'] = self.response.to_json(names)
    if self.errors:
      to_dump['errors'] = self.errors.to_json(names)
    return to_dump

  def __eq__(self, that):
    # Note: only the name and the (currently empty) property map take part
    # in the comparison; request/response/errors schemas do not.
    return self.name == that.name and self.props == that.props
+
+
+# ------------------------------------------------------------------------------
+
+
def ProtocolFromJSONData(json_data):
  """Builds an Avro Protocol from its JSON descriptor.

  Args:
    json_data: JSON data representing the descriptor of the Avro protocol.
  Returns:
    The Avro Protocol parsed from the JSON descriptor.
  Raises:
    ProtocolParseException: if the descriptor is invalid.
  """
  # isinstance instead of exact type comparison: accepts dict subclasses.
  if not isinstance(json_data, dict):
    raise ProtocolParseException(
        'Invalid JSON descriptor for an Avro protocol: %r' % json_data)

  name = json_data.get('protocol')
  if name is None:
    # Fixed: the message used to say '"name"', but the mandatory entry
    # carrying the protocol name is actually called 'protocol'.
    raise ProtocolParseException(
        'Invalid protocol descriptor with no "protocol": %r' % json_data)

  # Namespace is optional
  namespace = json_data.get('namespace')

  avro_name = schema.Name(name=name, namespace=namespace)
  names = schema.Names(default_namespace=avro_name.namespace)

  type_desc_list = json_data.get('types', tuple())
  types = tuple(
      Protocol._ParseTypeDesc(desc, names=names) for desc in type_desc_list)

  message_desc_map = json_data.get('messages', dict())
  messages = tuple(Protocol._ParseMessageDescMap(message_desc_map, names=names))

  return Protocol(
      name=name,
      namespace=namespace,
      types=types,
      messages=messages,
  )
+
+
def Parse(json_string):
  """Constructs a Protocol from its JSON descriptor in text form.

  Args:
    json_string: String representation of the JSON descriptor of the protocol.
  Returns:
    The parsed protocol.
  Raises:
    ProtocolParseException: on JSON parsing error,
        or if the JSON descriptor is invalid.
  """
  try:
    json_data = json.loads(json_string)
  except Exception as exn:
    # Chain the original error so the underlying JSON failure stays visible
    # in the traceback.
    raise ProtocolParseException(
        'Error parsing protocol from JSON: %r. '
        'Error message: %r.'
        % (json_string, exn)) from exn

  return ProtocolFromJSONData(json_data)
+

Propchange: avro/trunk/lang/py3/avro/protocol.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/py3/avro/schema.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/schema.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/schema.py (added)
+++ avro/trunk/lang/py3/avro/schema.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,1283 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Representation of Avro schemas.
+
+A schema may be one of:
+ - A record, mapping field names to field value data;
+ - An error, equivalent to a record;
+ - An enum, containing one of a small set of symbols;
+ - An array of values, all of the same schema;
+ - A map containing string/value pairs, each of a declared schema;
+ - A union of other schemas;
+ - A fixed sized binary object;
+ - A unicode string;
+ - A sequence of bytes;
+ - A 32-bit signed int;
+ - A 64-bit signed long;
+ - A 32-bit floating-point float;
+ - A 64-bit floating-point double;
+ - A boolean;
+ - Null.
+"""
+
+
+import abc
+import collections
+import json
+import logging
+import re
+
+
# ------------------------------------------------------------------------------
# Constants

# Log level more verbose than DEBUG=10, INFO=20, etc.
DEBUG_VERBOSE=5


# Avro type names, as they appear in JSON schema descriptors.

# Primitive type names:
NULL    = 'null'
BOOLEAN = 'boolean'
STRING  = 'string'
BYTES   = 'bytes'
INT     = 'int'
LONG    = 'long'
FLOAT   = 'float'
DOUBLE  = 'double'
# Named complex type names:
FIXED   = 'fixed'
ENUM    = 'enum'
RECORD  = 'record'
ERROR   = 'error'
# Anonymous complex type names:
ARRAY   = 'array'
MAP     = 'map'
UNION   = 'union'

# Request and error unions are part of Avro protocols:
REQUEST = 'request'
ERROR_UNION = 'error_union'

# Types that may appear bare (without a name) in a schema descriptor:
PRIMITIVE_TYPES = frozenset([
  NULL,
  BOOLEAN,
  STRING,
  BYTES,
  INT,
  LONG,
  FLOAT,
  DOUBLE,
])

# Types that carry a (possibly namespaced) name:
NAMED_TYPES = frozenset([
  FIXED,
  ENUM,
  RECORD,
  ERROR,
])

# Every type name this module understands:
VALID_TYPES = frozenset.union(
  PRIMITIVE_TYPES,
  NAMED_TYPES,
  [
    ARRAY,
    MAP,
    UNION,
    REQUEST,
    ERROR_UNION,
  ],
)

# Schema properties with reserved meaning; everything else is kept as an
# opaque "other" property (see Schema.other_props):
SCHEMA_RESERVED_PROPS = frozenset([
  'type',
  'name',
  'namespace',
  'fields',     # Record
  'items',      # Array
  'size',       # Fixed
  'symbols',    # Enum
  'values',     # Map
  'doc',
])

# Record field properties with reserved meaning (see Field.other_props):
FIELD_RESERVED_PROPS = frozenset([
  'default',
  'name',
  'doc',
  'order',
  'type',
])

# Allowed values of a record field's 'order' property:
VALID_FIELD_SORT_ORDERS = frozenset([
  'ascending',
  'descending',
  'ignore',
])
+
+
+# ------------------------------------------------------------------------------
+# Exceptions
+
+
class Error(Exception):
  """Root of the exception hierarchy for this module."""
+
+
class AvroException(Error):
  """Generic Avro schema error."""
+
+
class SchemaParseException(AvroException):
  """Raised when a JSON schema descriptor cannot be parsed."""
+
+
+# ------------------------------------------------------------------------------
+
+
class ImmutableDict(dict):
  """Dictionary guaranteed immutable.

  All mutations raise an exception.
  Behaves exactly as a dict otherwise.
  """

  def __init__(self, items=None, **kwargs):
    """Initializes an immutable dictionary.

    Args:
      items: Optional initial mapping or iterable of key/value pairs.
      **kwargs: Optional initial entries, as keyword arguments.
          Only allowed when items is not specified.
    """
    if items is not None:
      super(ImmutableDict, self).__init__(items)
      assert (len(kwargs) == 0)
    else:
      super(ImmutableDict, self).__init__(**kwargs)

  def __setitem__(self, key, value):
    raise Exception(
        'Attempting to map key %r to value %r in ImmutableDict %r'
        % (key, value, self))

  def __delitem__(self, key):
    raise Exception(
        'Attempting to remove mapping for key %r in ImmutableDict %r'
        % (key, self))

  def clear(self):
    raise Exception('Attempting to clear ImmutableDict %r' % self)

  def update(self, items=None, **kwargs):
    # Fixed: the original formatted this message with an undefined name
    # ('args'), raising NameError instead of the intended Exception.
    raise Exception(
        'Attempting to update ImmutableDict %r with items=%r, kwargs=%r'
        % (self, items, kwargs))

  def pop(self, key, default=None):
    raise Exception(
        'Attempting to pop key %r from ImmutableDict %r' % (key, self))

  def popitem(self):
    raise Exception('Attempting to pop item from ImmutableDict %r' % self)
+
+
+# ------------------------------------------------------------------------------
+
+
class Schema(object, metaclass=abc.ABCMeta):
  """Abstract base class for all Schema classes."""

  def __init__(self, type, other_props=None):
    """Initializes a new schema object.

    Args:
      type: Type of the schema to initialize.
      other_props: Optional dictionary of additional properties.
    Raises:
      SchemaParseException: if type is not a valid Avro type name.
    """
    if type not in VALID_TYPES:
      raise SchemaParseException('%r is not a valid Avro type.' % type)

    self._type = type
    # All properties of this schema, as a map: property name -> property value
    self._props = {'type': type}
    if other_props:
      self._props.update(other_props)

  @property
  def type(self):
    """Returns: the type of this schema."""
    return self._type

  @property
  def name(self):
    """Returns: the simple name of this schema."""
    return self._props['name']

  @property
  def fullname(self):
    """Returns: the fully qualified name of this schema.

    Named schemas override this to include their namespace; for every other
    schema the full name is simply the name.
    """
    return self.name

  @property
  def namespace(self):
    """Returns: the namespace this schema belongs to, if any, or None."""
    return self._props.get('namespace', None)

  @property
  def doc(self):
    """Returns: the documentation associated to this schema, if any, or None."""
    return self._props.get('doc', None)

  @property
  def props(self):
    """Reports all the properties of this schema.

    Includes all properties, reserved and non reserved.
    JSON properties of this schema are directly generated from this dict.

    Returns:
      A read-only dictionary of properties associated to this schema.
    """
    return ImmutableDict(self._props)

  @property
  def other_props(self):
    """Returns: the dictionary of non-reserved properties."""
    return dict(FilterKeysOut(items=self._props, keys=SCHEMA_RESERVED_PROPS))

  def __str__(self):
    """Returns: the JSON representation of this schema."""
    return json.dumps(self.to_json())

  @abc.abstractmethod
  def to_json(self, names):
    """Converts the schema object into its AVRO specification representation.

    Schema types that have names (records, enums, and fixed) must
    be aware of not re-defining schemas that are already listed
    in the parameter names.
    """
    raise Exception('Cannot run abstract method.')
+
+
+# ------------------------------------------------------------------------------
+
+
# Matches one Avro name component (no dots).
# NOTE(review): not referenced in this part of the file; presumably used by
# later parsing code — confirm before removing.
_RE_NAME = re.compile(r'[A-Za-z_][A-Za-z0-9_]*')

# Matches a full (dotted) Avro name: an optional namespace — possibly
# starting with a dot — followed by the simple name, captured as group 1.
_RE_FULL_NAME = re.compile(
    r'^'
    r'[.]?(?:[A-Za-z_][A-Za-z0-9_]*[.])*'  # optional namespace
    r'([A-Za-z_][A-Za-z0-9_]*)'            # name
    r'$'
)
+
class Name(object):
  """Representation of an Avro name."""

  def __init__(self, name, namespace=None):
    """Parses an Avro name.

    Args:
      name: Avro name to parse (relative or absolute).
      namespace: Optional explicit namespace if the name is relative.
    Raises:
      SchemaParseException: if the resulting full name is invalid.
    """
    # Normalize: namespace is always defined as a string, possibly empty.
    if namespace is None: namespace = ''

    if '.' in name:
      # name is absolute, namespace is ignored:
      self._fullname = name

      match = _RE_FULL_NAME.match(self._fullname)
      if match is None:
        raise SchemaParseException(
            'Invalid absolute schema name: %r.' % self._fullname)

      self._name = match.group(1)
      self._namespace = self._fullname[:-(len(self._name) + 1)]

    else:
      # name is relative, combine with explicit namespace:
      self._name = name
      self._namespace = namespace
      self._fullname = '%s.%s' % (self._namespace, self._name)

      # Validate the fullname:
      if _RE_FULL_NAME.match(self._fullname) is None:
        raise SchemaParseException(
            'Invalid schema name %r infered from name %r and namespace %r.'
            % (self._fullname, self._name, self._namespace))

  def __eq__(self, other):
    if not isinstance(other, Name):
      return False
    return (self.fullname == other.fullname)

  def __hash__(self):
    # Fixed: defining __eq__ without __hash__ makes instances unhashable in
    # Python 3; hash on the same key that equality uses.
    return hash(self._fullname)

  @property
  def simple_name(self):
    """Returns: the simple name part of this name."""
    return self._name

  @property
  def namespace(self):
    """Returns: this name's namespace, possibly the empty string."""
    return self._namespace

  @property
  def fullname(self):
    """Returns: the full name (always contains a period '.')."""
    return self._fullname
+
+
+# ------------------------------------------------------------------------------
+
+
class Names(object):
  """Tracks Avro named schemas and default namespace during parsing."""

  def __init__(self, default_namespace=None, names=None):
    """Initializes a new name tracker.

    Args:
      default_namespace: Optional default namespace.
      names: Optional initial mapping of known named schemas.
    """
    self._names = {} if names is None else names
    self._default_namespace = default_namespace

  @property
  def names(self):
    """Returns: the mapping of known named schemas."""
    return self._names

  @property
  def default_namespace(self):
    """Returns: the default namespace, if any, or None."""
    return self._default_namespace

  def NewWithDefaultNamespace(self, namespace):
    """Creates a tracker sharing this one's names but with a new default ns.

    Args:
      namespace: New default namespace to use.
    Returns:
      New name tracker with the specified default namespace.
    """
    return Names(names=self._names, default_namespace=namespace)

  def GetName(self, name, namespace=None):
    """Resolves the Avro name according to this name tracker's state.

    Args:
      name: Name to resolve (absolute or relative).
      namespace: Optional explicit namespace.
    Returns:
      The specified name, resolved according to this tracker.
    """
    if namespace is None:
      namespace = self._default_namespace
    return Name(name=name, namespace=namespace)

  def has_name(self, name, namespace=None):
    """Reports whether a schema with the given name has been registered."""
    avro_name = self.GetName(name=name, namespace=namespace)
    return avro_name.fullname in self._names

  def get_name(self, name, namespace=None):
    """Returns: the registered schema with the given name, if any, or None."""
    avro_name = self.GetName(name=name, namespace=namespace)
    return self._names.get(avro_name.fullname, None)

  def GetSchema(self, name, namespace=None):
    """Resolves an Avro schema by name.

    Args:
      name: Name (relative or absolute) of the Avro schema to look up.
      namespace: Optional explicit namespace.
    Returns:
      The schema with the specified name, if any, or None.
    """
    avro_name = self.GetName(name=name, namespace=namespace)
    return self._names.get(avro_name.fullname, None)

  def prune_namespace(self, properties):
    """Removes a redundant 'namespace' entry from a property map.

    The entry is dropped (in a copy of the map) only when it matches this
    tracker's default namespace; otherwise the map is returned unchanged.
    """
    if self.default_namespace is None:
      # No default namespace to be redundant with:
      return properties
    if properties.get('namespace') != self.default_namespace:
      # Either no namespace entry, or a genuinely different one:
      return properties
    # The namespace entry is redundant; drop it from a copy:
    pruned = properties.copy()
    del pruned['namespace']
    return pruned

  def Register(self, schema):
    """Registers a new named schema in this tracker.

    Args:
      schema: Named Avro schema to register in this tracker.
    Raises:
      SchemaParseException: if the name is reserved or already registered.
    """
    fullname = schema.fullname
    if fullname in VALID_TYPES:
      raise SchemaParseException(
          '%s is a reserved type name.' % fullname)
    if fullname in self.names:
      raise SchemaParseException(
          'Avro name %r already exists.' % fullname)

    logging.log(DEBUG_VERBOSE, 'Register new name for %r', fullname)
    self._names[fullname] = schema
+
+
+# ------------------------------------------------------------------------------
+
+
class NamedSchema(Schema):
  """Abstract base class for named schemas.

  Named schemas are enumerated in NAMED_TYPES.
  """

  def __init__(
      self,
      type,
      name,
      namespace=None,
      names=None,
      other_props=None,
  ):
    """Initializes a new named schema object.

    Args:
      type: Type of the named schema.
      name: Name (absolute or relative) of the schema.
      namespace: Optional explicit namespace if name is relative.
      names: Tracker to resolve and register Avro names.
      other_props: Optional map of additional properties of the schema.
    Raises:
      SchemaParseException: if the name is invalid, reserved, or already
          registered in the names tracker.
    """
    assert (type in NAMED_TYPES), ('Invalid named type: %r' % type)
    # Resolve the name first: an invalid name must fail before anything is
    # registered in the tracker.
    self._avro_name = names.GetName(name=name, namespace=namespace)

    super(NamedSchema, self).__init__(type, other_props)

    # Side effect: registration raises on reserved or duplicate full names.
    names.Register(self)

    self._props['name'] = self.name
    if self.namespace:
      self._props['namespace'] = self.namespace

  @property
  def avro_name(self):
    """Returns: the Name object describing this schema's name."""
    return self._avro_name

  @property
  def name(self):
    """Returns: the simple name of this schema (overrides Schema.name)."""
    return self._avro_name.simple_name

  @property
  def namespace(self):
    """Returns: the namespace of this schema, possibly the empty string."""
    return self._avro_name.namespace

  @property
  def fullname(self):
    """Returns: the fully qualified (dotted) name of this schema."""
    return self._avro_name.fullname

  def name_ref(self, names):
    """Reports this schema name relative to the specified name tracker.

    Args:
      names: Avro name tracker to relativise this schema name against.
    Returns:
      This schema name, relativised against the specified name tracker.
    """
    # Emit the short name when the tracker's default namespace matches ours:
    if self.namespace == names.default_namespace:
      return self.name
    else:
      return self.fullname
+
+
+# ------------------------------------------------------------------------------
+
+
# Sentinel distinguishing "no default supplied" from an explicit None default:
_NO_DEFAULT = object()


class Field(object):
  """Representation of the schema of a field in a record."""

  def __init__(
      self,
      type,
      name,
      index,
      has_default,
      default=_NO_DEFAULT,
      order=None,
      names=None,
      doc=None,
      other_props=None
  ):
    """Initializes a new Field object.

    Args:
      type: Avro schema of the field.
      name: Name of the field.
      index: 0-based position of the field.
      has_default: Whether the field has a default value.
      default: Optional default value of the field.
          Only meaningful when has_default is true.
      order: Optional sort order; one of VALID_FIELD_SORT_ORDERS.
      names: Tracker of named Avro schemas.
          Currently unused; accepted for symmetry with schema constructors.
      doc: Optional documentation string for the field.
      other_props: Optional map of additional properties of the field.
    Raises:
      SchemaParseException: if the name or sort order is invalid.
    """
    if (not isinstance(name, str)) or (len(name) == 0):
      raise SchemaParseException('Invalid record field name: %r.' % name)
    if (order is not None) and (order not in VALID_FIELD_SORT_ORDERS):
      raise SchemaParseException('Invalid record field order: %r.' % order)

    # All properties of this record field:
    self._props = {}

    self._has_default = has_default
    if other_props:
      self._props.update(other_props)

    self._index = index
    self._type = self._props['type'] = type
    self._name = self._props['name'] = name

    # TODO: check to ensure default is valid
    if has_default:
      self._props['default'] = default

    if order is not None:
      self._props['order'] = order

    if doc is not None:
      self._props['doc'] = doc

  @property
  def type(self):
    """Returns: the schema of this field."""
    return self._type

  @property
  def name(self):
    """Returns: this field name."""
    return self._name

  @property
  def index(self):
    """Returns: the 0-based index of this field in the record."""
    return self._index

  @property
  def default(self):
    """Returns: the default value of this field.

    Raises KeyError when the field has no default (check has_default first).
    """
    return self._props['default']

  @property
  def has_default(self):
    """Returns: whether this field has a default value."""
    return self._has_default

  @property
  def order(self):
    """Returns: the sort order of this field, if any, or None."""
    return self._props.get('order', None)

  @property
  def doc(self):
    """Returns: the documentation of this field, if any, or None."""
    return self._props.get('doc', None)

  @property
  def props(self):
    """Returns: the map of properties of this field."""
    return self._props

  @property
  def other_props(self):
    """Returns: the dictionary of non-reserved properties."""
    # Fixed: wrapped in dict() for consistency with Schema.other_props,
    # which materializes the filtered items the same way.
    return dict(FilterKeysOut(items=self._props, keys=FIELD_RESERVED_PROPS))

  def __str__(self):
    return json.dumps(self.to_json())

  def to_json(self, names=None):
    """Converts this field into its JSON descriptor representation."""
    if names is None:
      names = Names()
    to_dump = self.props.copy()
    to_dump['type'] = self.type.to_json(names)
    return to_dump

  def __eq__(self, that):
    # Compares the JSON representations of both fields.
    to_cmp = json.loads(str(self))
    return to_cmp == json.loads(str(that))
+
+
+# ------------------------------------------------------------------------------
+# Primitive Types
+
+
class PrimitiveSchema(Schema):
  """Schema of a primitive Avro type.

  Valid primitive types are defined in PRIMITIVE_TYPES.
  """

  def __init__(self, type):
    """Initializes a new schema object for the specified primitive type.

    Args:
      type: Type of the schema to construct. Must be primitive.
    Raises:
      AvroException: if the type is not primitive.
    """
    if type not in PRIMITIVE_TYPES:
      raise AvroException('%r is not a valid primitive type.' % type)
    super(PrimitiveSchema, self).__init__(type)

  @property
  def name(self):
    """Returns: the simple name of this schema (the type name itself)."""
    return self.type

  def to_json(self, names=None):
    """Serializes this schema into its JSON representation.

    A primitive schema with no extra properties serializes to its bare type
    name; otherwise the full property map is emitted.
    """
    props = self.props
    return self.fullname if len(props) == 1 else props

  def __eq__(self, that):
    """Two primitive schemas are equal when all their properties match."""
    return self.props == that.props
+
+
+# ------------------------------------------------------------------------------
+# Complex Types (non-recursive)
+
+
class FixedSchema(NamedSchema):
  """Schema of a fixed-size binary Avro type."""

  def __init__(
      self,
      name,
      namespace,
      size,
      names=None,
      other_props=None,
  ):
    """Initializes a new fixed schema object.

    Args:
      name: Name (absolute or relative) of the fixed schema.
      namespace: Optional explicit namespace if name is relative.
      size: Size of the fixed values, in bytes.
      names: Tracker to resolve and register Avro names.
      other_props: Optional map of additional properties of the schema.
    Raises:
      AvroException: if size is not an integer.
    """
    if not isinstance(size, int):
      raise AvroException(
          'Fixed Schema requires a valid integer for size property.')

    super(FixedSchema, self).__init__(
        type=FIXED,
        name=name,
        namespace=namespace,
        names=names,
        other_props=other_props,
    )
    self._props['size'] = size

  @property
  def size(self):
    """Returns: the size of this fixed schema, in bytes."""
    return self._props['size']

  def to_json(self, names=None):
    """Serializes this schema, emitting a name reference after first use."""
    if names is None:
      names = Names()
    if self.fullname in names.names:
      return self.name_ref(names)
    names.names[self.fullname] = self
    return names.prune_namespace(self.props)

  def __eq__(self, that):
    """Two fixed schemas are equal when all their properties match."""
    return self.props == that.props
+
+
+# ------------------------------------------------------------------------------
+
+
class EnumSchema(NamedSchema):
  """Schema of an Avro enumeration."""

  def __init__(
      self,
      name,
      namespace,
      symbols,
      names=None,
      doc=None,
      other_props=None,
  ):
    """Initializes a new enumeration schema object.

    Args:
      name: Simple name of this enumeration.
      namespace: Optional namespace.
      symbols: Ordered list of symbols defined in this enumeration.
      names: Optional tracker of Avro named schemas.
      doc: Optional documentation string for the enum.
      other_props: Optional dictionary of additional schema properties.
    Raises:
      AvroException: if symbols contains duplicates or non-string entries.
    """
    symbols = tuple(symbols)
    symbol_set = frozenset(symbols)
    if (len(symbol_set) != len(symbols)
        or not all(map(lambda symbol: isinstance(symbol, str), symbols))):
      raise AvroException(
          'Invalid symbols for enum schema: %r.' % (symbols,))

    super(EnumSchema, self).__init__(
        type=ENUM,
        name=name,
        namespace=namespace,
        names=names,
        other_props=other_props,
    )

    # Bug fix: preserve the user-specified symbol order. Avro encodes an enum
    # value as the zero-based index of its symbol in the declared list, so
    # sorting the symbols (as this previously did) silently changed the wire
    # encoding and broke interoperability with other readers of the schema.
    self._props['symbols'] = symbols
    if doc is not None:
      self._props['doc'] = doc

  @property
  def symbols(self):
    """Returns: the symbols defined in this enum, in declaration order."""
    return self._props['symbols']

  def to_json(self, names=None):
    """Converts the schema into its AVRO specification representation."""
    if names is None:
      names = Names()
    if self.fullname in names.names:
      return self.name_ref(names)
    else:
      names.names[self.fullname] = self
      return names.prune_namespace(self.props)

  def __eq__(self, that):
    return self.props == that.props
+
+
+# ------------------------------------------------------------------------------
+# Complex Types (recursive)
+
+
class ArraySchema(Schema):
  """Schema of an Avro array."""

  def __init__(self, items, other_props=None):
    """Initializes a new array schema object.

    Args:
      items: Avro schema of the array items.
      other_props: Optional dictionary of additional schema properties.
    """
    super(ArraySchema, self).__init__(
        type=ARRAY,
        other_props=other_props,
    )
    self._items_schema = items
    self._props['items'] = items

  @property
  def items(self):
    """Returns: the schema of the items in this array."""
    return self._items_schema

  def to_json(self, names=None):
    """Converts the schema into its AVRO specification representation."""
    if names is None:
      names = Names()
    json_repr = self.props.copy()
    # Serialize the item schema recursively, tracking named schemas:
    json_repr['items'] = self.items.to_json(names)
    return json_repr

  def __eq__(self, that):
    """Compares schemas through their parsed JSON representations."""
    return json.loads(str(self)) == json.loads(str(that))
+
+
+# ------------------------------------------------------------------------------
+
+
class MapSchema(Schema):
  """Schema of an Avro map."""

  def __init__(self, values, other_props=None):
    """Initializes a new map schema object.

    Args:
      values: Avro schema of the map values (keys are always strings).
      other_props: Optional dictionary of additional schema properties.
    """
    super(MapSchema, self).__init__(
        type=MAP,
        other_props=other_props,
    )
    self._values_schema = values
    self._props['values'] = values

  @property
  def values(self):
    """Returns: the schema of the values in this map."""
    return self._values_schema

  def to_json(self, names=None):
    """Converts the schema into its AVRO specification representation."""
    if names is None:
      names = Names()
    json_repr = self.props.copy()
    # Serialize the value schema recursively, tracking named schemas:
    json_repr['values'] = self.values.to_json(names)
    return json_repr

  def __eq__(self, that):
    """Compares schemas through their parsed JSON representations."""
    return json.loads(str(self)) == json.loads(str(that))
+
+
+# ------------------------------------------------------------------------------
+
+
class UnionSchema(Schema):
  """Schema of an Avro union."""

  def __init__(self, schemas):
    """Initializes a new union schema object.

    Args:
      schemas: Ordered collection of schema branches in the union.
    Raises:
      AvroException: on duplicate named branches, nested unions, or
          duplicate unnamed branch types.
    """
    super(UnionSchema, self).__init__(type=UNION)
    self._schemas = tuple(schemas)

    # Validate the schema branches:

    # Named branches must have unique full names:
    named = [branch for branch in self._schemas
             if branch.type in NAMED_TYPES]
    if len(frozenset(branch.fullname for branch in named)) != len(named):
      raise AvroException(
          'Invalid union branches with duplicate schema name:%s'
          % ''.join('\n\t - %s' % branch for branch in self._schemas))

    # Unnamed branches must have unique types, and unions cannot nest:
    unnamed = [branch for branch in self._schemas
               if branch.type not in NAMED_TYPES]
    branch_types = frozenset(branch.type for branch in unnamed)
    if UNION in branch_types:
      raise AvroException(
          'Invalid union branches contain other unions:%s'
          % ''.join('\n\t - %s' % branch for branch in self._schemas))
    if len(branch_types) != len(unnamed):
      raise AvroException(
          'Invalid union branches with duplicate type:%s'
          % ''.join('\n\t - %s' % branch for branch in self._schemas))

  @property
  def schemas(self):
    """Returns: the ordered list of schema branches in the union."""
    return self._schemas

  def to_json(self, names=None):
    """Converts the schema into its AVRO specification representation."""
    if names is None:
      names = Names()
    return [branch.to_json(names) for branch in self.schemas]

  def __eq__(self, that):
    """Compares schemas through their parsed JSON representations."""
    return json.loads(str(self)) == json.loads(str(that))
+
+
+# ------------------------------------------------------------------------------
+
+
class ErrorUnionSchema(UnionSchema):
  """Schema representing the declared errors of a protocol message."""

  def __init__(self, schemas):
    """Initializes an error-union schema.

    Args:
      schemas: Collection of error schemas.
    """
    # TODO: check that string isn't already listed explicitly as an error.
    # "string" is prepended to handle system errors:
    super(ErrorUnionSchema, self).__init__(
        schemas=[PrimitiveSchema(type=STRING)] + list(schemas))

  def to_json(self, names=None):
    """Converts the schema into its AVRO specification representation.

    The implicit system-error branch ("string") is not serialized.
    """
    if names is None:
      names = Names()
    return [schema.to_json(names)
            for schema in self.schemas
            if schema.type != STRING]
+
+
+# ------------------------------------------------------------------------------
+
+
class RecordSchema(NamedSchema):
  """Schema of a record."""

  @staticmethod
  def _MakeField(index, field_desc, names):
    """Builds a field schema from its JSON descriptor.

    Args:
      index: 0-based index of the field in the record.
      field_desc: JSON descriptor of a record field.
      names: Avro schema tracker.
    Returns:
      The field schema.
    """
    field_schema = SchemaFromJSONData(
        json_data=field_desc['type'],
        names=names,
    )
    other_props = (
        dict(FilterKeysOut(items=field_desc, keys=FIELD_RESERVED_PROPS)))
    return Field(
        type=field_schema,
        name=field_desc['name'],
        index=index,
        has_default=('default' in field_desc),
        default=field_desc.get('default', _NO_DEFAULT),
        order=field_desc.get('order', None),
        names=names,
        doc=field_desc.get('doc', None),
        other_props=other_props,
    )

  @staticmethod
  def _MakeFieldList(field_desc_list, names):
    """Builds field schemas from a list of field JSON descriptors.

    Args:
      field_desc_list: collection of field JSON descriptors.
      names: Avro schema tracker.
    Yields:
      Field schemas.
    """
    for index, field_desc in enumerate(field_desc_list):
      yield RecordSchema._MakeField(index, field_desc, names)

  @staticmethod
  def _MakeFieldMap(fields):
    """Builds the field map.

    Guarantees field name unicity.

    Args:
      fields: iterable of field schema.
    Returns:
      A read-only map of field schemas, indexed by name.
    Raises:
      SchemaParseException: if two fields share the same name.
    """
    field_map = {}
    for field in fields:
      if field.name in field_map:
        # Bug fix: the original message referenced 'field_desc_list', which is
        # not in scope here, raising NameError instead of the intended error.
        raise SchemaParseException(
            'Duplicate field name %r in list %r.'
            % (field.name, sorted(field_map)))
      field_map[field.name] = field
    return ImmutableDict(field_map)

  def __init__(
      self,
      name,
      namespace,
      fields=None,
      make_fields=None,
      names=None,
      record_type=RECORD,
      doc=None,
      other_props=None
  ):
    """Initializes a new record schema object.

    Args:
      name: Name of the record (absolute or relative).
      namespace: Optional namespace the record belongs to, if name is relative.
      fields: collection of fields to add to this record.
          Exactly one of fields or make_fields must be specified.
      make_fields: function creating the fields that belong to the record.
          The function signature is: make_fields(names) -> ordered field list.
          Exactly one of fields or make_fields must be specified.
      names: Optional tracker of Avro named schemas.
      record_type: Type of the record: one of RECORD, ERROR or REQUEST.
          Protocol requests are not named.
      doc: Optional documentation string.
      other_props: Optional dictionary of additional schema properties.
    Raises:
      SchemaParseException: on invalid record type or duplicate field names.
    """
    if record_type == REQUEST:
      # Protocol requests are not named:
      super(NamedSchema, self).__init__(
          type=REQUEST,
          other_props=other_props,
      )
    elif record_type in [RECORD, ERROR]:
      # Register this record name in the tracker:
      super(RecordSchema, self).__init__(
          type=record_type,
          name=name,
          namespace=namespace,
          names=names,
          other_props=other_props,
      )
    else:
      raise SchemaParseException(
          'Invalid record type: %r.' % record_type)

    if record_type in [RECORD, ERROR]:
      avro_name = names.GetName(name=name, namespace=namespace)
      nested_names = names.NewWithDefaultNamespace(namespace=avro_name.namespace)
    elif record_type == REQUEST:
      # Protocol request has no name: no need to change default namespace:
      nested_names = names

    if fields is None:
      fields = make_fields(names=nested_names)
    else:
      assert (make_fields is None)
    self._fields = tuple(fields)

    self._field_map = RecordSchema._MakeFieldMap(self._fields)

    # Bug fix: store the materialized tuple. The original stored the raw
    # 'fields' argument, which is already exhausted at this point if the
    # caller passed an iterator/generator.
    self._props['fields'] = self._fields
    if doc is not None:
      self._props['doc'] = doc

  @property
  def fields(self):
    """Returns: the field schemas, as an ordered tuple."""
    return self._fields

  @property
  def field_map(self):
    """Returns: a read-only map of the field schemas index by field names."""
    return self._field_map

  def to_json(self, names=None):
    """Converts the schema into its AVRO specification representation."""
    if names is None:
      names = Names()
    # Request records don't have names
    if self.type == REQUEST:
      return [f.to_json(names) for f in self.fields]

    if self.fullname in names.names:
      return self.name_ref(names)
    else:
      names.names[self.fullname] = self

    to_dump = names.prune_namespace(self.props.copy())
    to_dump['fields'] = [f.to_json(names) for f in self.fields]
    return to_dump

  def __eq__(self, that):
    to_cmp = json.loads(str(self))
    return to_cmp == json.loads(str(that))
+
+
+# ------------------------------------------------------------------------------
+# Module functions
+
+
def FilterKeysOut(items, keys):
  """Filters a collection of (key, value) items.

  Excludes any item whose key belongs to keys.

  Args:
    items: Dictionary of items to filter the keys out of.
    keys: Keys to filter out.
  Yields:
    Filtered (key, value) pairs.
  """
  for key, value in items.items():
    if key not in keys:
      yield (key, value)
+
+
+# ------------------------------------------------------------------------------
+
+
def _SchemaFromJSONString(json_string, names):
  """Parses an Avro schema from a bare JSON string.

  The string is either a primitive type name or a reference to a
  previously defined named schema.
  """
  if json_string in PRIMITIVE_TYPES:
    return PrimitiveSchema(type=json_string)

  # Not a primitive: must be a reference to a known named schema.
  schema = names.GetSchema(name=json_string)
  if schema is None:
    raise SchemaParseException(
        'Unknown named schema %r, known names: %r.'
        % (json_string, sorted(names.names)))
  return schema
+
+
def _SchemaFromJSONArray(json_array, names):
  """Parses a union schema from a JSON array of branch descriptors."""
  branches = [SchemaFromJSONData(json_data=desc, names=names)
              for desc in json_array]
  return UnionSchema(branches)
+
+
def _SchemaFromJSONObject(json_object, names):
  """Parses an Avro schema from a JSON object (dict) descriptor.

  Args:
    json_object: JSON descriptor of the schema, as a decoded dict.
    names: Tracker for Avro named schemas.
  Returns:
    The parsed schema.
  Raises:
    SchemaParseException: if the descriptor is invalid.
  """
  # Renamed from 'type' to avoid shadowing the built-in type():
  avro_type = json_object.get('type')
  if avro_type is None:
    raise SchemaParseException(
        'Avro schema JSON descriptor has no "type" property: %r' % json_object)

  # Non-reserved properties are carried along on the parsed schema:
  other_props = dict(
      FilterKeysOut(items=json_object, keys=SCHEMA_RESERVED_PROPS))

  if avro_type in PRIMITIVE_TYPES:
    # FIXME should not ignore other properties
    return PrimitiveSchema(avro_type)

  elif avro_type in NAMED_TYPES:
    name = json_object.get('name')
    namespace = json_object.get('namespace', names.default_namespace)
    if avro_type == FIXED:
      size = json_object.get('size')
      return FixedSchema(name, namespace, size, names, other_props)
    elif avro_type == ENUM:
      symbols = json_object.get('symbols')
      doc = json_object.get('doc')
      return EnumSchema(name, namespace, symbols, names, doc, other_props)

    elif avro_type in [RECORD, ERROR]:
      field_desc_list = json_object.get('fields', ())

      # Deferred so the record is registered in 'names' before its fields
      # are parsed, allowing recursive record definitions:
      def MakeFields(names):
        return tuple(RecordSchema._MakeFieldList(field_desc_list, names))

      return RecordSchema(
          name=name,
          namespace=namespace,
          make_fields=MakeFields,
          names=names,
          record_type=avro_type,
          doc=json_object.get('doc'),
          other_props=other_props,
      )
    else:
      raise Exception('Internal error: unknown type %r.' % avro_type)

  elif avro_type in VALID_TYPES:
    # Unnamed, non-primitive Avro type:

    if avro_type == ARRAY:
      items_desc = json_object.get('items')
      if items_desc is None:
        raise SchemaParseException(
            'Invalid array schema descriptor with no "items" : %r.'
            % json_object)
      return ArraySchema(
          items=SchemaFromJSONData(items_desc, names),
          other_props=other_props,
      )

    elif avro_type == MAP:
      values_desc = json_object.get('values')
      if values_desc is None:
        raise SchemaParseException(
            'Invalid map schema descriptor with no "values" : %r.'
            % json_object)
      return MapSchema(
          values=SchemaFromJSONData(values_desc, names=names),
          other_props=other_props,
      )

    elif avro_type == ERROR_UNION:
      error_desc_list = json_object.get('declared_errors')
      assert (error_desc_list is not None)
      error_schemas = map(
          lambda desc: SchemaFromJSONData(desc, names=names),
          error_desc_list)
      return ErrorUnionSchema(schemas=error_schemas)

    else:
      raise Exception('Internal error: unknown type %r.' % avro_type)

  raise SchemaParseException(
      'Invalid JSON descriptor for an Avro schema: %r' % json_object)
+
+
# Parsers for the JSON data types:
# Maps the Python type of a decoded JSON value (string, array, or object
# descriptor) to the function that parses an Avro schema from it.
_JSONDataParserTypeMap = {
  str: _SchemaFromJSONString,
  list: _SchemaFromJSONArray,
  dict: _SchemaFromJSONObject,
}
+
+
def SchemaFromJSONData(json_data, names=None):
  """Builds an Avro Schema from its JSON descriptor.

  Args:
    json_data: JSON data representing the descriptor of the Avro schema.
    names: Optional tracker for Avro named schemas.
  Returns:
    The Avro schema parsed from the JSON descriptor.
  Raises:
    SchemaParseException: if the descriptor is invalid.
  """
  if names is None:
    names = Names()

  # Dispatch on the Python type of the decoded JSON value:
  parser = _JSONDataParserTypeMap.get(type(json_data))
  if parser is not None:
    return parser(json_data, names=names)
  raise SchemaParseException(
      'Invalid JSON descriptor for an Avro schema: %r.' % json_data)
+
+
+# ------------------------------------------------------------------------------
+
+
def Parse(json_string):
  """Constructs a Schema from its JSON descriptor in text form.

  Args:
    json_string: String representation of the JSON descriptor of the schema.
  Returns:
    The parsed schema.
  Raises:
    SchemaParseException: on JSON parsing error,
        or if the JSON descriptor is invalid.
  """
  try:
    json_data = json.loads(json_string)
  except Exception as exn:
    # Chain the underlying error (PEP 3134) so the original JSON failure
    # remains visible in the traceback:
    raise SchemaParseException(
        'Error parsing schema from JSON: %r. '
        'Error message: %r.'
        % (json_string, exn)) from exn

  # Initialize the names object
  names = Names()

  # construct the Avro Schema object
  return SchemaFromJSONData(json_data, names)

Propchange: avro/trunk/lang/py3/avro/schema.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/py3/avro/tests/av_bench.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/av_bench.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/tests/av_bench.py (added)
+++ avro/trunk/lang/py3/avro/tests/av_bench.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,119 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import random
+import string
+import sys
+import time
+
+import avro.datafile
+import avro.io
+import avro.schema
+
+
# DNS record types used for the random "type" field values:
TYPES = ('A', 'CNAME',)
# Path of the Avro container file written by Write() and read back by Read():
FILENAME = 'datafile.avr'
+
+
def GenerateRandomName():
  """Returns: a random name of 15 distinct lowercase ASCII letters."""
  letters = random.sample(string.ascii_lowercase, 15)
  return ''.join(letters)
+
+
def GenerateRandomIP():
  """Returns: a random dotted-quad IPv4 address, as a string."""
  # Four independent octet draws, joined with dots:
  return '.'.join(str(random.randint(0, 255)) for _ in range(4))
+
+
def Write(nrecords):
  """Writes a data file with the specified number of random records.

  Args:
    nrecords: Number of records to write.
  """
  schema_s = """
  {
    "type": "record",
    "name": "Query",
    "fields" : [
      {"name": "query", "type": "string"},
      {"name": "response", "type": "string"},
      {"name": "type", "type": "string", "default": "A"}
    ]
  }
  """
  schema = avro.schema.Parse(schema_s)
  writer = avro.io.DatumWriter(schema)

  with open(FILENAME, 'wb') as out:
    with avro.datafile.DataFileWriter(
        out, writer, schema,
        # codec='deflate'
    ) as data_writer:
      for _ in range(nrecords):
        # Note: 'record_type' avoids shadowing the built-in type().
        response = GenerateRandomIP()
        query = GenerateRandomName()
        record_type = random.choice(TYPES)
        data_writer.append({
            'query': query,
            'response': response,
            'type': record_type,
        })
+
+
def Read(expect_nrecords):
  """Reads the data file generated by Write().

  Args:
    expect_nrecords: Number of records expected in the data file.
  Raises:
    AssertionError: if the number of records read does not match.
  """
  with open(FILENAME, 'rb') as f:
    reader = avro.io.DatumReader()
    with avro.datafile.DataFileReader(f, reader) as file_reader:
      nrecords = 0
      for record in file_reader:
        nrecords += 1
      # Bug fix: the message previously referenced the undefined name
      # 'expected_nrecords', raising NameError instead of the intended error.
      assert (nrecords == expect_nrecords), (
          'Expecting %d records, got %d.' % (expect_nrecords, nrecords))
+
+
def Timing(f, *args):
  """Returns: wall-clock seconds elapsed while calling f(*args)."""
  start = time.time()
  f(*args)
  return time.time() - start
+
+
def Main(args):
  """Benchmarks writing then reading args[1] random records."""
  nrecords = int(args[1])
  write_secs = Timing(Write, nrecords)
  print('Write %0.4f' % write_secs)
  read_secs = Timing(Read, nrecords)
  print('Read %0.4f' % read_secs)
+
+
if __name__ == '__main__':
  # Send all log records, including debug, to the console:
  log_formatter = logging.Formatter(
      '%(asctime)s %(levelname)s %(filename)s:%(lineno)s : %(message)s')
  logging.root.setLevel(logging.DEBUG)
  console_handler = logging.StreamHandler()
  console_handler.setFormatter(log_formatter)
  console_handler.setLevel(logging.DEBUG)
  logging.root.addHandler(console_handler)

  Main(sys.argv)

Propchange: avro/trunk/lang/py3/avro/tests/av_bench.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/py3/avro/tests/gen_interop_data.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/gen_interop_data.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/tests/gen_interop_data.py (added)
+++ avro/trunk/lang/py3/avro/tests/gen_interop_data.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,56 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+
+from avro import datafile
+from avro import io
+from avro import schema
+
+
# Sample datum written to the interop data file. Field names presumably match
# the interop schema passed as sys.argv[1] (share/test/schemas/interop.avsc)
# — TODO confirm against the schema file.
DATUM = {
    'intField': 12,
    'longField': 15234324,
    'stringField': 'hey',
    'boolField': True,
    'floatField': 1234.0,
    'doubleField': -1234.0,
    'bytesField': '12312adf',
    'nullField': None,
    'arrayField': [5.0, 0.0, 12.0],
    'mapField': {'a': {'label': 'a'}, 'bee': {'label': 'cee'}},
    'unionField': 12.0,
    'enumField': 'C',
    'fixedField': b'1019181716151413',
    'recordField': {
        'label': 'blah',
        'children': [{'label': 'inner', 'children': []}],
    },
}
+
+
+if __name__ == "__main__":
+  interop_schema = schema.Parse(open(sys.argv[1], 'r').read())
+  writer = open(sys.argv[2], 'wb')
+  datum_writer = io.DatumWriter()
+  # NB: not using compression
+  dfw = datafile.DataFileWriter(writer, datum_writer, interop_schema)
+  dfw.append(DATUM)
+  dfw.close()

Propchange: avro/trunk/lang/py3/avro/tests/gen_interop_data.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/py3/avro/tests/run_tests.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/run_tests.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/tests/run_tests.py (added)
+++ avro/trunk/lang/py3/avro/tests/run_tests.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,76 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Runs all tests.
+
+Usage:
+
+- Run tests from all modules:
+    ./run_tests.py discover [-v]
+
+- Run tests in a specific module:
+    ./run_tests.py test_schema [-v]
+
+- Run a specific test:
+    ./run_tests.py test_schema.TestSchema.testParse [-v]
+
+- Set logging level:
+    PYTHON_LOG_LEVEL=<log-level> ./run_tests.py ...
+    log-level  0 includes all logging.
+    log-level 10 includes debug logging.
+    log-level 20 includes info logging.
+
+- Command-line help:
+  ./run_tests.py -h
+  ./run_tests.py discover -h
+"""
+
+import logging
+import os
+import sys
+import unittest
+
+from avro.tests.test_datafile import *
+from avro.tests.test_datafile_interop import *
+from avro.tests.test_io import *
+from avro.tests.test_ipc import *
+from avro.tests.test_protocol import *
+from avro.tests.test_schema import *
+from avro.tests.test_script import *
+
+
def SetupLogging():
  """Configures root logging to the console.

  The log level is read from the PYTHON_LOG_LEVEL environment variable
  (default: INFO).
  """
  log_level = int(os.environ.get('PYTHON_LOG_LEVEL', logging.INFO))

  formatter = logging.Formatter(
      '%(asctime)s %(levelname)s %(filename)s:%(lineno)s : %(message)s')
  handler = logging.StreamHandler()
  handler.setFormatter(formatter)
  handler.setLevel(logging.DEBUG)

  logging.root.handlers = list()  # list.clear() only exists in python 3.3+
  logging.root.setLevel(log_level)
  logging.root.addHandler(handler)
+
+
# Configure logging at import time, so test modules log consistently whether
# this file is run directly or imported by a test runner:
SetupLogging()


if __name__ == '__main__':
  unittest.main()

Propchange: avro/trunk/lang/py3/avro/tests/run_tests.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/py3/avro/tests/sample_http_client.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/sample_http_client.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/tests/sample_http_client.py (added)
+++ avro/trunk/lang/py3/avro/tests/sample_http_client.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,94 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import sys
+
+from avro import ipc
+from avro import protocol
+
+MAIL_PROTOCOL_JSON = """\
+{"namespace": "example.proto",
+ "protocol": "Mail",
+
+ "types": [
+     {"name": "Message", "type": "record",
+      "fields": [
+          {"name": "to",   "type": "string"},
+          {"name": "from", "type": "string"},
+          {"name": "body", "type": "string"}
+      ]
+     }
+ ],
+
+ "messages": {
+     "send": {
+         "request": [{"name": "message", "type": "Message"}],
+         "response": "string"
+     },
+     "replay": {
+         "request": [],
+         "response": "string"
+     }
+ }
+}
+"""
+MAIL_PROTOCOL = protocol.Parse(MAIL_PROTOCOL_JSON)
+SERVER_HOST = 'localhost'
+SERVER_PORT = 9090
+
class UsageError(Exception):
  """Raised when the script is invoked with invalid command-line arguments."""

  def __init__(self, value):
    # Bug fix: forward to Exception.__init__ so e.args is populated and the
    # exception pickles/reports correctly; self.value is kept for callers.
    super().__init__(value)
    self.value = value

  def __str__(self):
    return repr(self.value)
+
def make_requestor(server_host, server_port, protocol):
  """Creates an Avro requestor connected to the given server.

  Args:
    server_host: Hostname of the mail server.
    server_port: TCP port of the mail server.
    protocol: Avro protocol to communicate with.
  Returns:
    A new ipc.Requestor for the given protocol and connection.
  """
  # Bug fix: the parameters were previously ignored in favor of the
  # module-level SERVER_HOST/SERVER_PORT globals.
  client = ipc.HTTPTransceiver(server_host, server_port)
  return ipc.Requestor(protocol, client)
+
if __name__ == '__main__':
  if len(sys.argv) not in [4, 5]:
    raise UsageError("Usage: <to> <from> <body> [<count>]")

  # client code - attach to the server and send a message
  # fill in the Message record
  message = dict()
  message['to'] = sys.argv[1]
  message['from'] = sys.argv[2]
  message['body'] = sys.argv[3]

  # Optional message count; defaults to 1 when absent or not an integer.
  # Bug fix: narrowed the bare 'except:' clause, which also swallowed
  # SystemExit and KeyboardInterrupt.
  try:
    num_messages = int(sys.argv[4])
  except (IndexError, ValueError):
    num_messages = 1

  # build the parameters for the request
  params = {}
  params['message'] = message

  # send the requests and print the result
  for msg_count in range(num_messages):
    requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL)
    result = requestor.request('send', params)
    print("Result: " + result)

  # try out a replay message
  requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL)
  result = requestor.request('replay', dict())
  print("Replay Result: " + result)

Propchange: avro/trunk/lang/py3/avro/tests/sample_http_client.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/py3/avro/tests/sample_http_server.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/sample_http_server.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/tests/sample_http_server.py (added)
+++ avro/trunk/lang/py3/avro/tests/sample_http_server.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,81 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
# Bug fix: BaseHTTPServer is the Python 2 module name; this file targets
# Python 3 (see the shebang), where it was merged into http.server.
from http.server import BaseHTTPRequestHandler, HTTPServer

from avro import ipc
from avro import protocol
+
+MAIL_PROTOCOL_JSON = """\
+{"namespace": "example.proto",
+ "protocol": "Mail",
+
+ "types": [
+     {"name": "Message", "type": "record",
+      "fields": [
+          {"name": "to",   "type": "string"},
+          {"name": "from", "type": "string"},
+          {"name": "body", "type": "string"}
+      ]
+     }
+ ],
+
+ "messages": {
+     "send": {
+         "request": [{"name": "message", "type": "Message"}],
+         "response": "string"
+     },
+     "replay": {
+         "request": [],
+         "response": "string"
+     }
+ }
+}
+"""
+MAIL_PROTOCOL = protocol.Parse(MAIL_PROTOCOL_JSON)
+SERVER_ADDRESS = ('localhost', 9090)
+
class MailResponder(ipc.Responder):
  """Responder implementing the messages of the Mail protocol."""

  def __init__(self):
    ipc.Responder.__init__(self, MAIL_PROTOCOL)

  def invoke(self, message, request):
    """Dispatches an incoming protocol message.

    Args:
      message: Protocol message being invoked ('send' or 'replay').
      request: Request parameters for the message.
    Returns:
      The response string, or None for unrecognized messages.
    """
    if message.name == 'replay':
      return 'replay'
    if message.name == 'send':
      request_content = request['message']
      return (
          "Sent message to %(to)s from %(from)s with body %(body)s"
          % request_content)
+
class MailHandler(BaseHTTPRequestHandler):
  """HTTP handler bridging POST requests to the Avro MailResponder."""

  def do_POST(self):
    """Handles one Avro RPC call delivered as an HTTP POST body."""
    self.responder = MailResponder()
    # Read the framed Avro request message from the HTTP request body:
    call_request_reader = ipc.FramedReader(self.rfile)
    call_request = call_request_reader.read_framed_message()
    resp_body = self.responder.respond(call_request)
    # Status line and headers must be emitted before the response body:
    self.send_response(200)
    self.send_header('Content-Type', 'avro/binary')
    self.end_headers()
    resp_writer = ipc.FramedWriter(self.wfile)
    resp_writer.write_framed_message(resp_body)
+
if __name__ == '__main__':
  # Bug fix: allow_reuse_address must be set before the server binds its
  # socket, which happens inside the HTTPServer constructor. Setting it on
  # the instance after construction (as before) had no effect.
  HTTPServer.allow_reuse_address = True
  mail_server = HTTPServer(SERVER_ADDRESS, MailHandler)
  mail_server.serve_forever()

Propchange: avro/trunk/lang/py3/avro/tests/sample_http_server.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/py3/avro/tests/test_datafile.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/test_datafile.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/tests/test_datafile.py (added)
+++ avro/trunk/lang/py3/avro/tests/test_datafile.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,278 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import tempfile
+import unittest
+
+from avro import datafile
+from avro import io
+from avro import schema
+
+
+# ------------------------------------------------------------------------------
+
+
+# (schema JSON, example datum) pairs written and read back by the tests:
+SCHEMAS_TO_VALIDATE = (
+  ('"null"', None),
+  ('"boolean"', True),
+  ('"string"', 'adsfasdf09809dsf-=adsf'),
+  ('"bytes"', b'12345abcd'),
+  ('"int"', 1234),
+  ('"long"', 1234),
+  ('"float"', 1234.0),
+  ('"double"', 1234.0),
+  ('{"type": "fixed", "name": "Test", "size": 1}', b'B'),
+  ('{"type": "enum", "name": "Test", "symbols": ["A", "B"]}', 'B'),
+  ('{"type": "array", "items": "long"}', [1, 3, 2]),
+  ('{"type": "map", "values": "long"}', {'a': 1, 'b': 3, 'c': 2}),
+  ('["string", "null", "long"]', None),
+
+  ("""
+   {
+     "type": "record",
+     "name": "Test",
+     "fields": [{"name": "f", "type": "long"}]
+   }
+   """,
+   {'f': 5}),
+
+  # Recursive record type exercising self-referential schemas:
+  ("""
+   {
+     "type": "record",
+     "name": "Lisp",
+     "fields": [{
+        "name": "value",
+        "type": [
+          "null",
+          "string",
+          {
+            "type": "record",
+            "name": "Cons",
+            "fields": [{"name": "car", "type": "Lisp"},
+                       {"name": "cdr", "type": "Lisp"}]
+          }
+        ]
+     }]
+   }
+   """,
+   {'value': {'car': {'value': 'head'}, 'cdr': {'value': None}}}),
+)
+
+# Codecs exercised by every test; snappy is appended only when available:
+CODECS_TO_VALIDATE = ('null', 'deflate')
+
+try:
+  import snappy
+  CODECS_TO_VALIDATE += ('snappy',)
+except ImportError:
+  logging.info('Snappy not present, will skip testing it.')
+
+
+# ------------------------------------------------------------------------------
+
+
+class TestDataFile(unittest.TestCase):
+
+  @classmethod
+  def setUpClass(cls):
+    cls._temp_dir = (
+        tempfile.TemporaryDirectory(prefix=cls.__name__, suffix='.tmp'))
+    logging.debug('Created temporary directory: %s', cls._temp_dir.name)
+
+  @classmethod
+  def tearDownClass(cls):
+    logging.debug('Cleaning up temporary directory: %s', cls._temp_dir.name)
+    cls._temp_dir.cleanup()
+
+  def NewTempFile(self):
+    """Creates a new temporary file.
+
+    File is automatically cleaned up after test.
+
+    Returns:
+      The path of the new temporary file.
+    """
+    temp_file = tempfile.NamedTemporaryFile(
+        dir=self._temp_dir.name,
+        prefix='test',
+        suffix='.avro',
+        delete=False,
+    )
+    return temp_file.name
+
+  def testRoundTrip(self):
+    correct = 0
+    for iexample, (writer_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
+      for codec in CODECS_TO_VALIDATE:
+        file_path = self.NewTempFile()
+
+        # Write the datum this many times in the data file:
+        nitems = 10
+
+        logging.debug(
+            'Performing round-trip with codec %r in file %s for example #%d\n'
+            'Writing datum: %r using writer schema:\n%s',
+            codec, file_path, iexample,
+            datum, writer_schema)
+
+        logging.debug('Creating data file %r', file_path)
+        with open(file_path, 'wb') as writer:
+          datum_writer = io.DatumWriter()
+          schema_object = schema.Parse(writer_schema)
+          with datafile.DataFileWriter(
+              writer=writer,
+              datum_writer=datum_writer,
+              writer_schema=schema_object,
+              codec=codec,
+          ) as dfw:
+            for _ in range(nitems):
+              dfw.append(datum)
+
+        logging.debug('Reading data from %r', file_path)
+        with open(file_path, 'rb') as reader:
+          datum_reader = io.DatumReader()
+          with datafile.DataFileReader(reader, datum_reader) as dfr:
+            round_trip_data = list(dfr)
+
+        logging.debug(
+            'Round-trip data has %d items: %r',
+            len(round_trip_data), round_trip_data)
+
+        if ([datum] * nitems) == round_trip_data:
+          correct += 1
+        else:
+          logging.error(
+              'Round-trip data does not match:\n'
+              'Expect: %r\n'
+              'Actual: %r',
+              [datum] * nitems,
+              round_trip_data)
+
+    self.assertEqual(
+        correct,
+        len(CODECS_TO_VALIDATE) * len(SCHEMAS_TO_VALIDATE))
+
+  def testAppend(self):
+    correct = 0
+    for iexample, (writer_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
+      for codec in CODECS_TO_VALIDATE:
+        file_path = self.NewTempFile()
+
+        logging.debug(
+            'Performing append with codec %r in file %s for example #%d\n'
+            'Writing datum: %r using writer schema:\n%s',
+            codec, file_path, iexample,
+            datum, writer_schema)
+
+        logging.debug('Creating data file %r', file_path)
+        with open(file_path, 'wb') as writer:
+          datum_writer = io.DatumWriter()
+          schema_object = schema.Parse(writer_schema)
+          with datafile.DataFileWriter(
+              writer=writer,
+              datum_writer=datum_writer,
+              writer_schema=schema_object,
+              codec=codec,
+          ) as dfw:
+            dfw.append(datum)
+
+        logging.debug('Appending data to %r', file_path)
+        for i in range(9):
+          with open(file_path, 'ab+') as writer:
+            with datafile.DataFileWriter(writer, io.DatumWriter()) as dfw:
+              dfw.append(datum)
+
+        logging.debug('Reading appended data from %r', file_path)
+        with open(file_path, 'rb') as reader:
+          datum_reader = io.DatumReader()
+          with datafile.DataFileReader(reader, datum_reader) as dfr:
+            appended_data = list(dfr)
+
+        logging.debug(
+            'Appended data has %d items: %r',
+            len(appended_data), appended_data)
+
+        if ([datum] * 10) == appended_data:
+          correct += 1
+        else:
+          logging.error(
+              'Appended data does not match:\n'
+              'Expect: %r\n'
+              'Actual: %r',
+              [datum] * 10,
+              appended_data)
+
+    self.assertEqual(
+        correct,
+        len(CODECS_TO_VALIDATE) * len(SCHEMAS_TO_VALIDATE))
+
+  def testContextManager(self):
+    file_path = self.NewTempFile()
+
+    # Test the writer with a 'with' statement.
+    with open(file_path, 'wb') as writer:
+      datum_writer = io.DatumWriter()
+      sample_schema, sample_datum = SCHEMAS_TO_VALIDATE[1]
+      schema_object = schema.Parse(sample_schema)
+      with datafile.DataFileWriter(writer, datum_writer, schema_object) as dfw:
+        dfw.append(sample_datum)
+      self.assertTrue(writer.closed)
+
+    # Test the reader with a 'with' statement.
+    datums = []
+    with open(file_path, 'rb') as reader:
+      datum_reader = io.DatumReader()
+      with datafile.DataFileReader(reader, datum_reader) as dfr:
+        for datum in dfr:
+          datums.append(datum)
+      self.assertTrue(reader.closed)
+
+  def testMetadata(self):
+    file_path = self.NewTempFile()
+
+    # Test the writer with a 'with' statement.
+    with open(file_path, 'wb') as writer:
+      datum_writer = io.DatumWriter()
+      sample_schema, sample_datum = SCHEMAS_TO_VALIDATE[1]
+      schema_object = schema.Parse(sample_schema)
+      with datafile.DataFileWriter(writer, datum_writer, schema_object) as dfw:
+        dfw.SetMeta('test.string', 'foo')
+        dfw.SetMeta('test.number', '1')
+        dfw.append(sample_datum)
+      self.assertTrue(writer.closed)
+
+    # Test the reader with a 'with' statement.
+    datums = []
+    with open(file_path, 'rb') as reader:
+      datum_reader = io.DatumReader()
+      with datafile.DataFileReader(reader, datum_reader) as dfr:
+        self.assertEqual(b'foo', dfr.GetMeta('test.string'))
+        self.assertEqual(b'1', dfr.GetMeta('test.number'))
+        for datum in dfr:
+          datums.append(datum)
+      self.assertTrue(reader.closed)
+
+
+# ------------------------------------------------------------------------------
+
+
+# This module is driven by the shared test runner, not run directly:
+if __name__ == '__main__':
+  raise Exception('Use run_tests.py')

Propchange: avro/trunk/lang/py3/avro/tests/test_datafile.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/py3/avro/tests/test_datafile_interop.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/test_datafile_interop.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/tests/test_datafile_interop.py (added)
+++ avro/trunk/lang/py3/avro/tests/test_datafile_interop.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,83 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import tempfile
+import unittest
+
+from avro import datafile
+from avro import io
+from avro import schema
+
+
+def GetInteropSchema():
+  test_dir = os.path.dirname(os.path.abspath(__file__))
+  schema_json_path = os.path.join(test_dir, 'interop.avsc')
+  with open(schema_json_path, 'r') as f:
+    schema_json = f.read()
+  return schema.Parse(schema_json)
+
+
+# Schema shared by Avro implementations for cross-language interop testing:
+INTEROP_SCHEMA = GetInteropSchema()
+# One datum populating every field of the interop schema:
+INTEROP_DATUM = {
+    'intField': 12,
+    'longField': 15234324,
+    'stringField': 'hey',
+    'boolField': True,
+    'floatField': 1234.0,
+    'doubleField': -1234.0,
+    'bytesField': b'12312adf',
+    'nullField': None,
+    'arrayField': [5.0, 0.0, 12.0],
+    'mapField': {'a': {'label': 'a'}, 'bee': {'label': 'cee'}},
+    'unionField': 12.0,
+    'enumField': 'C',
+    'fixedField': b'1019181716151413',
+    'recordField': {
+        'label': 'blah',
+        'children': [{'label': 'inner', 'children': []}],
+    },
+}
+
+
<br>
+def WriteDataFile(path, datum, schema):
+  """Writes a single datum into a new Avro data file.
+
+  Args:
+    path: Path of the file to create and write.
+    datum: Datum to append, matching the given schema.
+    schema: Writer schema of the data file.
+        NOTE(review): this parameter shadows the 'avro.schema' module
+        imported above; the module is not used inside this function, but a
+        different name would avoid confusion.
+  """
+  datum_writer = io.DatumWriter()
+  with open(path, 'wb') as writer:
+    # NB: not using compression
+    with datafile.DataFileWriter(writer, datum_writer, schema) as dfw:
+      dfw.append(datum)
+
+
+class TestDataFileInterop(unittest.TestCase):
+  def testInterop(self):
+    with tempfile.NamedTemporaryFile() as temp_path:
+      WriteDataFile(temp_path.name, INTEROP_DATUM, INTEROP_SCHEMA)
+
+      # read data in binary from file
+      datum_reader = io.DatumReader()
+      with open(temp_path.name, 'rb') as reader:
+        dfr = datafile.DataFileReader(reader, datum_reader)
+        for datum in dfr:
+          self.assertEqual(INTEROP_DATUM, datum)
+
+
+# This module is driven by the shared test runner, not run directly:
+if __name__ == '__main__':
+  raise Exception('Use run_tests.py')

Propchange: avro/trunk/lang/py3/avro/tests/test_datafile_interop.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/py3/avro/tests/test_io.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/test_io.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/tests/test_io.py (added)
+++ avro/trunk/lang/py3/avro/tests/test_io.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,351 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import binascii
+import io
+import logging
+import sys
+import unittest
+
+from avro import io as avro_io
+from avro import schema
+
+
+# (schema JSON, example datum) pairs for validation and round-trip tests:
+SCHEMAS_TO_VALIDATE = (
+  ('"null"', None),
+  ('"boolean"', True),
+  ('"string"', 'adsfasdf09809dsf-=adsf'),
+  ('"bytes"', b'12345abcd'),
+  ('"int"', 1234),
+  ('"long"', 1234),
+  ('"float"', 1234.0),
+  ('"double"', 1234.0),
+  ('{"type": "fixed", "name": "Test", "size": 1}', b'B'),
+  ('{"type": "enum", "name": "Test", "symbols": ["A", "B"]}', 'B'),
+  ('{"type": "array", "items": "long"}', [1, 3, 2]),
+  ('{"type": "map", "values": "long"}', {'a': 1, 'b': 3, 'c': 2}),
+  ('["string", "null", "long"]', None),
+  ("""\
+   {"type": "record",
+    "name": "Test",
+    "fields": [{"name": "f", "type": "long"}]}
+   """, {'f': 5}),
+  ("""
+   {
+     "type": "record",
+     "name": "Lisp",
+     "fields": [{
+       "name": "value",
+       "type": [
+         "null",
+         "string",
+         {
+           "type": "record",
+           "name": "Cons",
+           "fields": [{"name": "car", "type": "Lisp"},
+                      {"name": "cdr", "type": "Lisp"}]
+         }
+       ]
+     }]
+   }
+   """, {'value': {'car': {'value': 'head'}, 'cdr': {'value': None}}}),
+)
+
+# (integer value, expected binary encoding as space-separated hex bytes):
+BINARY_ENCODINGS = (
+  (0, '00'),
+  (-1, '01'),
+  (1, '02'),
+  (-2, '03'),
+  (2, '04'),
+  (-64, '7f'),
+  (64, '80 01'),
+  (8192, '80 80 01'),
+  (-8193, '81 80 01'),
+)
+
+# (field type JSON, default value JSON, expected decoded default) triples.
+# NOTE(review): the "bytes" and "fixed" defaults here are str ('\xff\xff'),
+# not bytes -- presumably matching how JSON-encoded defaults are decoded;
+# confirm against the DatumReader default handling.
+DEFAULT_VALUE_EXAMPLES = (
+  ('"null"', 'null', None),
+  ('"boolean"', 'true', True),
+  ('"string"', '"foo"', 'foo'),
+  ('"bytes"', '"\u00FF\u00FF"', '\xff\xff'),
+  ('"int"', '5', 5),
+  ('"long"', '5', 5),
+  ('"float"', '1.1', 1.1),
+  ('"double"', '1.1', 1.1),
+  ('{"type": "fixed", "name": "F", "size": 2}', '"\u00FF\u00FF"', '\xff\xff'),
+  ('{"type": "enum", "name": "F", "symbols": ["FOO", "BAR"]}', '"FOO"', 'FOO'),
+  ('{"type": "array", "items": "int"}', '[1, 2, 3]', [1, 2, 3]),
+  ('{"type": "map", "values": "int"}', '{"a": 1, "b": 2}', {'a': 1, 'b': 2}),
+  ('["int", "null"]', '5', 5),
+  ('{"type": "record", "name": "F", "fields": [{"name": "A", "type": "int"}]}',
+   '{"A": 5}', {'A': 5}),
+)
+
+# Writer-side record schema shared by the schema-resolution tests below:
+LONG_RECORD_SCHEMA = schema.Parse("""
+{
+  "type": "record",
+  "name": "Test",
+  "fields": [
+    {"name": "A", "type": "int"},
+    {"name": "B", "type": "int"},
+    {"name": "C", "type": "int"},
+    {"name": "D", "type": "int"},
+    {"name": "E", "type": "int"},
+    {"name": "F", "type": "int"},
+    {"name": "G", "type": "int"}
+  ]
+}
+""")
+
+# Datum populating every field of LONG_RECORD_SCHEMA:
+LONG_RECORD_DATUM = {'A': 1, 'B': 2, 'C': 3, 'D': 4, 'E': 5, 'F': 6, 'G': 7}
+
+
+def avro_hexlify(reader):
+  """Return the hex value, as a string, of a binary-encoded int or long."""
+  bytes = []
+  current_byte = reader.read(1)
+  bytes.append(binascii.hexlify(current_byte).decode())
+  while (ord(current_byte) & 0x80) != 0:
+    current_byte = reader.read(1)
+    bytes.append(binascii.hexlify(current_byte).decode())
+  return ' '.join(bytes)
+
+
+def write_datum(datum, writer_schema):
+  writer = io.BytesIO()
+  encoder = avro_io.BinaryEncoder(writer)
+  datum_writer = avro_io.DatumWriter(writer_schema)
+  datum_writer.write(datum, encoder)
+  return writer, encoder, datum_writer
+
+
+def read_datum(buffer, writer_schema, reader_schema=None):
+  reader = io.BytesIO(buffer.getvalue())
+  decoder = avro_io.BinaryDecoder(reader)
+  datum_reader = avro_io.DatumReader(writer_schema, reader_schema)
+  return datum_reader.read(decoder)
+
+
+def check_binary_encoding(number_type):
+  logging.debug('Testing binary encoding for type %s', number_type)
+  correct = 0
+  for datum, hex_encoding in BINARY_ENCODINGS:
+    logging.debug('Datum: %d', datum)
+    logging.debug('Correct Encoding: %s', hex_encoding)
+
+    writer_schema = schema.Parse('"%s"' % number_type.lower())
+    writer, encoder, datum_writer = write_datum(datum, writer_schema)
+    writer.seek(0)
+    hex_val = avro_hexlify(writer)
+
+    logging.debug('Read Encoding: %s', hex_val)
+    if hex_encoding == hex_val: correct += 1
+  return correct
+
+
+def check_skip_number(number_type):
+  logging.debug('Testing skip number for %s', number_type)
+  correct = 0
+  for value_to_skip, hex_encoding in BINARY_ENCODINGS:
+    VALUE_TO_READ = 6253
+    logging.debug('Value to Skip: %d', value_to_skip)
+
+    # write the value to skip and a known value
+    writer_schema = schema.Parse('"%s"' % number_type.lower())
+    writer, encoder, datum_writer = write_datum(value_to_skip, writer_schema)
+    datum_writer.write(VALUE_TO_READ, encoder)
+
+    # skip the value
+    reader = io.BytesIO(writer.getvalue())
+    decoder = avro_io.BinaryDecoder(reader)
+    decoder.skip_long()
+
+    # read data from string buffer
+    datum_reader = avro_io.DatumReader(writer_schema)
+    read_value = datum_reader.read(decoder)
+
+    logging.debug('Read Value: %d', read_value)
+    if read_value == VALUE_TO_READ: correct += 1
+  return correct
+
+
+# ------------------------------------------------------------------------------
+
+
+class TestIO(unittest.TestCase):
+  """Tests for Avro binary encoding, validation and schema resolution."""
+
+  #
+  # BASIC FUNCTIONALITY
+  #
+
+  def testValidate(self):
+    """Every example datum must validate against its own schema."""
+    passed = 0
+    for example_schema, datum in SCHEMAS_TO_VALIDATE:
+      logging.debug('Schema: %r', example_schema)
+      logging.debug('Datum: %r', datum)
+      validated = avro_io.Validate(schema.Parse(example_schema), datum)
+      logging.debug('Valid: %s', validated)
+      if validated: passed += 1
+    self.assertEqual(passed, len(SCHEMAS_TO_VALIDATE))
+
+  def testRoundTrip(self):
+    """Every example datum must survive an encode/decode round-trip."""
+    correct = 0
+    for example_schema, datum in SCHEMAS_TO_VALIDATE:
+      logging.debug('Schema: %s', example_schema)
+      logging.debug('Datum: %s', datum)
+
+      writer_schema = schema.Parse(example_schema)
+      writer, encoder, datum_writer = write_datum(datum, writer_schema)
+      round_trip_datum = read_datum(writer, writer_schema)
+
+      logging.debug('Round Trip Datum: %s', round_trip_datum)
+      if datum == round_trip_datum: correct += 1
+    self.assertEqual(correct, len(SCHEMAS_TO_VALIDATE))
+
+  #
+  # BINARY ENCODING OF INT AND LONG
+  #
+
+  def testBinaryIntEncoding(self):
+    """Ints must encode to the expected variable-length bytes."""
+    correct = check_binary_encoding('int')
+    self.assertEqual(correct, len(BINARY_ENCODINGS))
+
+  def testBinaryLongEncoding(self):
+    """Longs must encode to the expected variable-length bytes."""
+    correct = check_binary_encoding('long')
+    self.assertEqual(correct, len(BINARY_ENCODINGS))
+
+  def testSkipInt(self):
+    """Skipping an encoded int must land on the next datum."""
+    correct = check_skip_number('int')
+    self.assertEqual(correct, len(BINARY_ENCODINGS))
+
+  def testSkipLong(self):
+    """Skipping an encoded long must land on the next datum."""
+    correct = check_skip_number('long')
+    self.assertEqual(correct, len(BINARY_ENCODINGS))
+
+  #
+  # SCHEMA RESOLUTION
+  #
+
+  def testSchemaPromotion(self):
+    """Numbers written with a narrower schema must read with a wider one."""
+    # note that checking writer_schema.type in read_data
+    # allows us to handle promotion correctly
+    promotable_schemas = ['"int"', '"long"', '"float"', '"double"']
+    incorrect = 0
+    for i, ws in enumerate(promotable_schemas):
+      writer_schema = schema.Parse(ws)
+      datum_to_write = 219
+      for rs in promotable_schemas[i + 1:]:
+        reader_schema = schema.Parse(rs)
+        writer, enc, dw = write_datum(datum_to_write, writer_schema)
+        datum_read = read_datum(writer, writer_schema, reader_schema)
+        logging.debug('Writer: %s Reader: %s', writer_schema, reader_schema)
+        logging.debug('Datum Read: %s', datum_read)
+        if datum_read != datum_to_write: incorrect += 1
+    self.assertEqual(incorrect, 0)
+
+  def testUnknownSymbol(self):
+    """Reading an enum symbol absent from the reader schema must fail."""
+    writer_schema = schema.Parse("""\
+      {"type": "enum", "name": "Test",
+       "symbols": ["FOO", "BAR"]}""")
+    datum_to_write = 'FOO'
+
+    reader_schema = schema.Parse("""\
+      {"type": "enum", "name": "Test",
+       "symbols": ["BAR", "BAZ"]}""")
+
+    writer, encoder, datum_writer = write_datum(datum_to_write, writer_schema)
+    reader = io.BytesIO(writer.getvalue())
+    decoder = avro_io.BinaryDecoder(reader)
+    datum_reader = avro_io.DatumReader(writer_schema, reader_schema)
+    self.assertRaises(avro_io.SchemaResolutionException, datum_reader.read, decoder)
+
+  def testDefaultValue(self):
+    """Reader-only fields must be filled in from their schema defaults."""
+    writer_schema = LONG_RECORD_SCHEMA
+    datum_to_write = LONG_RECORD_DATUM
+
+    correct = 0
+    for field_type, default_json, default_datum in DEFAULT_VALUE_EXAMPLES:
+      reader_schema = schema.Parse("""\
+        {"type": "record", "name": "Test",
+         "fields": [{"name": "H", "type": %s, "default": %s}]}
+        """ % (field_type, default_json))
+      datum_to_read = {'H': default_datum}
+
+      writer, encoder, datum_writer = write_datum(datum_to_write, writer_schema)
+      datum_read = read_datum(writer, writer_schema, reader_schema)
+      logging.debug('Datum Read: %s', datum_read)
+      if datum_to_read == datum_read: correct += 1
+    self.assertEqual(correct, len(DEFAULT_VALUE_EXAMPLES))
+
+  def testNoDefaultValue(self):
+    """A reader-only field without a default must raise on read."""
+    writer_schema = LONG_RECORD_SCHEMA
+    datum_to_write = LONG_RECORD_DATUM
+
+    reader_schema = schema.Parse("""\
+      {"type": "record", "name": "Test",
+       "fields": [{"name": "H", "type": "int"}]}""")
+
+    writer, encoder, datum_writer = write_datum(datum_to_write, writer_schema)
+    reader = io.BytesIO(writer.getvalue())
+    decoder = avro_io.BinaryDecoder(reader)
+    datum_reader = avro_io.DatumReader(writer_schema, reader_schema)
+    self.assertRaises(avro_io.SchemaResolutionException, datum_reader.read, decoder)
+
+  def testProjection(self):
+    """A reader schema with a subset of fields must project the record."""
+    writer_schema = LONG_RECORD_SCHEMA
+    datum_to_write = LONG_RECORD_DATUM
+
+    reader_schema = schema.Parse("""\
+      {"type": "record", "name": "Test",
+       "fields": [{"name": "E", "type": "int"},
+                  {"name": "F", "type": "int"}]}""")
+    datum_to_read = {'E': 5, 'F': 6}
+
+    writer, encoder, datum_writer = write_datum(datum_to_write, writer_schema)
+    datum_read = read_datum(writer, writer_schema, reader_schema)
+    logging.debug('Datum Read: %s', datum_read)
+    self.assertEqual(datum_to_read, datum_read)
+
+  def testFieldOrder(self):
+    """Field resolution must match by name, not by position."""
+    writer_schema = LONG_RECORD_SCHEMA
+    datum_to_write = LONG_RECORD_DATUM
+
+    reader_schema = schema.Parse("""\
+      {"type": "record", "name": "Test",
+       "fields": [{"name": "F", "type": "int"},
+                  {"name": "E", "type": "int"}]}""")
+    datum_to_read = {'E': 5, 'F': 6}
+
+    writer, encoder, datum_writer = write_datum(datum_to_write, writer_schema)
+    datum_read = read_datum(writer, writer_schema, reader_schema)
+    logging.debug('Datum Read: %s', datum_read)
+    self.assertEqual(datum_to_read, datum_read)
+
+  def testTypeException(self):
+    """Writing a datum that does not match its schema must raise."""
+    writer_schema = schema.Parse("""\
+      {"type": "record", "name": "Test",
+       "fields": [{"name": "F", "type": "int"},
+                  {"name": "E", "type": "int"}]}""")
+    datum_to_write = {'E': 5, 'F': 'Bad'}
+    self.assertRaises(
+        avro_io.AvroTypeException, write_datum, datum_to_write, writer_schema)
+
+
+# This module is driven by the shared test runner, not run directly:
+if __name__ == '__main__':
+  raise Exception('Use run_tests.py')

Propchange: avro/trunk/lang/py3/avro/tests/test_io.py
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message