Return-Path: X-Original-To: apmail-avro-commits-archive@www.apache.org Delivered-To: apmail-avro-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4B41910AFF for ; Fri, 10 Jan 2014 19:12:41 +0000 (UTC) Received: (qmail 37801 invoked by uid 500); 10 Jan 2014 19:12:29 -0000 Delivered-To: apmail-avro-commits-archive@avro.apache.org Received: (qmail 37708 invoked by uid 500); 10 Jan 2014 19:12:28 -0000 Mailing-List: contact commits-help@avro.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@avro.apache.org Delivered-To: mailing list commits@avro.apache.org Received: (qmail 37664 invoked by uid 99); 10 Jan 2014 19:12:21 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Jan 2014 19:12:21 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_FILL_THIS_FORM_SHORT X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Jan 2014 19:12:07 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 8B8DF2388A5B; Fri, 10 Jan 2014 19:11:44 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1557225 [2/3] - in /avro/trunk: ./ lang/py3/ lang/py3/avro/ lang/py3/avro/tests/ lang/py3/scripts/ share/test/schemas/ Date: Fri, 10 Jan 2014 19:11:43 -0000 To: commits@avro.apache.org From: cutting@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140110191144.8B8DF2388A5B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: avro/trunk/lang/py3/avro/protocol.py URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/protocol.py?rev=1557225&view=auto 
============================================================================== --- avro/trunk/lang/py3/avro/protocol.py (added) +++ avro/trunk/lang/py3/avro/protocol.py Fri Jan 10 19:11:42 2014 @@ -0,0 +1,402 @@ +#!/usr/bin/env python3 +# -*- mode: python -*- +# -*- coding: utf-8 -*- + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Protocol implementation. 
+""" + + +import hashlib +import json +import logging + +from avro import schema + +ImmutableDict = schema.ImmutableDict + +# ------------------------------------------------------------------------------ +# Constants + +# Allowed top-level schemas in a protocol: +VALID_TYPE_SCHEMA_TYPES = frozenset(['enum', 'record', 'error', 'fixed']) + + +# ------------------------------------------------------------------------------ +# Exceptions + + +class ProtocolParseException(schema.AvroException): + """Error while parsing a JSON protocol descriptor.""" + pass + + +# ------------------------------------------------------------------------------ +# Base Classes + + +class Protocol(object): + """An application protocol.""" + + @staticmethod + def _ParseTypeDesc(type_desc, names): + type_schema = schema.SchemaFromJSONData(type_desc, names=names) + if type_schema.type not in VALID_TYPE_SCHEMA_TYPES: + raise ProtocolParseException( + 'Invalid type %r in protocol %r: ' + 'protocols can only declare types %s.' + % (type_schema, avro_name, ','.join(VALID_TYPE_SCHEMA_TYPES))) + return type_schema + + @staticmethod + def _ParseMessageDesc(name, message_desc, names): + """Parses a protocol message descriptor. + + Args: + name: Name of the message. + message_desc: Descriptor of the message. + names: Tracker of the named Avro schema. + Returns: + The parsed protocol message. + Raises: + ProtocolParseException: if the descriptor is invalid. + """ + request_desc = message_desc.get('request') + if request_desc is None: + raise ProtocolParseException( + 'Invalid message descriptor with no "request": %r.' % message_desc) + request_schema = Message._ParseRequestFromJSONDesc( + request_desc=request_desc, + names=names, + ) + + response_desc = message_desc.get('response') + if response_desc is None: + raise ProtocolParseException( + 'Invalid message descriptor with no "response": %r.' 
% message_desc) + response_schema = Message._ParseResponseFromJSONDesc( + response_desc=response_desc, + names=names, + ) + + # Errors are optional: + errors_desc = message_desc.get('errors', tuple()) + error_union_schema = Message._ParseErrorsFromJSONDesc( + errors_desc=errors_desc, + names=names, + ) + + return Message( + name=name, + request=request_schema, + response=response_schema, + errors=error_union_schema, + ) + + @staticmethod + def _ParseMessageDescMap(message_desc_map, names): + for name, message_desc in message_desc_map.items(): + yield Protocol._ParseMessageDesc( + name=name, + message_desc=message_desc, + names=names, + ) + + def __init__( + self, + name, + namespace=None, + types=tuple(), + messages=tuple(), + ): + """Initializes a new protocol object. + + Args: + name: Protocol name (absolute or relative). + namespace: Optional explicit namespace (if name is relative). + types: Collection of types in the protocol. + messages: Collection of messages in the protocol. + """ + self._avro_name = schema.Name(name=name, namespace=namespace) + self._fullname = self._avro_name.fullname + self._name = self._avro_name.simple_name + self._namespace = self._avro_name.namespace + + self._props = {} + self._props['name'] = self._name + if self._namespace: + self._props['namespace'] = self._namespace + + self._names = schema.Names(default_namespace=self._namespace) + + self._types = tuple(types) + # Map: type full name -> type schema + self._type_map = ( + ImmutableDict((type.fullname, type) for type in self._types)) + # This assertion cannot fail unless we don't track named schemas properly: + assert (len(self._types) == len(self._type_map)), ( + 'Type list %r does not match type map: %r' + % (self._types, self._type_map)) + # TODO: set props['types'] + + self._messages = tuple(messages) + + # Map: message name -> Message + # Note that message names are simple names unique within the protocol. 
+ self._message_map = ImmutableDict( + items=((message.name, message) for message in self._messages)) + if len(self._messages) != len(self._message_map): + raise ProtocolParseException( + 'Invalid protocol %s with duplicate message name: %r' + % (self._avro_name, self._messages)) + # TODO: set props['messages'] + + self._md5 = hashlib.md5(str(self).encode('utf-8')).digest() + + @property + def name(self): + """Returns: the simple name of the protocol.""" + return self._name + + @property + def namespace(self): + """Returns: the namespace this protocol belongs to.""" + return self._namespace + + @property + def fullname(self): + """Returns: the fully qualified name of this protocol.""" + return self._fullname + + @property + def types(self): + """Returns: the collection of types declared in this protocol.""" + return self._types + + @property + def type_map(self): + """Returns: the map of types in this protocol, indexed by their full name.""" + return self._type_map + + @property + def messages(self): + """Returns: the collection of messages declared in this protocol.""" + return self._messages + + @property + def message_map(self): + """Returns: the map of messages in this protocol, indexed by their name.""" + return self._message_map + + @property + def md5(self): + return self._md5 + + @property + def props(self): + return self._props + + def to_json(self): + to_dump = {} + to_dump['protocol'] = self.name + names = schema.Names(default_namespace=self.namespace) + if self.namespace: + to_dump['namespace'] = self.namespace + if self.types: + to_dump['types'] = [ t.to_json(names) for t in self.types ] + if self.messages: + messages_dict = {} + for name, body in self.message_map.items(): + messages_dict[name] = body.to_json(names) + to_dump['messages'] = messages_dict + return to_dump + + def __str__(self): + return json.dumps(self.to_json()) + + def __eq__(self, that): + to_cmp = json.loads(str(self)) + return to_cmp == json.loads(str(that)) + + +# 
# ------------------------------------------------------------------------------


class Message(object):
  """A Protocol message."""

  @staticmethod
  def _ParseRequestFromJSONDesc(request_desc, names):
    """Parses the request descriptor of a protocol message.

    Args:
      request_desc: Descriptor of the message request.
          This is a list of fields that defines an unnamed record.
      names: Tracker for named Avro schemas.
    Returns:
      The parsed request schema, as an unnamed record.
    """
    fields = schema.RecordSchema._MakeFieldList(request_desc, names=names)
    return schema.RecordSchema(
        name=None,
        namespace=None,
        fields=fields,
        names=names,
        record_type=schema.REQUEST,
    )

  @staticmethod
  def _ParseResponseFromJSONDesc(response_desc, names):
    """Parses the response descriptor of a protocol message.

    Args:
      response_desc: Descriptor of the message response.
          This is an arbitrary Avro schema descriptor.
      names: Tracker for named Avro schemas.
    Returns:
      The parsed response schema.
    """
    return schema.SchemaFromJSONData(response_desc, names=names)

  @staticmethod
  def _ParseErrorsFromJSONDesc(errors_desc, names):
    """Parses the errors descriptor of a protocol message.

    Args:
      errors_desc: Descriptor of the errors thrown by the protocol message.
          This is a list of error types understood as an implicit union.
          Each error type is an arbitrary Avro schema.
      names: Tracker for named Avro schemas.
    Returns:
      The parsed ErrorUnionSchema.
    """
    error_union_desc = {
        'type': schema.ERROR_UNION,
        'declared_errors': errors_desc,
    }
    return schema.SchemaFromJSONData(error_union_desc, names=names)

  def __init__(self, name, request, response, errors=None):
    """Initializes a new message object.

    Args:
      name: Simple name of the message.
      request: Request schema (unnamed record) of the message.
      response: Response schema of the message.
      errors: Optional error union schema of the message.
    """
    self._name = name

    self._props = {}
    # TODO: set properties
    self._request = request
    self._response = response
    self._errors = errors

  @property
  def name(self):
    """Returns: the simple name of this message."""
    return self._name

  @property
  def request(self):
    """Returns: the request schema of this message."""
    return self._request

  @property
  def response(self):
    """Returns: the response schema of this message."""
    return self._response

  @property
  def errors(self):
    """Returns: the error union schema of this message, if any, or None."""
    return self._errors

  @property
  def props(self):
    """Returns: the dictionary of properties of this message."""
    # Bug fix: this was a plain method while every other schema class exposes
    # 'props' as a property; __eq__ below compared bound-method objects
    # (always unequal across instances) instead of the property dicts.
    return self._props

  def __str__(self):
    return json.dumps(self.to_json())

  def to_json(self, names=None):
    """Converts this message into its JSON-serializable descriptor."""
    if names is None:
      names = schema.Names()
    to_dump = {}
    to_dump['request'] = self.request.to_json(names)
    to_dump['response'] = self.response.to_json(names)
    if self.errors:
      to_dump['errors'] = self.errors.to_json(names)
    return to_dump

  def __eq__(self, that):
    return self.name == that.name and self.props == that.props


# ------------------------------------------------------------------------------


def ProtocolFromJSONData(json_data):
  """Builds an Avro Protocol from its JSON descriptor.

  Args:
    json_data: JSON data representing the descriptor of the Avro protocol.
  Returns:
    The Avro Protocol parsed from the JSON descriptor.
  Raises:
    ProtocolParseException: if the descriptor is invalid.
  """
  if not isinstance(json_data, dict):
    raise ProtocolParseException(
        'Invalid JSON descriptor for an Avro protocol: %r' % json_data)

  name = json_data.get('protocol')
  if name is None:
    # Bug fix: the error message referred to a "name" key, but the key
    # actually looked up is "protocol".
    raise ProtocolParseException(
        'Invalid protocol descriptor with no "protocol" name: %r' % json_data)

  # Namespace is optional
  namespace = json_data.get('namespace')

  avro_name = schema.Name(name=name, namespace=namespace)
  names = schema.Names(default_namespace=avro_name.namespace)

  type_desc_list = json_data.get('types', tuple())
  types = tuple(
      Protocol._ParseTypeDesc(desc, names=names) for desc in type_desc_list)

  message_desc_map = json_data.get('messages', dict())
  messages = tuple(Protocol._ParseMessageDescMap(message_desc_map, names=names))

  return Protocol(
      name=name,
      namespace=namespace,
      types=types,
      messages=messages,
  )


def Parse(json_string):
  """Constructs a Protocol from its JSON descriptor in text form.

  Args:
    json_string: String representation of the JSON descriptor of the protocol.
  Returns:
    The parsed protocol.
  Raises:
    ProtocolParseException: on JSON parsing error,
        or if the JSON descriptor is invalid.
  """
  try:
    json_data = json.loads(json_string)
  except Exception as exn:
    raise ProtocolParseException(
        'Error parsing protocol from JSON: %r. '
        'Error message: %r.'
        % (json_string, exn))
  return ProtocolFromJSONData(json_data)
+ % (json_string, exn)) + + return ProtocolFromJSONData(json_data) + Propchange: avro/trunk/lang/py3/avro/protocol.py ------------------------------------------------------------------------------ svn:eol-style = native Added: avro/trunk/lang/py3/avro/schema.py URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/schema.py?rev=1557225&view=auto ============================================================================== --- avro/trunk/lang/py3/avro/schema.py (added) +++ avro/trunk/lang/py3/avro/schema.py Fri Jan 10 19:11:42 2014 @@ -0,0 +1,1283 @@ +#!/usr/bin/env python3 +# -*- mode: python -*- +# -*- coding: utf-8 -*- + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Representation of Avro schemas. 
+ +A schema may be one of: + - A record, mapping field names to field value data; + - An error, equivalent to a record; + - An enum, containing one of a small set of symbols; + - An array of values, all of the same schema; + - A map containing string/value pairs, each of a declared schema; + - A union of other schemas; + - A fixed sized binary object; + - A unicode string; + - A sequence of bytes; + - A 32-bit signed int; + - A 64-bit signed long; + - A 32-bit floating-point float; + - A 64-bit floating-point double; + - A boolean; + - Null. +""" + + +import abc +import collections +import json +import logging +import re + + +# ------------------------------------------------------------------------------ +# Constants + +# Log level more verbose than DEBUG=10, INFO=20, etc. +DEBUG_VERBOSE=5 + + +NULL = 'null' +BOOLEAN = 'boolean' +STRING = 'string' +BYTES = 'bytes' +INT = 'int' +LONG = 'long' +FLOAT = 'float' +DOUBLE = 'double' +FIXED = 'fixed' +ENUM = 'enum' +RECORD = 'record' +ERROR = 'error' +ARRAY = 'array' +MAP = 'map' +UNION = 'union' + +# Request and error unions are part of Avro protocols: +REQUEST = 'request' +ERROR_UNION = 'error_union' + +PRIMITIVE_TYPES = frozenset([ + NULL, + BOOLEAN, + STRING, + BYTES, + INT, + LONG, + FLOAT, + DOUBLE, +]) + +NAMED_TYPES = frozenset([ + FIXED, + ENUM, + RECORD, + ERROR, +]) + +VALID_TYPES = frozenset.union( + PRIMITIVE_TYPES, + NAMED_TYPES, + [ + ARRAY, + MAP, + UNION, + REQUEST, + ERROR_UNION, + ], +) + +SCHEMA_RESERVED_PROPS = frozenset([ + 'type', + 'name', + 'namespace', + 'fields', # Record + 'items', # Array + 'size', # Fixed + 'symbols', # Enum + 'values', # Map + 'doc', +]) + +FIELD_RESERVED_PROPS = frozenset([ + 'default', + 'name', + 'doc', + 'order', + 'type', +]) + +VALID_FIELD_SORT_ORDERS = frozenset([ + 'ascending', + 'descending', + 'ignore', +]) + + +# ------------------------------------------------------------------------------ +# Exceptions + + +class Error(Exception): + """Base class for errors in 
class Error(Exception):
  """Base class for errors in this module."""
  pass


class AvroException(Error):
  """Generic Avro schema error."""
  pass


class SchemaParseException(AvroException):
  """Error while parsing a JSON schema descriptor."""
  pass


# ------------------------------------------------------------------------------


class ImmutableDict(dict):
  """Dictionary guaranteed immutable.

  All mutations raise an exception.
  Behaves exactly as a dict otherwise.
  """

  def __init__(self, items=None, **kwargs):
    """Initializes the dictionary from an item collection or keyword args.

    Args:
      items: Optional iterable of key/value pairs or mapping to copy.
          Mutually exclusive with keyword arguments.
      **kwargs: Optional initial entries, when items is not given.
    """
    if items is not None:
      super(ImmutableDict, self).__init__(items)
      assert (len(kwargs) == 0)
    else:
      super(ImmutableDict, self).__init__(**kwargs)

  def __setitem__(self, key, value):
    raise Exception(
        'Attempting to map key %r to value %r in ImmutableDict %r'
        % (key, value, self))

  def __delitem__(self, key):
    raise Exception(
        'Attempting to remove mapping for key %r in ImmutableDict %r'
        % (key, self))

  def clear(self):
    raise Exception('Attempting to clear ImmutableDict %r' % self)

  def update(self, items=None, **kwargs):
    # Bug fix: the message formatted an undefined name 'args', so this
    # raised NameError instead of the intended immutability Exception.
    raise Exception(
        'Attempting to update ImmutableDict %r with items=%r, kwargs=%r'
        % (self, items, kwargs))

  def pop(self, key, default=None):
    raise Exception(
        'Attempting to pop key %r from ImmutableDict %r' % (key, self))

  def popitem(self):
    raise Exception('Attempting to pop item from ImmutableDict %r' % self)
% type) + + # All properties of this schema, as a map: property name -> property value + self._props = {} + + self._props['type'] = type + self._type = type + + if other_props: + self._props.update(other_props) + + @property + def name(self): + """Returns: the simple name of this schema.""" + return self._props['name'] + + @property + def fullname(self): + """Returns: the fully qualified name of this schema.""" + # By default, the full name is the simple name. + # Named schemas override this behavior to include the namespace. + return self.name + + @property + def namespace(self): + """Returns: the namespace this schema belongs to, if any, or None.""" + return self._props.get('namespace', None) + + @property + def type(self): + """Returns: the type of this schema.""" + return self._type + + @property + def doc(self): + """Returns: the documentation associated to this schema, if any, or None.""" + return self._props.get('doc', None) + + @property + def props(self): + """Reports all the properties of this schema. + + Includes all properties, reserved and non reserved. + JSON properties of this schema are directly generated from this dict. + + Returns: + A read-only dictionary of properties associated to this schema. + """ + return ImmutableDict(self._props) + + @property + def other_props(self): + """Returns: the dictionary of non-reserved properties.""" + return dict(FilterKeysOut(items=self._props, keys=SCHEMA_RESERVED_PROPS)) + + def __str__(self): + """Returns: the JSON representation of this schema.""" + return json.dumps(self.to_json()) + + @abc.abstractmethod + def to_json(self, names): + """Converts the schema object into its AVRO specification representation. + + Schema types that have names (records, enums, and fixed) must + be aware of not re-defining schemas that are already listed + in the parameter names. 
_RE_NAME = re.compile(r'[A-Za-z_][A-Za-z0-9_]*')

_RE_FULL_NAME = re.compile(
    r'^'
    r'[.]?(?:[A-Za-z_][A-Za-z0-9_]*[.])*'  # optional namespace
    r'([A-Za-z_][A-Za-z0-9_]*)'            # name
    r'$'
)


class Name(object):
  """Representation of an Avro name."""

  def __init__(self, name, namespace=None):
    """Parses an Avro name.

    Args:
      name: Avro name to parse (relative or absolute).
      namespace: Optional explicit namespace if the name is relative.
    Raises:
      SchemaParseException: if the name is not well-formed.
    """
    # Normalize: namespace is always defined as a string, possibly empty.
    if namespace is None: namespace = ''

    if '.' in name:
      # name is absolute, namespace is ignored:
      self._fullname = name

      match = _RE_FULL_NAME.match(self._fullname)
      if match is None:
        raise SchemaParseException(
            'Invalid absolute schema name: %r.' % self._fullname)

      self._name = match.group(1)
      self._namespace = self._fullname[:-(len(self._name) + 1)]

    else:
      # name is relative, combine with explicit namespace:
      self._name = name
      self._namespace = namespace
      self._fullname = '%s.%s' % (self._namespace, self._name)

      # Validate the fullname:
      if _RE_FULL_NAME.match(self._fullname) is None:
        raise SchemaParseException(
            'Invalid schema name %r inferred from name %r and namespace %r.'
            % (self._fullname, self._name, self._namespace))

  def __eq__(self, other):
    if not isinstance(other, Name):
      return False
    return (self.fullname == other.fullname)

  def __hash__(self):
    # Bug fix: defining __eq__ without __hash__ makes Name unhashable in
    # Python 3, breaking its use as a set element or dict key.
    # Hash consistently with equality, i.e. on the full name.
    return hash(self._fullname)

  @property
  def simple_name(self):
    """Returns: the simple name part of this name."""
    return self._name

  @property
  def namespace(self):
    """Returns: this name's namespace, possibly the empty string."""
    return self._namespace

  @property
  def fullname(self):
    """Returns: the full name (always contains a period '.')."""
    return self._fullname
+ """ + if namespace is None: namespace = self._default_namespace + return Name(name=name, namespace=namespace) + + def has_name(self, name, namespace=None): + avro_name = self.GetName(name=name, namespace=namespace) + return avro_name.fullname in self._names + + def get_name(self, name, namespace=None): + avro_name = self.GetName(name=name, namespace=namespace) + return self._names.get(avro_name.fullname, None) + + def GetSchema(self, name, namespace=None): + """Resolves an Avro schema by name. + + Args: + name: Name (relative or absolute) of the Avro schema to look up. + namespace: Optional explicit namespace. + Returns: + The schema with the specified name, if any, or None. + """ + avro_name = self.GetName(name=name, namespace=namespace) + return self._names.get(avro_name.fullname, None) + + def prune_namespace(self, properties): + """given a properties, return properties with namespace removed if + it matches the own default namespace + """ + if self.default_namespace is None: + # I have no default -- no change + return properties + if 'namespace' not in properties: + # he has no namespace - no change + return properties + if properties['namespace'] != self.default_namespace: + # we're different - leave his stuff alone + return properties + # we each have a namespace and it's redundant. delete his. + prunable = properties.copy() + del(prunable['namespace']) + return prunable + + def Register(self, schema): + """Registers a new named schema in this tracker. + + Args: + schema: Named Avro schema to register in this tracker. + """ + if schema.fullname in VALID_TYPES: + raise SchemaParseException( + '%s is a reserved type name.' % schema.fullname) + if schema.fullname in self.names: + raise SchemaParseException( + 'Avro name %r already exists.' 
% schema.fullname) + + logging.log(DEBUG_VERBOSE, 'Register new name for %r', schema.fullname) + self._names[schema.fullname] = schema + + +# ------------------------------------------------------------------------------ + + +class NamedSchema(Schema): + """Abstract base class for named schemas. + + Named schemas are enumerated in NAMED_TYPES. + """ + + def __init__( + self, + type, + name, + namespace=None, + names=None, + other_props=None, + ): + """Initializes a new named schema object. + + Args: + type: Type of the named schema. + name: Name (absolute or relative) of the schema. + namespace: Optional explicit namespace if name is relative. + names: Tracker to resolve and register Avro names. + other_props: Optional map of additional properties of the schema. + """ + assert (type in NAMED_TYPES), ('Invalid named type: %r' % type) + self._avro_name = names.GetName(name=name, namespace=namespace) + + super(NamedSchema, self).__init__(type, other_props) + + names.Register(self) + + self._props['name'] = self.name + if self.namespace: + self._props['namespace'] = self.namespace + + @property + def avro_name(self): + """Returns: the Name object describing this schema's name.""" + return self._avro_name + + @property + def name(self): + return self._avro_name.simple_name + + @property + def namespace(self): + return self._avro_name.namespace + + @property + def fullname(self): + return self._avro_name.fullname + + def name_ref(self, names): + """Reports this schema name relative to the specified name tracker. + + Args: + names: Avro name tracker to relativise this schema name against. + Returns: + This schema name, relativised against the specified name tracker. 
+ """ + if self.namespace == names.default_namespace: + return self.name + else: + return self.fullname + + +# ------------------------------------------------------------------------------ + + +_NO_DEFAULT = object() + + +class Field(object): + """Representation of the schema of a field in a record.""" + + def __init__( + self, + type, + name, + index, + has_default, + default=_NO_DEFAULT, + order=None, + names=None, + doc=None, + other_props=None + ): + """Initializes a new Field object. + + Args: + type: Avro schema of the field. + name: Name of the field. + index: 0-based position of the field. + has_default: + default: + order: + names: + doc: + other_props: + """ + if (not isinstance(name, str)) or (len(name) == 0): + raise SchemaParseException('Invalid record field name: %r.' % name) + if (order is not None) and (order not in VALID_FIELD_SORT_ORDERS): + raise SchemaParseException('Invalid record field order: %r.' % order) + + # All properties of this record field: + self._props = {} + + self._has_default = has_default + if other_props: + self._props.update(other_props) + + self._index = index + self._type = self._props['type'] = type + self._name = self._props['name'] = name + + # TODO: check to ensure default is valid + if has_default: + self._props['default'] = default + + if order is not None: + self._props['order'] = order + + if doc is not None: + self._props['doc'] = doc + + @property + def type(self): + """Returns: the schema of this field.""" + return self._type + + @property + def name(self): + """Returns: this field name.""" + return self._name + + @property + def index(self): + """Returns: the 0-based index of this field in the record.""" + return self._index + + @property + def default(self): + return self._props['default'] + + @property + def has_default(self): + return self._has_default + + @property + def order(self): + return self._props.get('order', None) + + @property + def doc(self): + return self._props.get('doc', None) + + @property + 
def props(self): + return self._props + + @property + def other_props(self): + return FilterKeysOut(items=self._props, keys=FIELD_RESERVED_PROPS) + + def __str__(self): + return json.dumps(self.to_json()) + + def to_json(self, names=None): + if names is None: + names = Names() + to_dump = self.props.copy() + to_dump['type'] = self.type.to_json(names) + return to_dump + + def __eq__(self, that): + to_cmp = json.loads(str(self)) + return to_cmp == json.loads(str(that)) + + +# ------------------------------------------------------------------------------ +# Primitive Types + + +class PrimitiveSchema(Schema): + """Schema of a primitive Avro type. + + Valid primitive types are defined in PRIMITIVE_TYPES. + """ + + def __init__(self, type): + """Initializes a new schema object for the specified primitive type. + + Args: + type: Type of the schema to construct. Must be primitive. + """ + if type not in PRIMITIVE_TYPES: + raise AvroException('%r is not a valid primitive type.' % type) + super(PrimitiveSchema, self).__init__(type) + + @property + def name(self): + """Returns: the simple name of this schema.""" + # The name of a primitive type is the type itself. + return self.type + + def to_json(self, names=None): + if len(self.props) == 1: + return self.fullname + else: + return self.props + + def __eq__(self, that): + return self.props == that.props + + +# ------------------------------------------------------------------------------ +# Complex Types (non-recursive) + + +class FixedSchema(NamedSchema): + def __init__( + self, + name, + namespace, + size, + names=None, + other_props=None, + ): + # Ensure valid ctor args + if not isinstance(size, int): + fail_msg = 'Fixed Schema requires a valid integer for size property.' 
+ raise AvroException(fail_msg) + + super(FixedSchema, self).__init__( + type=FIXED, + name=name, + namespace=namespace, + names=names, + other_props=other_props, + ) + self._props['size'] = size + + @property + def size(self): + """Returns: the size of this fixed schema, in bytes.""" + return self._props['size'] + + def to_json(self, names=None): + if names is None: + names = Names() + if self.fullname in names.names: + return self.name_ref(names) + else: + names.names[self.fullname] = self + return names.prune_namespace(self.props) + + def __eq__(self, that): + return self.props == that.props + + +# ------------------------------------------------------------------------------ + + +class EnumSchema(NamedSchema): + def __init__( + self, + name, + namespace, + symbols, + names=None, + doc=None, + other_props=None, + ): + """Initializes a new enumeration schema object. + + Args: + name: Simple name of this enumeration. + namespace: Optional namespace. + symbols: Ordered list of symbols defined in this enumeration. + names: + doc: + other_props: + """ + symbols = tuple(symbols) + symbol_set = frozenset(symbols) + if (len(symbol_set) != len(symbols) + or not all(map(lambda symbol: isinstance(symbol, str), symbols))): + raise AvroException( + 'Invalid symbols for enum schema: %r.' 
% (symbols,)) + + super(EnumSchema, self).__init__( + type=ENUM, + name=name, + namespace=namespace, + names=names, + other_props=other_props, + ) + + self._props['symbols'] = tuple(sorted(symbol_set)) + if doc is not None: + self._props['doc'] = doc + + @property + def symbols(self): + """Returns: the symbols defined in this enum.""" + return self._props['symbols'] + + def to_json(self, names=None): + if names is None: + names = Names() + if self.fullname in names.names: + return self.name_ref(names) + else: + names.names[self.fullname] = self + return names.prune_namespace(self.props) + + def __eq__(self, that): + return self.props == that.props + + +# ------------------------------------------------------------------------------ +# Complex Types (recursive) + + +class ArraySchema(Schema): + """Schema of an array.""" + + def __init__(self, items, other_props=None): + """Initializes a new array schema object. + + Args: + items: Avro schema of the array items. + other_props: + """ + super(ArraySchema, self).__init__( + type=ARRAY, + other_props=other_props, + ) + self._items_schema = items + self._props['items'] = items + + @property + def items(self): + """Returns: the schema of the items in this array.""" + return self._items_schema + + def to_json(self, names=None): + if names is None: + names = Names() + to_dump = self.props.copy() + item_schema = self.items + to_dump['items'] = item_schema.to_json(names) + return to_dump + + def __eq__(self, that): + to_cmp = json.loads(str(self)) + return to_cmp == json.loads(str(that)) + + +# ------------------------------------------------------------------------------ + + +class MapSchema(Schema): + """Schema of a map.""" + + def __init__(self, values, other_props=None): + """Initializes a new map schema object. + + Args: + values: Avro schema of the map values. 
+ other_props: + """ + super(MapSchema, self).__init__( + type=MAP, + other_props=other_props, + ) + self._values_schema = values + self._props['values'] = values + + @property + def values(self): + """Returns: the schema of the values in this map.""" + return self._values_schema + + def to_json(self, names=None): + if names is None: + names = Names() + to_dump = self.props.copy() + to_dump['values'] = self.values.to_json(names) + return to_dump + + def __eq__(self, that): + to_cmp = json.loads(str(self)) + return to_cmp == json.loads(str(that)) + + +# ------------------------------------------------------------------------------ + + +class UnionSchema(Schema): + """Schema of a union.""" + + def __init__(self, schemas): + """Initializes a new union schema object. + + Args: + schemas: Ordered collection of schema branches in the union. + """ + super(UnionSchema, self).__init__(type=UNION) + self._schemas = tuple(schemas) + + # Validate the schema branches: + + # All named schema names are unique: + named_branches = tuple( + filter(lambda schema: schema.type in NAMED_TYPES, self._schemas)) + unique_names = frozenset(map(lambda schema: schema.fullname, named_branches)) + if len(unique_names) != len(named_branches): + raise AvroException( + 'Invalid union branches with duplicate schema name:%s' + % ''.join(map(lambda schema: ('\n\t - %s' % schema), self._schemas))) + + # Types are unique within unnamed schemas, and union is not allowed: + unnamed_branches = tuple( + filter(lambda schema: schema.type not in NAMED_TYPES, self._schemas)) + unique_types = frozenset(map(lambda schema: schema.type, unnamed_branches)) + if UNION in unique_types: + raise AvroException( + 'Invalid union branches contain other unions:%s' + % ''.join(map(lambda schema: ('\n\t - %s' % schema), self._schemas))) + if len(unique_types) != len(unnamed_branches): + raise AvroException( + 'Invalid union branches with duplicate type:%s' + % ''.join(map(lambda schema: ('\n\t - %s' % schema), 
self._schemas))) + + @property + def schemas(self): + """Returns: the ordered list of schema branches in the union.""" + return self._schemas + + def to_json(self, names=None): + if names is None: + names = Names() + to_dump = [] + for schema in self.schemas: + to_dump.append(schema.to_json(names)) + return to_dump + + def __eq__(self, that): + to_cmp = json.loads(str(self)) + return to_cmp == json.loads(str(that)) + + +# ------------------------------------------------------------------------------ + + +class ErrorUnionSchema(UnionSchema): + """Schema representing the declared errors of a protocol message.""" + + def __init__(self, schemas): + """Initializes an error-union schema. + + Args: + schema: collection of error schema. + """ + # TODO: check that string isn't already listed explicitly as an error. + # Prepend "string" to handle system errors + schemas = [PrimitiveSchema(type=STRING)] + list(schemas) + super(ErrorUnionSchema, self).__init__(schemas=schemas) + + def to_json(self, names=None): + if names is None: + names = Names() + to_dump = [] + for schema in self.schemas: + # Don't print the system error schema + if schema.type == STRING: continue + to_dump.append(schema.to_json(names)) + return to_dump + + +# ------------------------------------------------------------------------------ + + +class RecordSchema(NamedSchema): + """Schema of a record.""" + + @staticmethod + def _MakeField(index, field_desc, names): + """Builds field schemas from a list of field JSON descriptors. + + Args: + index: 0-based index of the field in the record. + field_desc: JSON descriptors of a record field. + names: Avro schema tracker. + Return: + The field schema. 
+ """ + field_schema = SchemaFromJSONData( + json_data=field_desc['type'], + names=names, + ) + other_props = ( + dict(FilterKeysOut(items=field_desc, keys=FIELD_RESERVED_PROPS))) + return Field( + type=field_schema, + name=field_desc['name'], + index=index, + has_default=('default' in field_desc), + default=field_desc.get('default', _NO_DEFAULT), + order=field_desc.get('order', None), + names=names, + doc=field_desc.get('doc', None), + other_props=other_props, + ) + + @staticmethod + def _MakeFieldList(field_desc_list, names): + """Builds field schemas from a list of field JSON descriptors. + + Guarantees field name unicity. + + Args: + field_desc_list: collection of field JSON descriptors. + names: Avro schema tracker. + Yields + Field schemas. + """ + for index, field_desc in enumerate(field_desc_list): + yield RecordSchema._MakeField(index, field_desc, names) + + @staticmethod + def _MakeFieldMap(fields): + """Builds the field map. + + Guarantees field name unicity. + + Args: + fields: iterable of field schema. + Returns: + A read-only map of field schemas, indexed by name. + """ + field_map = {} + for field in fields: + if field.name in field_map: + raise SchemaParseException( + 'Duplicate field name %r in list %r.' % (field.name, field_desc_list)) + field_map[field.name] = field + return ImmutableDict(field_map) + + def __init__( + self, + name, + namespace, + fields=None, + make_fields=None, + names=None, + record_type=RECORD, + doc=None, + other_props=None + ): + """Initializes a new record schema object. + + Args: + name: Name of the record (absolute or relative). + namespace: Optional namespace the record belongs to, if name is relative. + fields: collection of fields to add to this record. + Exactly one of fields or make_fields must be specified. + make_fields: function creating the fields that belong to the record. + The function signature is: make_fields(names) -> ordered field list. + Exactly one of fields or make_fields must be specified. 
+ names: + record_type: Type of the record: one of RECORD, ERROR or REQUEST. + Protocol requests are not named. + doc: + other_props: + """ + if record_type == REQUEST: + # Protocol requests are not named: + super(NamedSchema, self).__init__( + type=REQUEST, + other_props=other_props, + ) + elif record_type in [RECORD, ERROR]: + # Register this record name in the tracker: + super(RecordSchema, self).__init__( + type=record_type, + name=name, + namespace=namespace, + names=names, + other_props=other_props, + ) + else: + raise SchemaParseException( + 'Invalid record type: %r.' % record_type) + + if record_type in [RECORD, ERROR]: + avro_name = names.GetName(name=name, namespace=namespace) + nested_names = names.NewWithDefaultNamespace(namespace=avro_name.namespace) + elif record_type == REQUEST: + # Protocol request has no name: no need to change default namespace: + nested_names = names + + if fields is None: + fields = make_fields(names=nested_names) + else: + assert (make_fields is None) + self._fields = tuple(fields) + + self._field_map = RecordSchema._MakeFieldMap(self._fields) + + self._props['fields'] = fields + if doc is not None: + self._props['doc'] = doc + + @property + def fields(self): + """Returns: the field schemas, as an ordered tuple.""" + return self._fields + + @property + def field_map(self): + """Returns: a read-only map of the field schemas index by field names.""" + return self._field_map + + def to_json(self, names=None): + if names is None: + names = Names() + # Request records don't have names + if self.type == REQUEST: + return [f.to_json(names) for f in self.fields] + + if self.fullname in names.names: + return self.name_ref(names) + else: + names.names[self.fullname] = self + + to_dump = names.prune_namespace(self.props.copy()) + to_dump['fields'] = [f.to_json(names) for f in self.fields] + return to_dump + + def __eq__(self, that): + to_cmp = json.loads(str(self)) + return to_cmp == json.loads(str(that)) + + +# 
# ------------------------------------------------------------------------------
# Module functions


def FilterKeysOut(items, keys):
  """Filters a collection of (key, value) items.

  Exclude any item whose key belongs to keys.

  Args:
    items: Dictionary of items to filter the keys out of.
    keys: Keys to filter out.
  Yields:
    Filtered items.
  """
  for key, value in items.items():
    if key in keys: continue
    yield (key, value)


# ------------------------------------------------------------------------------


def _SchemaFromJSONString(json_string, names):
  """Builds an Avro schema from a JSON string descriptor.

  Args:
    json_string: JSON string (a primitive type name, or a known schema name).
    names: Tracker of Avro named schemas.
  Returns:
    The Avro schema.
  Raises:
    SchemaParseException: if the string is neither primitive nor a known name.
  """
  if json_string in PRIMITIVE_TYPES:
    return PrimitiveSchema(type=json_string)
  else:
    # Look for a known named schema:
    schema = names.GetSchema(name=json_string)
    if schema is None:
      raise SchemaParseException(
          'Unknown named schema %r, known names: %r.'
          % (json_string, sorted(names.names)))
    return schema


def _SchemaFromJSONArray(json_array, names):
  """Builds an Avro union schema from a JSON array descriptor."""
  def MakeSchema(desc):
    return SchemaFromJSONData(json_data=desc, names=names)
  return UnionSchema(map(MakeSchema, json_array))


def _SchemaFromJSONObject(json_object, names):
  """Builds an Avro schema from a JSON object (map) descriptor.

  Args:
    json_object: JSON map descriptor of the schema.
    names: Tracker of Avro named schemas.
  Returns:
    The Avro schema.
  Raises:
    SchemaParseException: if the descriptor is invalid.
  """
  # Renamed from 'type': avoid shadowing the builtin, which is also needed
  # by SchemaFromJSONData() below.
  data_type = json_object.get('type')
  if data_type is None:
    raise SchemaParseException(
        'Avro schema JSON descriptor has no "type" property: %r' % json_object)

  other_props = dict(
      FilterKeysOut(items=json_object, keys=SCHEMA_RESERVED_PROPS))

  if data_type in PRIMITIVE_TYPES:
    # FIXME should not ignore other properties
    return PrimitiveSchema(data_type)

  elif data_type in NAMED_TYPES:
    name = json_object.get('name')
    namespace = json_object.get('namespace', names.default_namespace)
    if data_type == FIXED:
      size = json_object.get('size')
      return FixedSchema(name, namespace, size, names, other_props)
    elif data_type == ENUM:
      symbols = json_object.get('symbols')
      doc = json_object.get('doc')
      return EnumSchema(name, namespace, symbols, names, doc, other_props)

    elif data_type in [RECORD, ERROR]:
      field_desc_list = json_object.get('fields', ())

      def MakeFields(names):
        return tuple(RecordSchema._MakeFieldList(field_desc_list, names))

      return RecordSchema(
          name=name,
          namespace=namespace,
          make_fields=MakeFields,
          names=names,
          record_type=data_type,
          doc=json_object.get('doc'),
          other_props=other_props,
      )
    else:
      raise Exception('Internal error: unknown type %r.' % data_type)

  elif data_type in VALID_TYPES:
    # Unnamed, non-primitive Avro type:

    if data_type == ARRAY:
      items_desc = json_object.get('items')
      if items_desc is None:
        raise SchemaParseException(
            'Invalid array schema descriptor with no "items" : %r.'
            % json_object)
      return ArraySchema(
          items=SchemaFromJSONData(items_desc, names),
          other_props=other_props,
      )

    elif data_type == MAP:
      values_desc = json_object.get('values')
      if values_desc is None:
        raise SchemaParseException(
            'Invalid map schema descriptor with no "values" : %r.'
            % json_object)
      return MapSchema(
          values=SchemaFromJSONData(values_desc, names=names),
          other_props=other_props,
      )

    elif data_type == ERROR_UNION:
      error_desc_list = json_object.get('declared_errors')
      assert (error_desc_list is not None)
      error_schemas = map(
          lambda desc: SchemaFromJSONData(desc, names=names),
          error_desc_list)
      return ErrorUnionSchema(schemas=error_schemas)

    else:
      raise Exception('Internal error: unknown type %r.' % data_type)

  raise SchemaParseException(
      'Invalid JSON descriptor for an Avro schema: %r' % json_object)


# Parsers for the JSON data types:
_JSONDataParserTypeMap = {
    str: _SchemaFromJSONString,
    list: _SchemaFromJSONArray,
    dict: _SchemaFromJSONObject,
}


def SchemaFromJSONData(json_data, names=None):
  """Builds an Avro Schema from its JSON descriptor.

  Args:
    json_data: JSON data representing the descriptor of the Avro schema.
    names: Optional tracker for Avro named schemas.
  Returns:
    The Avro schema parsed from the JSON descriptor.
  Raises:
    SchemaParseException: if the descriptor is invalid.
  """
  if names is None:
    names = Names()

  # Select the appropriate parser based on the JSON data type:
  parser = _JSONDataParserTypeMap.get(type(json_data))
  if parser is None:
    raise SchemaParseException(
        'Invalid JSON descriptor for an Avro schema: %r.' % json_data)
  return parser(json_data, names=names)


# ------------------------------------------------------------------------------


def Parse(json_string):
  """Constructs a Schema from its JSON descriptor in text form.

  Args:
    json_string: String representation of the JSON descriptor of the schema.
  Returns:
    The parsed schema.
  Raises:
    SchemaParseException: on JSON parsing error,
        or if the JSON descriptor is invalid.
  """
  try:
    json_data = json.loads(json_string)
  except Exception as exn:
    # Fix: chain the original JSON error for debuggability.
    raise SchemaParseException(
        'Error parsing schema from JSON: %r. '
        'Error message: %r.'
        % (json_string, exn)) from exn

  # Initialize the names object
  names = Names()

  # construct the Avro Schema object
  return SchemaFromJSONData(json_data, names)
import logging
import random
import string
import sys
import time

import avro.datafile
import avro.io
import avro.schema


# DNS-style record types used for the generated records:
TYPES = ('A', 'CNAME',)

# Path of the data file written and read by the benchmark:
FILENAME = 'datafile.avr'


def GenerateRandomName():
  """Returns: a random name made of 15 distinct lowercase ASCII letters."""
  return ''.join(random.sample(string.ascii_lowercase, 15))


def GenerateRandomIP():
  """Returns: a random dotted-quad IPv4 address, as a string."""
  return '%s.%s.%s.%s' % (
      random.randint(0, 255),
      random.randint(0, 255),
      random.randint(0, 255),
      random.randint(0, 255),
  )


def Write(nrecords):
  """Writes a data file with the specified number of random records.

  Args:
    nrecords: Number of records to write.
  """
  schema_s = """
  {
    "type": "record",
    "name": "Query",
    "fields" : [
      {"name": "query", "type": "string"},
      {"name": "response", "type": "string"},
      {"name": "type", "type": "string", "default": "A"}
    ]
  }
  """
  schema = avro.schema.Parse(schema_s)
  writer = avro.io.DatumWriter(schema)

  with open(FILENAME, 'wb') as out:
    with avro.datafile.DataFileWriter(
        out, writer, schema,
        # codec='deflate'
    ) as data_writer:
      for _ in range(nrecords):
        response = GenerateRandomIP()
        query = GenerateRandomName()
        # Renamed from 'type': avoid shadowing the builtin.
        record_type = random.choice(TYPES)
        data_writer.append({
            'query': query,
            'response': response,
            'type': record_type,
        })


def Read(expect_nrecords):
  """Reads the data file generated by Write().

  Args:
    expect_nrecords: Number of records the file is expected to contain.
  """
  with open(FILENAME, 'rb') as f:
    reader = avro.io.DatumReader()
    with avro.datafile.DataFileReader(f, reader) as file_reader:
      nrecords = 0
      for record in file_reader:
        nrecords += 1
  # Fix: the failure message referenced undefined 'expected_nrecords',
  # which raised a NameError instead of the intended AssertionError text.
  assert (nrecords == expect_nrecords), (
      'Expecting %d records, got %d.' % (expect_nrecords, nrecords))


def Timing(f, *args):
  """Returns: the wall-clock time, in seconds, spent running f(*args)."""
  s = time.time()
  f(*args)
  e = time.time()
  return e - s


def Main(args):
  """Benchmarks writing, then reading back, args[1] random records."""
  nrecords = int(args[1])
  print('Write %0.4f' % Timing(Write, nrecords))
  print('Read %0.4f' % Timing(Read, nrecords))


if __name__ == '__main__':
  log_formatter = logging.Formatter(
      '%(asctime)s %(levelname)s %(filename)s:%(lineno)s : %(message)s')
  logging.root.setLevel(logging.DEBUG)
  console_handler = logging.StreamHandler()
  console_handler.setFormatter(log_formatter)
  console_handler.setLevel(logging.DEBUG)
  logging.root.addHandler(console_handler)

  Main(sys.argv)
============================================================================== --- avro/trunk/lang/py3/avro/tests/gen_interop_data.py (added) +++ avro/trunk/lang/py3/avro/tests/gen_interop_data.py Fri Jan 10 19:11:42 2014 @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 +# -*- mode: python -*- +# -*- coding: utf-8 -*- + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import sys

from avro import datafile
from avro import io
from avro import schema


# Datum matching the interop schema, written to the output data file:
DATUM = {
    'intField': 12,
    'longField': 15234324,
    'stringField': 'hey',
    'boolField': True,
    'floatField': 1234.0,
    'doubleField': -1234.0,
    'bytesField': '12312adf',
    'nullField': None,
    'arrayField': [5.0, 0.0, 12.0],
    'mapField': {'a': {'label': 'a'}, 'bee': {'label': 'cee'}},
    'unionField': 12.0,
    'enumField': 'C',
    'fixedField': b'1019181716151413',
    'recordField': {
        'label': 'blah',
        'children': [{'label': 'inner', 'children': []}],
    },
}


if __name__ == "__main__":
  # Usage: gen_interop_data.py <interop-schema-file> <output-data-file>
  # Fix: use context managers so file handles are closed even on error
  # (the schema file handle was previously never closed).
  with open(sys.argv[1], 'r') as schema_file:
    interop_schema = schema.Parse(schema_file.read())

  datum_writer = io.DatumWriter()
  # NB: not using compression
  with open(sys.argv[2], 'wb') as writer:
    with datafile.DataFileWriter(writer, datum_writer, interop_schema) as dfw:
      dfw.append(DATUM)
"""Runs all tests.

Usage:

- Run tests from all modules:
    ./run_tests.py discover [-v]

- Run tests in a specific module:
    ./run_tests.py test_schema [-v]

- Run a specific test:
    ./run_tests.py test_schema.TestSchema.testParse [-v]

- Set logging level:
    PYTHON_LOG_LEVEL=<level> ./run_tests.py ...
    log-level 0 includes all logging.
    log-level 10 includes debug logging.
    log-level 20 includes info logging.

- Command-line help:
    ./run_tests.py -h
    ./run_tests.py discover -h
"""

import logging
import os
import sys
import unittest

from avro.tests.test_datafile import *
from avro.tests.test_datafile_interop import *
from avro.tests.test_io import *
from avro.tests.test_ipc import *
from avro.tests.test_protocol import *
from avro.tests.test_schema import *
from avro.tests.test_script import *


def SetupLogging():
  """Configures the root logger from the PYTHON_LOG_LEVEL env. variable."""
  level = int(os.environ.get('PYTHON_LOG_LEVEL', logging.INFO))

  handler = logging.StreamHandler()
  handler.setLevel(logging.DEBUG)
  handler.setFormatter(logging.Formatter(
      '%(asctime)s %(levelname)s %(filename)s:%(lineno)s : %(message)s'))

  root = logging.root
  root.handlers = list()  # list.clear() only exists in python 3.3+
  root.setLevel(level)
  root.addHandler(handler)


SetupLogging()


if __name__ == '__main__':
  unittest.main()
avro/trunk/lang/py3/avro/tests/sample_http_client.py URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/sample_http_client.py?rev=1557225&view=auto ============================================================================== --- avro/trunk/lang/py3/avro/tests/sample_http_client.py (added) +++ avro/trunk/lang/py3/avro/tests/sample_http_client.py Fri Jan 10 19:11:42 2014 @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +# -*- mode: python -*- +# -*- coding: utf-8 -*- + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import sys

from avro import ipc
from avro import protocol

MAIL_PROTOCOL_JSON = """\
{"namespace": "example.proto",
 "protocol": "Mail",

 "types": [
   {"name": "Message", "type": "record",
    "fields": [
      {"name": "to", "type": "string"},
      {"name": "from", "type": "string"},
      {"name": "body", "type": "string"}
    ]
   }
 ],

 "messages": {
   "send": {
     "request": [{"name": "message", "type": "Message"}],
     "response": "string"
   },
   "replay": {
     "request": [],
     "response": "string"
   }
 }
}
"""
MAIL_PROTOCOL = protocol.Parse(MAIL_PROTOCOL_JSON)
SERVER_HOST = 'localhost'
SERVER_PORT = 9090


class UsageError(Exception):
  """Raised when the script is invoked with invalid command-line args."""

  def __init__(self, value):
    self.value = value

  def __str__(self):
    return repr(self.value)


def make_requestor(server_host, server_port, protocol):
  """Creates a requestor connected to the specified server.

  Args:
    server_host: Host name or address of the server.
    server_port: TCP port of the server.
    protocol: Avro protocol to issue requests with.
  Returns:
    A new ipc.Requestor for the given server and protocol.
  """
  # Fix: use the function parameters; the original silently ignored them
  # and always used the SERVER_HOST/SERVER_PORT globals.
  client = ipc.HTTPTransceiver(server_host, server_port)
  return ipc.Requestor(protocol, client)


if __name__ == '__main__':
  if len(sys.argv) not in [4, 5]:
    # NOTE(review): the placeholders below were stripped from the original
    # message (likely an HTML-escaping artifact); restored from the argv use.
    raise UsageError('Usage: <to> <from> <body> [<num_messages>]')

  # client code - attach to the server and send a message
  # fill in the Message record
  message = dict()
  message['to'] = sys.argv[1]
  message['from'] = sys.argv[2]
  message['body'] = sys.argv[3]

  # Fix: catch only the expected errors instead of a bare 'except:'.
  try:
    num_messages = int(sys.argv[4])
  except (IndexError, ValueError):
    num_messages = 1

  # build the parameters for the request
  params = {}
  params['message'] = message

  # send the requests and print the result
  for msg_count in range(num_messages):
    requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL)
    result = requestor.request('send', params)
    print("Result: " + result)

  # try out a replay message
  requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL)
  result = requestor.request('replay', dict())
  print("Replay Result: " + result)
avro/trunk/lang/py3/avro/tests/sample_http_server.py URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/sample_http_server.py?rev=1557225&view=auto ============================================================================== --- avro/trunk/lang/py3/avro/tests/sample_http_server.py (added) +++ avro/trunk/lang/py3/avro/tests/sample_http_server.py Fri Jan 10 19:11:42 2014 @@ -0,0 +1,81 @@ +#!/usr/bin/env python3 +# -*- mode: python -*- +# -*- coding: utf-8 -*- + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
# Fix: BaseHTTPServer is the Python 2 name of this module; in Python 3 it
# lives in http.server, so the original import always failed in this py3 tree.
from http.server import BaseHTTPRequestHandler, HTTPServer

from avro import ipc
from avro import protocol

MAIL_PROTOCOL_JSON = """\
{"namespace": "example.proto",
 "protocol": "Mail",

 "types": [
   {"name": "Message", "type": "record",
    "fields": [
      {"name": "to", "type": "string"},
      {"name": "from", "type": "string"},
      {"name": "body", "type": "string"}
    ]
   }
 ],

 "messages": {
   "send": {
     "request": [{"name": "message", "type": "Message"}],
     "response": "string"
   },
   "replay": {
     "request": [],
     "response": "string"
   }
 }
}
"""
MAIL_PROTOCOL = protocol.Parse(MAIL_PROTOCOL_JSON)
SERVER_ADDRESS = ('localhost', 9090)


class MailResponder(ipc.Responder):
  """Responder implementing the sample Mail protocol."""

  def __init__(self):
    ipc.Responder.__init__(self, MAIL_PROTOCOL)

  def invoke(self, message, request):
    """Processes one protocol message and returns its response string."""
    if message.name == 'send':
      request_content = request['message']
      response = "Sent message to %(to)s from %(from)s with body %(body)s" % \
          request_content
      return response
    elif message.name == 'replay':
      return 'replay'


class MailHandler(BaseHTTPRequestHandler):
  """HTTP handler answering framed Avro requests POSTed by clients."""

  def do_POST(self):
    self.responder = MailResponder()
    call_request_reader = ipc.FramedReader(self.rfile)
    call_request = call_request_reader.read_framed_message()
    resp_body = self.responder.respond(call_request)
    self.send_response(200)
    self.send_header('Content-Type', 'avro/binary')
    self.end_headers()
    resp_writer = ipc.FramedWriter(self.wfile)
    resp_writer.write_framed_message(resp_body)


if __name__ == '__main__':
  # Fix: allow_reuse_address must be set before the constructor binds the
  # listening socket; setting it on the instance afterwards had no effect.
  HTTPServer.allow_reuse_address = True
  mail_server = HTTPServer(SERVER_ADDRESS, MailHandler)
  mail_server.serve_forever()
============================================================================== --- avro/trunk/lang/py3/avro/tests/test_datafile.py (added) +++ avro/trunk/lang/py3/avro/tests/test_datafile.py Fri Jan 10 19:11:42 2014 @@ -0,0 +1,278 @@ +#!/usr/bin/env python3 +# -*- mode: python -*- +# -*- coding: utf-8 -*- + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import logging
import os
import tempfile
import unittest

from avro import datafile
from avro import io
from avro import schema


# ------------------------------------------------------------------------------


# Pairs of (schema JSON, datum) round-tripped through a data file.
SCHEMAS_TO_VALIDATE = (
  ('"null"', None),
  ('"boolean"', True),
  ('"string"', 'adsfasdf09809dsf-=adsf'),
  ('"bytes"', b'12345abcd'),
  ('"int"', 1234),
  ('"long"', 1234),
  ('"float"', 1234.0),
  ('"double"', 1234.0),
  ('{"type": "fixed", "name": "Test", "size": 1}', b'B'),
  ('{"type": "enum", "name": "Test", "symbols": ["A", "B"]}', 'B'),
  ('{"type": "array", "items": "long"}', [1, 3, 2]),
  ('{"type": "map", "values": "long"}', {'a': 1, 'b': 3, 'c': 2}),
  ('["string", "null", "long"]', None),

  ("""
   {
     "type": "record",
     "name": "Test",
     "fields": [{"name": "f", "type": "long"}]
   }
   """,
   {'f': 5}),

  ("""
   {
     "type": "record",
     "name": "Lisp",
     "fields": [{
       "name": "value",
       "type": [
         "null",
         "string",
         {
           "type": "record",
           "name": "Cons",
           "fields": [{"name": "car", "type": "Lisp"},
                      {"name": "cdr", "type": "Lisp"}]
         }
       ]
     }]
   }
   """,
   {'value': {'car': {'value': 'head'}, 'cdr': {'value': None}}}),
)

CODECS_TO_VALIDATE = ('null', 'deflate')

try:
  import snappy
  CODECS_TO_VALIDATE += ('snappy',)
except ImportError:
  logging.info('Snappy not present, will skip testing it.')


# ------------------------------------------------------------------------------


class TestDataFile(unittest.TestCase):
  """Tests the Avro container-file writer and reader."""

  @classmethod
  def setUpClass(cls):
    cls._temp_dir = (
        tempfile.TemporaryDirectory(prefix=cls.__name__, suffix='.tmp'))
    logging.debug('Created temporary directory: %s', cls._temp_dir.name)

  @classmethod
  def tearDownClass(cls):
    logging.debug('Cleaning up temporary directory: %s', cls._temp_dir.name)
    cls._temp_dir.cleanup()

  def NewTempFile(self):
    """Creates a new temporary file.

    File is automatically cleaned up after test.

    Returns:
      The path of the new temporary file.
    """
    # delete=False keeps the file on disk; the 'with' closes the open
    # handle immediately so only the path escapes this method (the
    # original leaked the file object).
    with tempfile.NamedTemporaryFile(
        dir=self._temp_dir.name,
        prefix='test',
        suffix='.avro',
        delete=False,
    ) as temp_file:
      return temp_file.name

  def testRoundTrip(self):
    """Writes each sample datum N times and reads it back, per codec."""
    correct = 0
    for iexample, (writer_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
      for codec in CODECS_TO_VALIDATE:
        file_path = self.NewTempFile()

        # Write the datum this many times in the data file:
        nitems = 10

        logging.debug(
            'Performing round-trip with codec %r in file %s for example #%d\n'
            'Writing datum: %r using writer schema:\n%s',
            codec, file_path, iexample,
            datum, writer_schema)

        logging.debug('Creating data file %r', file_path)
        with open(file_path, 'wb') as writer:
          datum_writer = io.DatumWriter()
          schema_object = schema.Parse(writer_schema)
          with datafile.DataFileWriter(
              writer=writer,
              datum_writer=datum_writer,
              writer_schema=schema_object,
              codec=codec,
          ) as dfw:
            for _ in range(nitems):
              dfw.append(datum)

        logging.debug('Reading data from %r', file_path)
        with open(file_path, 'rb') as reader:
          datum_reader = io.DatumReader()
          with datafile.DataFileReader(reader, datum_reader) as dfr:
            round_trip_data = list(dfr)

        logging.debug(
            'Round-trip data has %d items: %r',
            len(round_trip_data), round_trip_data)

        if ([datum] * nitems) == round_trip_data:
          correct += 1
        else:
          logging.error(
              'Round-trip data does not match:\n'
              'Expect: %r\n'
              'Actual: %r',
              [datum] * nitems,
              round_trip_data)

    self.assertEqual(
        correct,
        len(CODECS_TO_VALIDATE) * len(SCHEMAS_TO_VALIDATE))

  def testAppend(self):
    """Writes one datum, then appends 9 more via re-open in 'ab+' mode."""
    correct = 0
    for iexample, (writer_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
      for codec in CODECS_TO_VALIDATE:
        file_path = self.NewTempFile()

        logging.debug(
            'Performing append with codec %r in file %s for example #%d\n'
            'Writing datum: %r using writer schema:\n%s',
            codec, file_path, iexample,
            datum, writer_schema)

        logging.debug('Creating data file %r', file_path)
        with open(file_path, 'wb') as writer:
          datum_writer = io.DatumWriter()
          schema_object = schema.Parse(writer_schema)
          with datafile.DataFileWriter(
              writer=writer,
              datum_writer=datum_writer,
              writer_schema=schema_object,
              codec=codec,
          ) as dfw:
            dfw.append(datum)

        logging.debug('Appending data to %r', file_path)
        for _ in range(9):
          # Appending reads the schema/codec from the existing file header,
          # so no writer schema is passed here.
          with open(file_path, 'ab+') as writer:
            with datafile.DataFileWriter(writer, io.DatumWriter()) as dfw:
              dfw.append(datum)

        logging.debug('Reading appended data from %r', file_path)
        with open(file_path, 'rb') as reader:
          datum_reader = io.DatumReader()
          with datafile.DataFileReader(reader, datum_reader) as dfr:
            appended_data = list(dfr)

        logging.debug(
            'Appended data has %d items: %r',
            len(appended_data), appended_data)

        if ([datum] * 10) == appended_data:
          correct += 1
        else:
          logging.error(
              'Appended data does not match:\n'
              'Expect: %r\n'
              'Actual: %r',
              [datum] * 10,
              appended_data)

    self.assertEqual(
        correct,
        len(CODECS_TO_VALIDATE) * len(SCHEMAS_TO_VALIDATE))

  def testContextManager(self):
    """Checks that writer/reader 'with' blocks close the underlying files."""
    file_path = self.NewTempFile()

    # Test the writer with a 'with' statement.
    with open(file_path, 'wb') as writer:
      datum_writer = io.DatumWriter()
      sample_schema, sample_datum = SCHEMAS_TO_VALIDATE[1]
      schema_object = schema.Parse(sample_schema)
      with datafile.DataFileWriter(writer, datum_writer, schema_object) as dfw:
        dfw.append(sample_datum)
      self.assertTrue(writer.closed)

    # Test the reader with a 'with' statement.
    datums = []
    with open(file_path, 'rb') as reader:
      datum_reader = io.DatumReader()
      with datafile.DataFileReader(reader, datum_reader) as dfr:
        for datum in dfr:
          datums.append(datum)
      self.assertTrue(reader.closed)

  def testMetadata(self):
    """Round-trips user metadata set with SetMeta / read with GetMeta."""
    file_path = self.NewTempFile()

    # Test the writer with a 'with' statement.
    with open(file_path, 'wb') as writer:
      datum_writer = io.DatumWriter()
      sample_schema, sample_datum = SCHEMAS_TO_VALIDATE[1]
      schema_object = schema.Parse(sample_schema)
      with datafile.DataFileWriter(writer, datum_writer, schema_object) as dfw:
        dfw.SetMeta('test.string', 'foo')
        dfw.SetMeta('test.number', '1')
        dfw.append(sample_datum)
      self.assertTrue(writer.closed)

    # Test the reader with a 'with' statement.
    datums = []
    with open(file_path, 'rb') as reader:
      datum_reader = io.DatumReader()
      with datafile.DataFileReader(reader, datum_reader) as dfr:
        # Metadata values come back as bytes.
        self.assertEqual(b'foo', dfr.GetMeta('test.string'))
        self.assertEqual(b'1', dfr.GetMeta('test.number'))
        for datum in dfr:
          datums.append(datum)
      self.assertTrue(reader.closed)


# ------------------------------------------------------------------------------


if __name__ == '__main__':
  raise Exception('Use run_tests.py')
import logging
import os
import tempfile
import unittest

from avro import datafile
from avro import io
from avro import schema


def GetInteropSchema():
  """Loads and parses the interop schema from interop.avsc next to this file."""
  test_dir = os.path.dirname(os.path.abspath(__file__))
  schema_json_path = os.path.join(test_dir, 'interop.avsc')
  with open(schema_json_path, 'r') as f:
    schema_json = f.read()
  return schema.Parse(schema_json)


INTEROP_SCHEMA = GetInteropSchema()

# One datum covering every field of the interop schema.
INTEROP_DATUM = {
  'intField': 12,
  'longField': 15234324,
  'stringField': 'hey',
  'boolField': True,
  'floatField': 1234.0,
  'doubleField': -1234.0,
  'bytesField': b'12312adf',
  'nullField': None,
  'arrayField': [5.0, 0.0, 12.0],
  'mapField': {'a': {'label': 'a'}, 'bee': {'label': 'cee'}},
  'unionField': 12.0,
  'enumField': 'C',
  'fixedField': b'1019181716151413',
  'recordField': {
    'label': 'blah',
    'children': [{'label': 'inner', 'children': []}],
  },
}


def WriteDataFile(path, datum, schema):
  # NOTE: the 'schema' parameter shadows the avro.schema module import;
  # harmless here since this function never uses the module.
  datum_writer = io.DatumWriter()
  with open(path, 'wb') as writer:
    # NB: not using compression
    with datafile.DataFileWriter(writer, datum_writer, schema) as dfw:
      dfw.append(datum)


class TestDataFileInterop(unittest.TestCase):
  def testInterop(self):
    """Writes the interop datum and reads it back in binary."""
    with tempfile.NamedTemporaryFile() as temp_path:
      WriteDataFile(temp_path.name, INTEROP_DATUM, INTEROP_SCHEMA)

      # Read data in binary from file; the 'with' closes the reader
      # (the original left the DataFileReader open).
      datum_reader = io.DatumReader()
      ndatums = 0
      with open(temp_path.name, 'rb') as reader:
        with datafile.DataFileReader(reader, datum_reader) as dfr:
          for datum in dfr:
            self.assertEqual(INTEROP_DATUM, datum)
            ndatums += 1
      # Guard against a vacuous pass: exactly one datum was written above,
      # so exactly one must come back.
      self.assertEqual(1, ndatums)


if __name__ == '__main__':
  raise Exception('Use run_tests.py')
import binascii
import io
import logging
import sys
import unittest

from avro import io as avro_io
from avro import schema


# Pairs of (schema JSON, datum) that must validate and round-trip.
SCHEMAS_TO_VALIDATE = (
  ('"null"', None),
  ('"boolean"', True),
  ('"string"', 'adsfasdf09809dsf-=adsf'),
  ('"bytes"', b'12345abcd'),
  ('"int"', 1234),
  ('"long"', 1234),
  ('"float"', 1234.0),
  ('"double"', 1234.0),
  ('{"type": "fixed", "name": "Test", "size": 1}', b'B'),
  ('{"type": "enum", "name": "Test", "symbols": ["A", "B"]}', 'B'),
  ('{"type": "array", "items": "long"}', [1, 3, 2]),
  ('{"type": "map", "values": "long"}', {'a': 1, 'b': 3, 'c': 2}),
  ('["string", "null", "long"]', None),
  ("""\
   {"type": "record",
    "name": "Test",
    "fields": [{"name": "f", "type": "long"}]}
   """, {'f': 5}),
  ("""
   {
     "type": "record",
     "name": "Lisp",
     "fields": [{
       "name": "value",
       "type": [
         "null",
         "string",
         {
           "type": "record",
           "name": "Cons",
           "fields": [{"name": "car", "type": "Lisp"},
                      {"name": "cdr", "type": "Lisp"}]
         }
       ]
     }]
   }
   """, {'value': {'car': {'value': 'head'}, 'cdr': {'value': None}}}),
)

# Pairs of (integer value, expected zig-zag varint encoding as hex text).
BINARY_ENCODINGS = (
  (0, '00'),
  (-1, '01'),
  (1, '02'),
  (-2, '03'),
  (2, '04'),
  (-64, '7f'),
  (64, '80 01'),
  (8192, '80 80 01'),
  (-8193, '81 80 01'),
)

# Triples of (field type JSON, default value JSON, expected Python default).
DEFAULT_VALUE_EXAMPLES = (
  ('"null"', 'null', None),
  ('"boolean"', 'true', True),
  ('"string"', '"foo"', 'foo'),
  ('"bytes"', '"\u00FF\u00FF"', '\xff\xff'),
  ('"int"', '5', 5),
  ('"long"', '5', 5),
  ('"float"', '1.1', 1.1),
  ('"double"', '1.1', 1.1),
  ('{"type": "fixed", "name": "F", "size": 2}', '"\u00FF\u00FF"', '\xff\xff'),
  ('{"type": "enum", "name": "F", "symbols": ["FOO", "BAR"]}', '"FOO"', 'FOO'),
  ('{"type": "array", "items": "int"}', '[1, 2, 3]', [1, 2, 3]),
  ('{"type": "map", "values": "int"}', '{"a": 1, "b": 2}', {'a': 1, 'b': 2}),
  ('["int", "null"]', '5', 5),
  ('{"type": "record", "name": "F", "fields": [{"name": "A", "type": "int"}]}',
   '{"A": 5}', {'A': 5}),
)

LONG_RECORD_SCHEMA = schema.Parse("""
{
  "type": "record",
  "name": "Test",
  "fields": [
    {"name": "A", "type": "int"},
    {"name": "B", "type": "int"},
    {"name": "C", "type": "int"},
    {"name": "D", "type": "int"},
    {"name": "E", "type": "int"},
    {"name": "F", "type": "int"},
    {"name": "G", "type": "int"}
  ]
}
""")

LONG_RECORD_DATUM = {'A': 1, 'B': 2, 'C': 3, 'D': 4, 'E': 5, 'F': 6, 'G': 7}


def avro_hexlify(reader):
  """Return the hex value, as a string, of a binary-encoded int or long."""
  # Renamed from 'bytes' to avoid shadowing the builtin.
  hex_bytes = []
  current_byte = reader.read(1)
  hex_bytes.append(binascii.hexlify(current_byte).decode())
  # The high bit of each varint byte flags that another byte follows.
  while (ord(current_byte) & 0x80) != 0:
    current_byte = reader.read(1)
    hex_bytes.append(binascii.hexlify(current_byte).decode())
  return ' '.join(hex_bytes)


def write_datum(datum, writer_schema):
  """Encodes datum with writer_schema; returns (buffer, encoder, writer)."""
  writer = io.BytesIO()
  encoder = avro_io.BinaryEncoder(writer)
  datum_writer = avro_io.DatumWriter(writer_schema)
  datum_writer.write(datum, encoder)
  return writer, encoder, datum_writer


def read_datum(buffer, writer_schema, reader_schema=None):
  """Decodes the contents of buffer, resolving writer to reader schema."""
  reader = io.BytesIO(buffer.getvalue())
  decoder = avro_io.BinaryDecoder(reader)
  datum_reader = avro_io.DatumReader(writer_schema, reader_schema)
  return datum_reader.read(decoder)


def check_binary_encoding(number_type):
  """Counts how many BINARY_ENCODINGS examples encode correctly."""
  logging.debug('Testing binary encoding for type %s', number_type)
  correct = 0
  for datum, hex_encoding in BINARY_ENCODINGS:
    logging.debug('Datum: %d', datum)
    logging.debug('Correct Encoding: %s', hex_encoding)

    writer_schema = schema.Parse('"%s"' % number_type.lower())
    writer, encoder, datum_writer = write_datum(datum, writer_schema)
    writer.seek(0)
    hex_val = avro_hexlify(writer)

    logging.debug('Read Encoding: %s', hex_val)
    if hex_encoding == hex_val:
      correct += 1
  return correct


def check_skip_number(number_type):
  """Counts examples where skip_long correctly skips past a leading value."""
  logging.debug('Testing skip number for %s', number_type)
  correct = 0
  for value_to_skip, hex_encoding in BINARY_ENCODINGS:
    VALUE_TO_READ = 6253
    logging.debug('Value to Skip: %d', value_to_skip)

    # Write the value to skip and a known value.
    writer_schema = schema.Parse('"%s"' % number_type.lower())
    writer, encoder, datum_writer = write_datum(value_to_skip, writer_schema)
    datum_writer.write(VALUE_TO_READ, encoder)

    # Skip the value.
    reader = io.BytesIO(writer.getvalue())
    decoder = avro_io.BinaryDecoder(reader)
    decoder.skip_long()

    # Read data from string buffer.
    datum_reader = avro_io.DatumReader(writer_schema)
    read_value = datum_reader.read(decoder)

    logging.debug('Read Value: %d', read_value)
    if read_value == VALUE_TO_READ:
      correct += 1
  return correct


# ------------------------------------------------------------------------------


class TestIO(unittest.TestCase):
  #
  # BASIC FUNCTIONALITY
  #

  def testValidate(self):
    """Every SCHEMAS_TO_VALIDATE datum must validate against its schema."""
    passed = 0
    for example_schema, datum in SCHEMAS_TO_VALIDATE:
      logging.debug('Schema: %r', example_schema)
      logging.debug('Datum: %r', datum)
      validated = avro_io.Validate(schema.Parse(example_schema), datum)
      logging.debug('Valid: %s', validated)
      if validated:
        passed += 1
    self.assertEqual(passed, len(SCHEMAS_TO_VALIDATE))

  def testRoundTrip(self):
    """Every SCHEMAS_TO_VALIDATE datum must binary-round-trip unchanged."""
    correct = 0
    for example_schema, datum in SCHEMAS_TO_VALIDATE:
      logging.debug('Schema: %s', example_schema)
      logging.debug('Datum: %s', datum)

      writer_schema = schema.Parse(example_schema)
      writer, encoder, datum_writer = write_datum(datum, writer_schema)
      round_trip_datum = read_datum(writer, writer_schema)

      logging.debug('Round Trip Datum: %s', round_trip_datum)
      if datum == round_trip_datum:
        correct += 1
    self.assertEqual(correct, len(SCHEMAS_TO_VALIDATE))

  #
  # BINARY ENCODING OF INT AND LONG
  #

  def testBinaryIntEncoding(self):
    correct = check_binary_encoding('int')
    self.assertEqual(correct, len(BINARY_ENCODINGS))

  def testBinaryLongEncoding(self):
    correct = check_binary_encoding('long')
    self.assertEqual(correct, len(BINARY_ENCODINGS))

  def testSkipInt(self):
    correct = check_skip_number('int')
    self.assertEqual(correct, len(BINARY_ENCODINGS))

  def testSkipLong(self):
    correct = check_skip_number('long')
    self.assertEqual(correct, len(BINARY_ENCODINGS))

  #
  # SCHEMA RESOLUTION
  #

  def testSchemaPromotion(self):
    """int -> long -> float -> double promotion must preserve the value."""
    # Note that checking writer_schema.type in read_data
    # allows us to handle promotion correctly.
    promotable_schemas = ['"int"', '"long"', '"float"', '"double"']
    incorrect = 0
    for i, ws in enumerate(promotable_schemas):
      writer_schema = schema.Parse(ws)
      datum_to_write = 219
      for rs in promotable_schemas[i + 1:]:
        reader_schema = schema.Parse(rs)
        writer, enc, dw = write_datum(datum_to_write, writer_schema)
        datum_read = read_datum(writer, writer_schema, reader_schema)
        logging.debug('Writer: %s Reader: %s', writer_schema, reader_schema)
        logging.debug('Datum Read: %s', datum_read)
        if datum_read != datum_to_write:
          incorrect += 1
    self.assertEqual(incorrect, 0)

  def testUnknownSymbol(self):
    """Reading an enum symbol absent from the reader schema must raise."""
    writer_schema = schema.Parse("""\
      {"type": "enum", "name": "Test",
       "symbols": ["FOO", "BAR"]}""")
    datum_to_write = 'FOO'

    reader_schema = schema.Parse("""\
      {"type": "enum", "name": "Test",
       "symbols": ["BAR", "BAZ"]}""")

    writer, encoder, datum_writer = write_datum(datum_to_write, writer_schema)
    reader = io.BytesIO(writer.getvalue())
    decoder = avro_io.BinaryDecoder(reader)
    datum_reader = avro_io.DatumReader(writer_schema, reader_schema)
    self.assertRaises(
        avro_io.SchemaResolutionException, datum_reader.read, decoder)

  def testDefaultValue(self):
    """Fields missing from the writer schema take the reader's default."""
    writer_schema = LONG_RECORD_SCHEMA
    datum_to_write = LONG_RECORD_DATUM

    correct = 0
    for field_type, default_json, default_datum in DEFAULT_VALUE_EXAMPLES:
      reader_schema = schema.Parse("""\
        {"type": "record", "name": "Test",
         "fields": [{"name": "H", "type": %s, "default": %s}]}
        """ % (field_type, default_json))
      datum_to_read = {'H': default_datum}

      writer, encoder, datum_writer = write_datum(datum_to_write, writer_schema)
      datum_read = read_datum(writer, writer_schema, reader_schema)
      logging.debug('Datum Read: %s', datum_read)
      if datum_to_read == datum_read:
        correct += 1
    self.assertEqual(correct, len(DEFAULT_VALUE_EXAMPLES))

  def testNoDefaultValue(self):
    """A reader field with no writer value and no default must raise."""
    writer_schema = LONG_RECORD_SCHEMA
    datum_to_write = LONG_RECORD_DATUM

    reader_schema = schema.Parse("""\
      {"type": "record", "name": "Test",
       "fields": [{"name": "H", "type": "int"}]}""")

    writer, encoder, datum_writer = write_datum(datum_to_write, writer_schema)
    reader = io.BytesIO(writer.getvalue())
    decoder = avro_io.BinaryDecoder(reader)
    datum_reader = avro_io.DatumReader(writer_schema, reader_schema)
    self.assertRaises(
        avro_io.SchemaResolutionException, datum_reader.read, decoder)

  def testProjection(self):
    """Reading with a subset schema yields only the projected fields."""
    writer_schema = LONG_RECORD_SCHEMA
    datum_to_write = LONG_RECORD_DATUM

    reader_schema = schema.Parse("""\
      {"type": "record", "name": "Test",
       "fields": [{"name": "E", "type": "int"},
                  {"name": "F", "type": "int"}]}""")
    datum_to_read = {'E': 5, 'F': 6}

    writer, encoder, datum_writer = write_datum(datum_to_write, writer_schema)
    datum_read = read_datum(writer, writer_schema, reader_schema)
    logging.debug('Datum Read: %s', datum_read)
    self.assertEqual(datum_to_read, datum_read)

  def testFieldOrder(self):
    """Field order differences between schemas must not affect the result."""
    writer_schema = LONG_RECORD_SCHEMA
    datum_to_write = LONG_RECORD_DATUM

    reader_schema = schema.Parse("""\
      {"type": "record", "name": "Test",
       "fields": [{"name": "F", "type": "int"},
                  {"name": "E", "type": "int"}]}""")
    datum_to_read = {'E': 5, 'F': 6}

    writer, encoder, datum_writer = write_datum(datum_to_write, writer_schema)
    datum_read = read_datum(writer, writer_schema, reader_schema)
    logging.debug('Datum Read: %s', datum_read)
    self.assertEqual(datum_to_read, datum_read)

  def testTypeException(self):
    """Writing a datum that does not match the schema must raise."""
    writer_schema = schema.Parse("""\
      {"type": "record", "name": "Test",
       "fields": [{"name": "F", "type": "int"},
                  {"name": "E", "type": "int"}]}""")
    datum_to_write = {'E': 5, 'F': 'Bad'}
    self.assertRaises(
        avro_io.AvroTypeException, write_datum, datum_to_write, writer_schema)


if __name__ == '__main__':
  raise Exception('Use run_tests.py')