avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dcrea...@apache.org
Subject svn commit: r1074612 - in /avro/trunk: ./ lang/c/src/ lang/c/src/avro/ lang/c/tests/
Date Fri, 25 Feb 2011 16:17:57 GMT
Author: dcreager
Date: Fri Feb 25 16:17:56 2011
New Revision: 1074612

URL: http://svn.apache.org/viewvc?rev=1074612&view=rev
Log:
AVRO-762. C: Better schema resolution

This patch adds a new class that implements Avro's schema resolution
rules.  Before, the writer and reader schemas were resolved each time a
datum was read from an Avro file.  Now, we can examine the two schemas
before reading any data values, and "remember" the resolution result in
a tree of avro_resolver_t objects.  The avro_resolver_new function
creates this tree of resolvers.  The result is an instance of a generic
"consumer" interface, which is an abstract API for consuming Avro data
that conforms to some writer schema.  In this case, the resolver
implementation of the consumer API fills in an avro_datum_t that
conforms to the reader schema, promoting data and dropping fields as
necessary.  The avro_consume_binary function takes care of parsing the
Avro binary encoding, passing off the data that it reads in to a
consumer.

We currently don't support the AVRO_LINK schema type; we'll need to add
some memoization to handle recursive schemas properly.

Added:
    avro/trunk/lang/c/src/avro/
    avro/trunk/lang/c/src/avro/consumer.h
    avro/trunk/lang/c/src/consumer.c
    avro/trunk/lang/c/src/resolver.c
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/c/src/CMakeLists.txt
    avro/trunk/lang/c/src/Makefile.am
    avro/trunk/lang/c/src/avro.h
    avro/trunk/lang/c/src/datum_read.c
    avro/trunk/lang/c/src/encoding_binary.c
    avro/trunk/lang/c/src/schema.c
    avro/trunk/lang/c/src/schema.h
    avro/trunk/lang/c/tests/test_avro_data.c
    avro/trunk/lang/c/tests/test_avro_schema.c

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1074612&r1=1074611&r2=1074612&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Feb 25 16:17:56 2011
@@ -91,6 +91,13 @@ Avro 1.5.0 (unreleased)
 
   NEW FEATURES
 
+    AVRO-762. C: New and improved schema resolution API.  The new API
+    correctly handles all of the schema resolution rules listed in the
+    spec.  It performs resolution against two schemas separately from
+    reading in any data, so that we don't have to re-resolve for each
+    data record.  Please see the avro/consumer.h header file for
+    details. (dcreager)
+
     AVRO-463. C: Error message API.  The new avro_strerror() function
     can be used to get a textual description of the error codes returned
     by the other C API functions.  In particular, this includes any JSON

Modified: avro/trunk/lang/c/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/CMakeLists.txt?rev=1074612&r1=1074611&r2=1074612&view=diff
==============================================================================
--- avro/trunk/lang/c/src/CMakeLists.txt (original)
+++ avro/trunk/lang/c/src/CMakeLists.txt Fri Feb 25 16:17:56 2011
@@ -21,8 +21,10 @@ set(AVRO_SRC
     allocation.h
     allocation.c
     avro.h
+    avro/consumer.h
     avro_errors.h
     avro_private.h
+    consumer.c
     datafile.c
     datum.c
     datum.h
@@ -39,6 +41,7 @@ set(AVRO_SRC
     encoding_binary.c
     errors.c
     io.c
+    resolver.c
     schema.c
     schema.h
     schema_equal.c
@@ -71,7 +74,12 @@ set_target_properties(avro-shared PROPER
         OUTPUT_NAME avro
         SOVERSION ${AVRO_VERSION})
 
-install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/avro.h DESTINATION include)
+install(FILES
+        ${CMAKE_CURRENT_SOURCE_DIR}/avro.h
+        DESTINATION include)
+install(FILES
+        ${CMAKE_CURRENT_SOURCE_DIR}/avro/consumer.h
+        DESTINATION include/avro)
 install(TARGETS avro-static avro-shared
         RUNTIME DESTINATION bin
         LIBRARY DESTINATION lib

Modified: avro/trunk/lang/c/src/Makefile.am
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/Makefile.am?rev=1074612&r1=1074611&r2=1074612&view=diff
==============================================================================
--- avro/trunk/lang/c/src/Makefile.am (original)
+++ avro/trunk/lang/c/src/Makefile.am Fri Feb 25 16:17:56 2011
@@ -6,12 +6,13 @@ ACLOCAL_AMFLAGS=-I m4
 pkgconfigdir = $(libdir)/pkgconfig
 pkgconfig_DATA = avro-c.pc
 
-include_HEADERS = avro.h
+nobase_include_HEADERS = avro.h avro/consumer.h
 
 lib_LTLIBRARIES = libavro.la
 libavro_la_SOURCES = st.c st.h schema.c schema.h schema_equal.c \
 datum.c datum_equal.c datum_validate.c datum_read.c datum_skip.c datum_write.c datum_size.c datum.h \
 datum_json.c \
+consumer.c resolver.c \
 io.c dump.c dump.h encoding_binary.c \
 allocation.h allocation.c \
 avro_private.h encoding.h datafile.c \

Modified: avro/trunk/lang/c/src/avro.h
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/avro.h?rev=1074612&r1=1074611&r2=1074612&view=diff
==============================================================================
--- avro/trunk/lang/c/src/avro.h (original)
+++ avro/trunk/lang/c/src/avro.h Fri Feb 25 16:17:56 2011
@@ -141,6 +141,8 @@ avro_schema_t avro_schema_record(const c
 avro_schema_t avro_schema_record_field_get(const avro_schema_t
 					   record, const char *field_name);
 const char *avro_schema_record_field_name(const avro_schema_t schema, int index);
+int avro_schema_record_field_get_index(const avro_schema_t schema,
+				       const char *field_name);
 avro_schema_t avro_schema_record_field_get_by_index
 (const avro_schema_t record, int index);
 int avro_schema_record_field_append(const avro_schema_t record,
@@ -166,6 +168,7 @@ avro_schema_t avro_schema_array(const av
 avro_schema_t avro_schema_array_items(avro_schema_t array);
 
 avro_schema_t avro_schema_union(void);
+size_t avro_schema_union_size(const avro_schema_t union_schema);
 int avro_schema_union_append(const avro_schema_t
 			     union_schema, const avro_schema_t schema);
 avro_schema_t avro_schema_union_branch(avro_schema_t union_schema,

Added: avro/trunk/lang/c/src/avro/consumer.h
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/avro/consumer.h?rev=1074612&view=auto
==============================================================================
--- avro/trunk/lang/c/src/avro/consumer.h (added)
+++ avro/trunk/lang/c/src/avro/consumer.h Fri Feb 25 16:17:56 2011
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.	 You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#ifndef AVRO_CONSUMER_H
+#define AVRO_CONSUMER_H
+#ifdef __cplusplus
+extern "C" {
+#define CLOSE_EXTERN }
+#else
+#define CLOSE_EXTERN
+#endif
+
+#include <avro.h>
+
+
+/*---------------------------------------------------------------------
+ * Consumers
+ */
+
+/**
+ * A <i>consumer</i> is an object that knows how to process Avro data.
+ * There are consumer methods for each type of Avro data.  The
+ * <code>avro_consumer_t</code> struct is an abstract superclass, which
+ * you don't instantiate directly.  Later in this file, we define
+ * several consumer classes that know how to process Avro data in
+ * specific ways.
+ *
+ * For compound Avro values (records, arrays, maps, and unions), the
+ * consumer callbacks provide a nested consumer that should be used to
+ * process subvalues.  Each consumer instance, including these
+ * "subconsumers", contains a reference to the schema of the data that
+ * it expects to process.  This means that the functions that produce
+ * Avro data (such as avro_consume_binary) don't need to maintain their
+ * own references to any schemas, since they'll be encapsulated in the
+ * consumer that they pass their data off to.
+ */
+
+typedef struct avro_consumer_t avro_consumer_t;
+
+struct avro_consumer_t {
+	/**
+	 * The schema of the data that this consumer expects to process.
+	 */
+
+	avro_schema_t  schema;
+
+	/**
+	 * Called when this consumer is freed.  This function should
+	 * free any additional resources acquired by a consumer
+	 * subclass.
+	 */
+
+	void (*free)(avro_consumer_t *consumer);
+
+	/* PRIMITIVE VALUES */
+
+	/**
+	 * Called when a boolean value is encountered.
+	 */
+
+	int (*boolean_value)(avro_consumer_t *consumer,
+			     int value,
+			     void *user_data);
+
+	/**
+	 * Called when a bytes value is encountered. The @ref value
+	 * pointer is only guaranteed to be valid for the duration of
+	 * the callback function.  If you need to save the data for
+	 * processing later, you must copy it into another buffer.
+	 */
+
+	int (*bytes_value)(avro_consumer_t *consumer,
+			   const void *value, size_t value_len,
+			   void *user_data);
+
+	/**
+	 * Called when a double value is encountered.
+	 */
+
+	int (*double_value)(avro_consumer_t *consumer,
+			    double value,
+			    void *user_data);
+
+	/**
+	 * Called when a float value is encountered.
+	 */
+
+	int (*float_value)(avro_consumer_t *consumer,
+			   float value,
+			   void *user_data);
+
+	/**
+	 * Called when an int value is encountered.
+	 */
+
+	int (*int_value)(avro_consumer_t *consumer,
+			 int32_t value,
+			 void *user_data);
+
+	/**
+	 * Called when a long value is encountered.
+	 */
+
+	int (*long_value)(avro_consumer_t *consumer,
+			  int64_t value,
+			  void *user_data);
+
+	/**
+	 * Called when a null value is encountered.
+	 */
+
+	int (*null_value)(avro_consumer_t *consumer, void *user_data);
+
+	/**
+	 * Called when a string value is encountered.  The @ref value
+	 * pointer will point at UTF-8 encoded data.  (If the data
+	 * you're representing isn't a UTF-8 Unicode string, you
+	 * should use the bytes type.)	The @ref value_len parameter
+	 * gives the length of the data in bytes, not in Unicode
+	 * characters.	The @ref value pointer is only guaranteed to
+	 * be valid for the duration of the callback function.	If you
+	 * need to save the data for processing later, you must copy
+	 * it into another buffer.
+	 */
+
+	int (*string_value)(avro_consumer_t *consumer,
+			    const void *value, size_t value_len,
+			    void *user_data);
+
+	/* COMPOUND VALUES */
+
+	/**
+	 * Called when the beginning of an array block is encountered.
+	 * The @ref block_count parameter will contain the number of
+	 * elements in this block.
+	 */
+
+	int (*array_start_block)(avro_consumer_t *consumer,
+				 int is_first_block,
+				 unsigned int block_count,
+				 void *user_data);
+
+	/**
+	 * Called before each individual element of an array is
+	 * processed.  The index of the current element is passed into
+	 * the callback.  The callback should fill in @ref
+	 * element_consumer and @ref element_user_data with the consumer
+	 * and <code>user_data</code> pointer to use to process the
+	 * element.
+	 */
+
+	int (*array_element)(avro_consumer_t *consumer,
+			     unsigned int index,
+			     avro_consumer_t **element_consumer,
+			     void **element_user_data,
+			     void *user_data);
+
+	/**
+	 * Called when an enum value is encountered.
+	 */
+
+	int (*enum_value)(avro_consumer_t *consumer, int value,
+			  void *user_data);
+
+	/**
+	 * Called when a fixed value is encountered.  The @ref value
+	 * pointer is only guaranteed to be valid for the duration of
+	 * the callback function.  If you need to save the data for
+	 * processing later, you must copy it into another buffer.
+	 */
+
+	int (*fixed_value)(avro_consumer_t *consumer,
+			   const void *value, size_t value_len,
+			   void *user_data);
+
+	/**
+	 * Called when the beginning of a map block is encountered.
+	 * The @ref block_count parameter will contain the number of
+	 * elements in this block.
+	 */
+
+	int (*map_start_block)(avro_consumer_t *consumer,
+			       int is_first_block,
+			       unsigned int block_count,
+			       void *user_data);
+
+	/**
+	 * Called before each individual element of a map is
+	 * processed.  The index and key of the current element are
+	 * passed into the callback.  The key is only guaranteed to be
+	 * valid for the duration of the map_element callback,
+	 * and the map's subschema callback.  If you need to save it for
+	 * later use, you must copy the key into another memory
+	 * location.  The callback should fill in @ref value_consumer
+	 * and @ref value_user_data with the consumer and
+	 * <code>user_data</code> pointer to use to process the value.
+	 */
+
+	int (*map_element)(avro_consumer_t *consumer,
+			   unsigned int index,
+			   const char *key,
+			   avro_consumer_t **value_consumer,
+			   void **value_user_data,
+			   void *user_data);
+
+	/**
+	 * Called when the beginning of a record is encountered.
+	 */
+
+	int (*record_start)(avro_consumer_t *consumer,
+			    void *user_data);
+
+	/**
+	 * Called before each individual field of a record is
+	 * processed.  The index and name of the current field is
+	 * passed into the callback.  The name is only guaranteed to
+	 * be valid for the duration of the record_field_start
+	 * callback, and the field's subschema callback.  If you need to
+	 * save it for later use, you must copy the key into another
+	 * memory location.  The callback should fill in @ref
+	 * field_consumer and @ref field_user_data with the consumer
+	 * and <code>user_data</code> pointer to use to process the field.
+	 */
+
+	int (*record_field)(avro_consumer_t *consumer,
+			    unsigned int index,
+			    avro_consumer_t **field_consumer,
+			    void **field_user_data,
+			    void *user_data);
+
+	/**
+	 * Called when a union value is encountered.  The callback
+	 * should fill in @ref branch_consumer and @ref branch_user_data
+	 * with the consumer and <code>user_data</code> pointer to use to
+	 * process the branch.
+	 */
+
+	int (*union_branch)(avro_consumer_t *consumer,
+			    unsigned int discriminant,
+			    avro_consumer_t **branch_consumer,
+			    void **branch_user_data,
+			    void *user_data);
+};
+
+
+/**
+ * Calls the given callback in consumer, if it's present.  If the
+ * callback is NULL, it just returns a success code.
+ */
+
+#define avro_consumer_call(consumer, callback, ...)	\
+	(((consumer)->callback == NULL)? 0:		\
+	 (consumer)->callback((consumer), __VA_ARGS__))
+
+
+/**
+ * Frees an @ref avro_consumer_t instance.  (This function works on
+ * consumer subclasses, too.)
+ */
+
+void avro_consumer_free(avro_consumer_t *consumer);
+
+
+/*---------------------------------------------------------------------
+ * Resolvers
+ */
+
+/**
+ * A <i>resolver</i> is a special kind of consumer that knows how to
+ * implement Avro's schema resolution rules to translate between a
+ * writer schema and a reader schema.  The consumer callbacks line up
+ * with the writer schema; as each element of the datum is produced, the
+ * resolver fills in the contents of an @ref avro_datum_t instance.
+ * (The datum is provided as the user_data when you use the consumer.)
+ */
+
+avro_consumer_t *
+avro_resolver_new(avro_schema_t writer_schema,
+		  avro_schema_t reader_schema);
+
+
+/*---------------------------------------------------------------------
+ * Binary encoding
+ */
+
+/**
+ * Reads an Avro datum from the given @ref avro_reader_t.  As the
+ * datum is read, each portion of it is passed off to the appropriate
+ * callback in @ref consumer.
+ */
+
+int
+avro_consume_binary(avro_reader_t reader,
+		    avro_consumer_t *consumer,
+		    void *ud);
+
+
+CLOSE_EXTERN
+#endif

Added: avro/trunk/lang/c/src/consumer.c
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/consumer.c?rev=1074612&view=auto
==============================================================================
--- avro/trunk/lang/c/src/consumer.c (added)
+++ avro/trunk/lang/c/src/consumer.c Fri Feb 25 16:17:56 2011
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.	 You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#include "avro/consumer.h"
+
+void avro_consumer_free(avro_consumer_t *consumer)
+{
+	if (consumer->free) {
+		consumer->free(consumer);
+	}
+}

Modified: avro/trunk/lang/c/src/datum_read.c
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/datum_read.c?rev=1074612&r1=1074611&r2=1074612&view=diff
==============================================================================
--- avro/trunk/lang/c/src/datum_read.c (original)
+++ avro/trunk/lang/c/src/datum_read.c Fri Feb 25 16:17:56 2011
@@ -17,6 +17,7 @@
 
 #include "avro_errors.h"
 #include "avro_private.h"
+#include "avro/consumer.h"
 #include "allocation.h"
 #include <stdlib.h>
 #include <errno.h>
@@ -26,71 +27,15 @@
 #include "datum.h"
 
 int
-avro_schema_match(avro_schema_t writers_schema, avro_schema_t readers_schema)
+avro_schema_match(avro_schema_t wschema, avro_schema_t rschema)
 {
-	if (!is_avro_schema(writers_schema) || !is_avro_schema(readers_schema)) {
-		return 0;
-	}
+	check_param(0, is_avro_schema(wschema), "writer schema");
+	check_param(0, is_avro_schema(rschema), "reader schema");
 
-	switch (avro_typeof(writers_schema)) {
-	case AVRO_UNION:
+	avro_consumer_t  *resolver = avro_resolver_new(wschema, rschema);
+	if (resolver) {
+		avro_consumer_free(resolver);
 		return 1;
-
-	case AVRO_INT32:
-		return is_avro_int32(readers_schema)
-		    || is_avro_int64(readers_schema)
-		    || is_avro_float(readers_schema)
-		    || is_avro_double(readers_schema);
-
-	case AVRO_INT64:
-		return is_avro_int64(readers_schema)
-		    || is_avro_float(readers_schema)
-		    || is_avro_double(readers_schema);
-
-	case AVRO_FLOAT:
-		return is_avro_float(readers_schema)
-		    || is_avro_double(readers_schema);
-
-	case AVRO_STRING:
-	case AVRO_BYTES:
-	case AVRO_DOUBLE:
-	case AVRO_BOOLEAN:
-	case AVRO_NULL:
-		return avro_typeof(writers_schema) ==
-		    avro_typeof(readers_schema);
-
-	case AVRO_RECORD:
-		return is_avro_record(readers_schema)
-		    && strcmp(avro_schema_name(writers_schema),
-			      avro_schema_name(readers_schema)) == 0;
-
-	case AVRO_FIXED:
-		return is_avro_fixed(readers_schema)
-		    && strcmp(avro_schema_name(writers_schema),
-			      avro_schema_name(readers_schema)) == 0
-		    && (avro_schema_to_fixed(writers_schema))->size ==
-		    (avro_schema_to_fixed(readers_schema))->size;
-
-	case AVRO_ENUM:
-		return is_avro_enum(readers_schema)
-		    && strcmp(avro_schema_to_enum(writers_schema)->name,
-			      avro_schema_to_enum(readers_schema)->name) == 0;
-
-	case AVRO_MAP:
-		return is_avro_map(readers_schema)
-		    && avro_typeof(avro_schema_to_map(writers_schema)->values)
-		    == avro_typeof(avro_schema_to_map(readers_schema)->values);
-
-	case AVRO_ARRAY:
-		return is_avro_array(readers_schema)
-		    && avro_typeof(avro_schema_to_array(writers_schema)->items)
-		    == avro_typeof(avro_schema_to_array(readers_schema)->items);
-
-	case AVRO_LINK:
-		/*
-		 * TODO 
-		 */
-		break;
 	}
 
 	return 0;
@@ -98,35 +43,31 @@ avro_schema_match(avro_schema_t writers_
 
 static int
 read_enum(avro_reader_t reader, const avro_encoding_t * enc,
-	  struct avro_enum_schema_t *writers_schema,
-	  struct avro_enum_schema_t *readers_schema, avro_datum_t * datum)
+	  avro_consumer_t *consumer, void *ud)
 {
 	int rval;
 	int64_t index;
 
-	AVRO_UNUSED(writers_schema);
-
 	check_prefix(rval, enc->read_long(reader, &index),
 		     "Cannot read enum value: ");
-	*datum = avro_enum(&readers_schema->obj, index);
-	return 0;
+	return avro_consumer_call(consumer, enum_value, index, ud);
 }
 
 static int
 read_array(avro_reader_t reader, const avro_encoding_t * enc,
-	   struct avro_array_schema_t *writers_schema,
-	   struct avro_array_schema_t *readers_schema, avro_datum_t * datum)
+	   avro_consumer_t *consumer, void *ud)
 {
 	int rval;
-	int64_t i;
+	int64_t i;          /* index within the current block */
+	int64_t index = 0;  /* index within the entire array */
 	int64_t block_count;
 	int64_t block_size;
-	avro_datum_t array_datum;
 
 	check_prefix(rval, enc->read_long(reader, &block_count),
 		     "Cannot read array block count: ");
+	check(rval, avro_consumer_call(consumer, array_start_block,
+				       1, block_count, ud));
 
-	array_datum = avro_array(&readers_schema->obj);
 	while (block_count != 0) {
 		if (block_count < 0) {
 			block_count = block_count * -1;
@@ -134,222 +75,157 @@ read_array(avro_reader_t reader, const a
 				     "Cannot read array block size: ");
 		}
 
-		for (i = 0; i < block_count; i++) {
-			avro_datum_t datum;
-
-			rval =
-			    avro_read_data(reader, writers_schema->items,
-					   readers_schema->items, &datum);
-			if (rval) {
-				avro_datum_decref(array_datum);
-				return rval;
-			}
-			rval = avro_array_append_datum(array_datum, datum);
-			if (rval) {
-				avro_set_error("Cannot append element to array");
-				avro_datum_decref(array_datum);
-				return rval;
-			}
-			avro_datum_decref(datum);
-		}
-
-		rval = enc->read_long(reader, &block_count);
-		if (rval) {
-			avro_prefix_error("Cannot read array block count: ");
-			avro_datum_decref(array_datum);
-			return rval;
-		}
+		for (i = 0; i < block_count; i++, index++) {
+			avro_consumer_t  *element_consumer = NULL;
+			void  *element_ud = NULL;
+
+			check(rval,
+			      avro_consumer_call(consumer, array_element,
+					         index, &element_consumer, &element_ud,
+						 ud));
+
+			check(rval, avro_consume_binary(reader, element_consumer, element_ud));
+		}
+
+		check_prefix(rval, enc->read_long(reader, &block_count),
+			     "Cannot read array block count: ");
+		check(rval, avro_consumer_call(consumer, array_start_block,
+					       0, block_count, ud));
 	}
-	*datum = array_datum;
+
 	return 0;
 }
 
 static int
 read_map(avro_reader_t reader, const avro_encoding_t * enc,
-	 struct avro_map_schema_t *writers_schema,
-	 struct avro_map_schema_t *readers_schema, avro_datum_t * datum)
+	 avro_consumer_t *consumer, void *ud)
 {
 	int rval;
-	int64_t i, block_count;
-	avro_datum_t map = avro_map(&readers_schema->obj);
+	int64_t i;          /* index within the current block */
+	int64_t index = 0;  /* index within the entire map */
+	int64_t block_count;
+	int64_t block_size;
+
+	check_prefix(rval, enc->read_long(reader, &block_count),
+		     "Cannot read map block count: ");
+	check(rval, avro_consumer_call(consumer, map_start_block,
+				       1, block_count, ud));
 
-	rval = enc->read_long(reader, &block_count);
-	if (rval) {
-		avro_prefix_error("Cannot read map block count: ");
-		avro_datum_decref(map);
-		return rval;
-	}
 	while (block_count != 0) {
-		int64_t block_size;
 		if (block_count < 0) {
 			block_count = block_count * -1;
-			rval = enc->read_long(reader, &block_size);
-			if (rval) {
-				avro_prefix_error("Cannot read map block size: ");
-				avro_datum_decref(map);
-				return rval;
-			}
+			check_prefix(rval, enc->read_long(reader, &block_size),
+				     "Cannot read map block size: ");
 		}
-		for (i = 0; i < block_count; i++) {
+
+		for (i = 0; i < block_count; i++, index++) {
 			char *key;
 			int64_t key_size;
-			avro_datum_t value;
-			rval = enc->read_string(reader, &key, &key_size);
-			if (rval) {
-				avro_prefix_error("Cannot read map key: ");
-				avro_datum_decref(map);
-				return rval;
-			}
-			rval =
-			    avro_read_data(reader,
-					   avro_schema_to_map(writers_schema)->
-					   values,
-					   avro_schema_to_map(readers_schema)->
-					   values, &value);
+			avro_consumer_t  *element_consumer = NULL;
+			void  *element_ud = NULL;
+
+			check_prefix(rval, enc->read_string(reader, &key, &key_size),
+				     "Cannot read map key: ");
+
+			rval = avro_consumer_call(consumer, map_element,
+						  index, key,
+						  &element_consumer, &element_ud,
+						  ud);
 			if (rval) {
 				avro_free(key, key_size);
 				return rval;
 			}
-			rval = avro_map_set(map, key, value);
+
+			rval = avro_consume_binary(reader, element_consumer, element_ud);
 			if (rval) {
-				avro_set_error("Cannot append element to map");
 				avro_free(key, key_size);
-				avro_datum_decref(map);
 				return rval;
 			}
-			avro_datum_decref(value);
+
 			avro_free(key, key_size);
 		}
-		rval = enc->read_long(reader, &block_count);
-		if (rval) {
-			avro_prefix_error("Cannot read map block count: ");
-			avro_datum_decref(map);
-			return rval;
-		}
+
+		check_prefix(rval, enc->read_long(reader, &block_count),
+			     "Cannot read map block count: ");
+		check(rval, avro_consumer_call(consumer, map_start_block,
+					       0, block_count, ud));
 	}
-	*datum = map;
+
 	return 0;
 }
 
 static int
 read_union(avro_reader_t reader, const avro_encoding_t * enc,
-	   struct avro_union_schema_t *writers_schema,
-	   struct avro_union_schema_t *readers_schema, avro_datum_t * datum)
+	   avro_consumer_t *consumer, void *ud)
 {
 	int rval;
 	int64_t discriminant;
-	avro_datum_t value;
-	union {
-		st_data_t data;
-		avro_schema_t schema;
-	} val;
-
-	AVRO_UNUSED(readers_schema);
+	avro_consumer_t  *branch_consumer = NULL;
+	void  *branch_ud = NULL;
 
 	check_prefix(rval, enc->read_long(reader, &discriminant),
 		     "Cannot read union discriminant: ");
-	if (!st_lookup(writers_schema->branches, discriminant, &val.data)) {
-		avro_set_error("Union doesn't have branch %ld", (long) discriminant);
-		return EILSEQ;
-	}
-	check(rval, avro_read_data(reader, val.schema, NULL, &value));
-	*datum = avro_union(&readers_schema->obj, discriminant, value);
-	avro_datum_decref(value);
-	return 0;
+	check(rval, avro_consumer_call(consumer, union_branch,
+				       discriminant,
+				       &branch_consumer, &branch_ud, ud));
+	return avro_consume_binary(reader, branch_consumer, branch_ud);
 }
 
-/* TODO: handle default values in fields */
 static int
 read_record(avro_reader_t reader, const avro_encoding_t * enc,
-	    struct avro_record_schema_t *writers_schema,
-	    struct avro_record_schema_t *readers_schema, avro_datum_t * datum)
+	    avro_consumer_t *consumer, void *ud)
 {
 	int rval;
-	long i;
-	avro_datum_t record;
-	avro_datum_t field_datum;
+	size_t  num_fields;
+	unsigned int  i;
 
 	AVRO_UNUSED(enc);
 
-	record = *datum = avro_record(&readers_schema->obj);
-	for (i = 0; i < writers_schema->fields->num_entries; i++) {
-		union {
-			st_data_t data;
-			struct avro_record_field_t *field;
-		} rfield, wfield;
-		st_lookup(writers_schema->fields, i, &wfield.data);
-		if (st_lookup
-		    (readers_schema->fields_byname,
-		     (st_data_t) wfield.field->name, &rfield.data)) {
-			rval =
-			    avro_read_data(reader, wfield.field->type,
-					   rfield.field->type, &field_datum);
-			if (rval) {
-				return rval;
-			}
-			rval =
-			    avro_record_set(record, wfield.field->name,
-					    field_datum);
-			if (rval) {
-				avro_set_error("Cannot append field to record");
-				return rval;
-			}
-			avro_datum_decref(field_datum);
+	check(rval, avro_consumer_call(consumer, record_start, ud));
+
+	num_fields = avro_schema_record_size(consumer->schema);
+	for (i = 0; i < num_fields; i++) {
+		avro_consumer_t  *field_consumer = NULL;
+		void  *field_ud = NULL;
+
+		check(rval, avro_consumer_call(consumer, record_field,
+					       i, &field_consumer, &field_ud,
+					       ud));
+
+		if (field_consumer) {
+			check(rval, avro_consume_binary(reader, field_consumer, field_ud));
 		} else {
-			rval = avro_skip_data(reader, wfield.field->type);
-			if (rval) {
-				return rval;
-			}
+			avro_schema_t  field_schema =
+			    avro_schema_record_field_get_by_index(consumer->schema, i);
+			check(rval, avro_skip_data(reader, field_schema));
 		}
 	}
-	return 0;
-}
 
-static void
-free_bytes(void *ptr, size_t sz)
-{
-	// The binary encoder class allocates bytes values with an extra
-	// byte, so that they're NUL terminated.
-	avro_free(ptr, sz+1);
+	return 0;
 }
 
 int
-avro_read_data(avro_reader_t reader, avro_schema_t writers_schema,
-	       avro_schema_t readers_schema, avro_datum_t * datum)
+avro_consume_binary(avro_reader_t reader, avro_consumer_t *consumer, void *ud)
 {
-	int rval = EINVAL;
+	int rval;
 	const avro_encoding_t *enc = &avro_binary_encoding;
 
 	check_param(EINVAL, reader, "reader");
-	check_param(EINVAL, is_avro_schema(writers_schema), "writer schema");
-	check_param(EINVAL, datum, "datum pointer");
-
-	if (readers_schema == NULL) {
-		readers_schema = writers_schema;
-	} else if (!avro_schema_match(writers_schema, readers_schema)) {
-		avro_set_error("Reader and writer schemas aren't compatible");
-		return EINVAL;
-	}
+	check_param(EINVAL, consumer, "consumer");
 
-	switch (avro_typeof(writers_schema)) {
+	switch (avro_typeof(consumer->schema)) {
 	case AVRO_NULL:
-		rval = enc->read_null(reader);
-		if (rval) {
-			avro_prefix_error("Cannot read null value: ");
-		} else {
-			*datum = avro_null();
-		}
+		check_prefix(rval, enc->read_null(reader),
+			     "Cannot read null value: ");
+		check(rval, avro_consumer_call(consumer, null_value, ud));
 		break;
 
 	case AVRO_BOOLEAN:
 		{
 			int8_t b;
-			rval = enc->read_boolean(reader, &b);
-			if (rval) {
-				avro_prefix_error("Cannot read boolean value: ");
-			} else {
-				*datum = avro_boolean(b);
-			}
+			check_prefix(rval, enc->read_boolean(reader, &b),
+				     "Cannot read boolean value: ");
+			check(rval, avro_consumer_call(consumer, boolean_value, b, ud));
 		}
 		break;
 
@@ -357,60 +233,45 @@ avro_read_data(avro_reader_t reader, avr
 		{
 			int64_t len;
 			char *s;
-			rval = enc->read_string(reader, &s, &len);
-			if (rval) {
-				avro_prefix_error("Cannot read string value: ");
-			} else {
-				*datum = avro_givestring(s, avro_alloc_free);
-			}
+			check_prefix(rval, enc->read_string(reader, &s, &len),
+				     "Cannot read string value: ");
+			check(rval, avro_consumer_call(consumer, string_value, s, len, ud));
 		}
 		break;
 
 	case AVRO_INT32:
 		{
 			int32_t i;
-			rval = enc->read_int(reader, &i);
-			if (rval) {
-				avro_prefix_error("Cannot read int value: ");
-			} else {
-				*datum = avro_int32(i);
-			}
+			check_prefix(rval, enc->read_int(reader, &i),
+				    "Cannot read int value: ");
+			check(rval, avro_consumer_call(consumer, int_value, i, ud));
 		}
 		break;
 
 	case AVRO_INT64:
 		{
 			int64_t l;
-			rval = enc->read_long(reader, &l);
-			if (rval) {
-				avro_prefix_error("Cannot read long value: ");
-			} else {
-				*datum = avro_int64(l);
-			}
+			check_prefix(rval, enc->read_long(reader, &l),
+				     "Cannot read long value: ");
+			check(rval, avro_consumer_call(consumer, long_value, l, ud));
 		}
 		break;
 
 	case AVRO_FLOAT:
 		{
 			float f;
-			rval = enc->read_float(reader, &f);
-			if (rval) {
-				avro_prefix_error("Cannot read float value: ");
-			} else {
-				*datum = avro_float(f);
-			}
+			check_prefix(rval, enc->read_float(reader, &f),
+				     "Cannot read float value: ");
+			check(rval, avro_consumer_call(consumer, float_value, f, ud));
 		}
 		break;
 
 	case AVRO_DOUBLE:
 		{
 			double d;
-			rval = enc->read_double(reader, &d);
-			if (rval) {
-				avro_prefix_error("Cannot read double value: ");
-			} else {
-				*datum = avro_double(d);
-			}
+			check_prefix(rval, enc->read_double(reader, &d),
+				     "Cannot read double value: ");
+			check(rval, avro_consumer_call(consumer, double_value, d, ud));
 		}
 		break;
 
@@ -418,19 +279,17 @@ avro_read_data(avro_reader_t reader, avr
 		{
 			char *bytes;
 			int64_t len;
-			rval = enc->read_bytes(reader, &bytes, &len);
-			if (rval) {
-				avro_prefix_error("Cannot read bytes value: ");
-			} else {
-				*datum = avro_givebytes(bytes, len, free_bytes);
-			}
+			check_prefix(rval, enc->read_bytes(reader, &bytes, &len),
+				     "Cannot read bytes value: ");
+			check(rval, avro_consumer_call(consumer, bytes_value, bytes, len, ud));
 		}
 		break;
 
-	case AVRO_FIXED:{
+	case AVRO_FIXED:
+		{
 			char *bytes;
 			int64_t size =
-			    avro_schema_to_fixed(writers_schema)->size;
+			    avro_schema_to_fixed(consumer->schema)->size;
 
 			bytes = avro_malloc(size);
 			if (!bytes) {
@@ -440,53 +299,80 @@ avro_read_data(avro_reader_t reader, avr
 			rval = avro_read(reader, bytes, size);
 			if (rval) {
 				avro_prefix_error("Cannot read fixed value: ");
-			} else {
-				*datum = avro_givefixed(readers_schema, bytes, size,
-							avro_alloc_free);
+				avro_free(bytes, size);
+				return rval;
+			}
+
+			rval = avro_consumer_call(consumer, fixed_value, bytes, size, ud);
+			if (rval) {
+				avro_free(bytes, size);
+				return rval;
 			}
 		}
 		break;
 
 	case AVRO_ENUM:
-		rval =
-		    read_enum(reader, enc, avro_schema_to_enum(writers_schema),
-			      avro_schema_to_enum(readers_schema), datum);
+		check(rval, read_enum(reader, enc, consumer, ud));
 		break;
 
 	case AVRO_ARRAY:
-		rval =
-		    read_array(reader, enc,
-			       avro_schema_to_array(writers_schema),
-			       avro_schema_to_array(readers_schema), datum);
+		check(rval, read_array(reader, enc, consumer, ud));
 		break;
 
 	case AVRO_MAP:
-		rval =
-		    read_map(reader, enc, avro_schema_to_map(writers_schema),
-			     avro_schema_to_map(readers_schema), datum);
+		check(rval, read_map(reader, enc, consumer, ud));
 		break;
 
 	case AVRO_UNION:
-		rval =
-		    read_union(reader, enc,
-			       avro_schema_to_union(writers_schema),
-			       avro_schema_to_union(readers_schema), datum);
+		check(rval, read_union(reader, enc, consumer, ud));
 		break;
 
 	case AVRO_RECORD:
-		rval =
-		    read_record(reader, enc,
-				avro_schema_to_record(writers_schema),
-				avro_schema_to_record(readers_schema), datum);
+		check(rval, read_record(reader, enc, consumer, ud));
 		break;
 
 	case AVRO_LINK:
-		rval =
-		    avro_read_data(reader,
-				   (avro_schema_to_link(writers_schema))->to,
-				   readers_schema, datum);
-		break;
+		avro_set_error("Consumer can't consume a link schema directly");
+		return EINVAL;
 	}
 
-	return rval;
+	return 0;
+}
+
+
+int
+avro_read_data(avro_reader_t reader, avro_schema_t writers_schema,
+	       avro_schema_t readers_schema, avro_datum_t * datum)
+{
+	int rval;
+
+	check_param(EINVAL, reader, "reader");
+	check_param(EINVAL, is_avro_schema(writers_schema), "writer schema");
+	check_param(EINVAL, datum, "datum pointer");
+
+	if (!readers_schema) {
+		readers_schema = writers_schema;
+	}
+
+	avro_datum_t  result = avro_datum_from_schema(readers_schema);
+	if (!result) {
+		return EINVAL;
+	}
+
+	avro_consumer_t  *resolver = avro_resolver_new(writers_schema, readers_schema);
+	if (!resolver) {
+		avro_datum_decref(result);
+		return EINVAL;
+	}
+
+	rval = avro_consume_binary(reader, resolver, result);
+	if (rval) {
+		avro_consumer_free(resolver);
+		avro_datum_decref(result);
+		return rval;
+	}
+
+	avro_consumer_free(resolver);
+	*datum = result;
+	return 0;
 }

Modified: avro/trunk/lang/c/src/encoding_binary.c
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/encoding_binary.c?rev=1074612&r1=1074611&r2=1074612&view=diff
==============================================================================
--- avro/trunk/lang/c/src/encoding_binary.c (original)
+++ avro/trunk/lang/c/src/encoding_binary.c Fri Feb 25 16:17:56 2011
@@ -140,7 +140,7 @@ static int read_bytes(avro_reader_t read
 
 static int skip_bytes(avro_reader_t reader)
 {
-	int64_t len;
+	int64_t len = 0;
 	int rval;
 	check_prefix(rval, read_long(reader, &len),
 		     "Cannot read bytes length: ");
@@ -172,7 +172,7 @@ size_bytes(avro_writer_t writer, const c
 
 static int read_string(avro_reader_t reader, char **s, int64_t *len)
 {
-	int64_t  str_len;
+	int64_t  str_len = 0;
 	int rval;
 	check_prefix(rval, read_long(reader, &str_len),
 		     "Cannot read string length: ");

Added: avro/trunk/lang/c/src/resolver.c
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/resolver.c?rev=1074612&view=auto
==============================================================================
--- avro/trunk/lang/c/src/resolver.c (added)
+++ avro/trunk/lang/c/src/resolver.c Fri Feb 25 16:17:56 2011
@@ -0,0 +1,1136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.	 You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#include <inttypes.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "avro.h"
+#include "avro/consumer.h"
+#include "avro_errors.h"
+#include "avro_private.h"
+#include "allocation.h"
+
+
+#if !defined(DEBUG_RESOLVER)
+#define DEBUG_RESOLVER 0
+#endif
+
+#if DEBUG_RESOLVER
+#include <stdio.h>
+#define debug(...) { fprintf(stderr, __VA_ARGS__); fprintf(stderr, "\n"); }
+#else
+#define debug(...) /* no debug output */
+#endif
+
+
+typedef struct avro_resolver_t  avro_resolver_t;
+
+struct avro_resolver_t {
+	avro_consumer_t  parent;
+
+	/* The reader schema for this resolver. */
+	avro_schema_t  rschema;
+
+	/* An array of any child resolvers needed for the subschemas of
+	 * wschema */
+	avro_consumer_t  **child_resolvers;
+
+	/* If the reader and writer schemas are records, this field
+	 * contains a mapping from writer field indices to reader field
+	 * indices. */
+	int  *index_mapping;
+
+	/* The number of elements in the child_resolvers and
+	 * index_mapping arrays. */
+	size_t  num_children;
+
+	/* If the reader schema is a union, but the writer schema is
+	 * not, this field indicates which branch of the reader union
+	 * should be selected. */
+	int  reader_union_branch;
+};
+
+
+static void
+avro_resolver_free(avro_consumer_t *consumer)
+{
+	avro_resolver_t  *resolver = (avro_resolver_t *) consumer;
+	avro_schema_decref(resolver->parent.schema);
+	avro_schema_decref(resolver->rschema);
+	if (resolver->child_resolvers) {
+		unsigned int  i;
+		for (i = 0; i < resolver->num_children; i++) {
+			avro_consumer_t  *child = resolver->child_resolvers[i];
+			if (child) {
+				avro_consumer_free(child);
+			}
+		}
+		avro_free(resolver->child_resolvers,
+			  sizeof(avro_resolver_t *) * resolver->num_children);
+	}
+	if (resolver->index_mapping) {
+		avro_free(resolver->index_mapping,
+			  sizeof(int) * resolver->num_children);
+	}
+	avro_freet(avro_resolver_t, resolver);
+}
+
+
+/**
+ * Create a new avro_resolver_t instance.  You must fill in the callback
+ * pointers that are appropriate for the writer schema after this
+ * function returns.
+ */
+
+static avro_resolver_t *
+avro_resolver_create(avro_schema_t wschema,
+		     avro_schema_t rschema)
+{
+	avro_resolver_t  *resolver = avro_new(avro_resolver_t);
+	memset(resolver, 0, sizeof(avro_resolver_t));
+
+	resolver->parent.free = avro_resolver_free;
+	resolver->parent.schema = avro_schema_incref(wschema);
+	resolver->rschema = avro_schema_incref(rschema);
+	resolver->reader_union_branch = -1;
+	return resolver;
+}
+
+
+static avro_datum_t
+avro_resolver_get_real_dest(avro_resolver_t *resolver, avro_datum_t dest)
+{
+	if (resolver->reader_union_branch < 0) {
+		/*
+		 * The reader schema isn't a union, so use the dest
+		 * field as-is.
+		 */
+
+		return dest;
+	}
+
+	debug("Retrieving union branch %d for %s value",
+	      resolver->reader_union_branch,
+	      avro_schema_type_name(resolver->parent.schema));
+
+	avro_datum_t  branch = NULL;
+	avro_union_set_discriminant
+	    (dest, resolver->reader_union_branch, &branch);
+	return branch;
+}
+
+
+/*-----------------------------------------------------------------------
+ * Reader unions
+ */
+
+/*
+ * For each Avro type, we have to check whether the reader schema on its
+ * own is compatible, and whether the reader is a union that contains a
+ * compatible type.  The macros in this section help us perform both of
+ * these checks with less code.
+ */
+
+
+/**
+ * A helper macro that handles the case where neither writer nor reader
+ * are unions.  Uses @ref check_func to see if the two schemas are
+ * compatible.
+ */
+
+#define check_non_union(wschema, rschema, check_func)		\
+{								\
+	avro_resolver_t  *self = NULL;				\
+	int  rc = check_func(&self, wschema, rschema,		\
+			     rschema);				\
+	if (self) {						\
+		debug("Non-union schemas %s (writer) "		\
+		      "and %s (reader) match",			\
+		      avro_schema_type_name(wschema),		\
+		      avro_schema_type_name(rschema));		\
+								\
+		self->reader_union_branch = -1;			\
+		return &self->parent;				\
+        }							\
+								\
+        if (rc) {						\
+		return NULL;					\
+	}							\
+    }
+
+
+/**
+ * Helper macro that handles the case where the reader is a union, and
+ * the writer is not.  Checks each branch of the reader union schema,
+ * looking for the first branch that is compatible with the writer
+ * schema.  The @ref check_func argument should be a function that can
+ * check the compatibility of each branch schema.
+ */
+
+#define check_reader_union(wschema, rschema, check_func)		\
+{									\
+	if (!is_avro_union(rschema)) {					\
+		break;							\
+	}								\
+									\
+	debug("Checking reader union schema");				\
+	size_t  num_branches = avro_schema_union_size(rschema);		\
+	unsigned int  i;						\
+									\
+	for (i = 0; i < num_branches; i++) {				\
+		avro_schema_t  branch_schema =				\
+		    avro_schema_union_branch(rschema, i);		\
+		avro_resolver_t  *self = NULL;				\
+		int  rc = check_func(&self, wschema, branch_schema,	\
+				     rschema);				\
+		if (self) {						\
+			debug("Reader union branch %d (%s) "		\
+			      "and writer %s match",			\
+			      i, avro_schema_type_name(branch_schema),	\
+			      avro_schema_type_name(wschema));		\
+			self->reader_union_branch = i;			\
+			return &self->parent;				\
+		}							\
+									\
+		if (rc) {						\
+			return NULL;					\
+		}							\
+	}								\
+									\
+	debug("No reader union branches match");			\
+}
+
+/**
+ * A helper macro that wraps together check_non_union and
+ * check_reader_union for a simple (non-union) writer schema type.
+ */
+
+#define check_simple_writer(wschema, rschema, type_name)		\
+{									\
+	check_non_union(wschema, rschema, try_##type_name);		\
+	check_reader_union(wschema, rschema, try_##type_name);		\
+	debug("Writer %s doesn't match reader %s",			\
+	      avro_schema_type_name(wschema),				\
+	      avro_schema_type_name(rschema));				\
+	avro_set_error("Cannot store a " #type_name " into schema %s",	\
+		       avro_schema_type_name(rschema));			\
+	return NULL;							\
+}
+
+
+/*-----------------------------------------------------------------------
+ * primitives
+ */
+
+static int
+avro_resolver_boolean_value(avro_consumer_t *consumer, int value,
+			    void *user_data)
+{
+	avro_resolver_t  *resolver = (avro_resolver_t *) consumer;
+	avro_datum_t  ud_dest = user_data;
+	avro_datum_t  dest = avro_resolver_get_real_dest(resolver, ud_dest);
+	debug("Storing %s into %p", value? "TRUE": "FALSE", dest);
+	return avro_boolean_set(dest, value);
+}
+
+static int
+try_boolean(avro_resolver_t **resolver,
+	    avro_schema_t wschema, avro_schema_t rschema,
+	    avro_schema_t root_rschema)
+{
+	if (is_avro_boolean(rschema)) {
+		*resolver = avro_resolver_create(wschema, root_rschema);
+		(*resolver)->parent.boolean_value = avro_resolver_boolean_value;
+	}
+	return 0;
+}
+
+
+static void
+free_bytes(void *ptr, size_t sz)
+{
+	/*
+	 * The binary encoder class allocates bytes values with an extra
+	 * byte, so that they're NUL terminated.
+	 */
+	avro_free(ptr, sz+1);
+}
+
+static int
+avro_resolver_bytes_value(avro_consumer_t *consumer,
+			  const void *value, size_t value_len,
+			  void *user_data)
+{
+	avro_resolver_t  *resolver = (avro_resolver_t *) consumer;
+	avro_datum_t  ud_dest = user_data;
+	avro_datum_t  dest = avro_resolver_get_real_dest(resolver, ud_dest);
+	debug("Storing %zu bytes into %p", value_len, dest);
+	return avro_givebytes_set(dest, value, value_len, free_bytes);
+}
+
+static int
+try_bytes(avro_resolver_t **resolver,
+	  avro_schema_t wschema, avro_schema_t rschema,
+	  avro_schema_t root_rschema)
+{
+	if (is_avro_bytes(rschema)) {
+		*resolver = avro_resolver_create(wschema, root_rschema);
+		(*resolver)->parent.bytes_value = avro_resolver_bytes_value;
+	}
+	return 0;
+}
+
+
+static int
+avro_resolver_double_value(avro_consumer_t *consumer, double value,
+			   void *user_data)
+{
+	avro_resolver_t  *resolver = (avro_resolver_t *) consumer;
+	avro_datum_t  ud_dest = user_data;
+	avro_datum_t  dest = avro_resolver_get_real_dest(resolver, ud_dest);
+	debug("Storing %le into %p", value, dest);
+	return avro_double_set(dest, value);
+}
+
+static int
+try_double(avro_resolver_t **resolver,
+	   avro_schema_t wschema, avro_schema_t rschema,
+	   avro_schema_t root_rschema)
+{
+	if (is_avro_double(rschema)) {
+		*resolver = avro_resolver_create(wschema, root_rschema);
+		(*resolver)->parent.double_value = avro_resolver_double_value;
+	}
+	return 0;
+}
+
+
+static int
+avro_resolver_float_value(avro_consumer_t *consumer, float value,
+			  void *user_data)
+{
+	avro_resolver_t  *resolver = (avro_resolver_t *) consumer;
+	avro_datum_t  ud_dest = user_data;
+	avro_datum_t  dest = avro_resolver_get_real_dest(resolver, ud_dest);
+	debug("Storing %e into %p", value, dest);
+	return avro_float_set(dest, value);
+}
+
+static int
+try_float(avro_resolver_t **resolver,
+	  avro_schema_t wschema, avro_schema_t rschema,
+	  avro_schema_t root_rschema)
+{
+	if (is_avro_float(rschema)) {
+		*resolver = avro_resolver_create(wschema, root_rschema);
+		(*resolver)->parent.float_value = avro_resolver_float_value;
+	}
+	return 0;
+}
+
+
+static int
+avro_resolver_int_value(avro_consumer_t *consumer, int32_t value,
+			void *user_data)
+{
+	avro_resolver_t  *resolver = (avro_resolver_t *) consumer;
+	avro_datum_t  ud_dest = user_data;
+	avro_datum_t  dest = avro_resolver_get_real_dest(resolver, ud_dest);
+	debug("Storing %" PRId32 " into %p", value, dest);
+	return avro_int32_set(dest, value);
+}
+
+static int
+try_int(avro_resolver_t **resolver,
+	avro_schema_t wschema, avro_schema_t rschema,
+	avro_schema_t root_rschema)
+{
+	if (is_avro_int32(rschema)) {
+		*resolver = avro_resolver_create(wschema, root_rschema);
+		(*resolver)->parent.int_value = avro_resolver_int_value;
+	}
+	return 0;
+}
+
+
+static int
+avro_resolver_long_value(avro_consumer_t *consumer, int64_t value,
+			 void *user_data)
+{
+	avro_resolver_t  *resolver = (avro_resolver_t *) consumer;
+	avro_datum_t  ud_dest = user_data;
+	avro_datum_t  dest = avro_resolver_get_real_dest(resolver, ud_dest);
+	debug("Storing %" PRId64 " into %p", value, dest);
+	return avro_int64_set(dest, value);
+}
+
+static int
+try_long(avro_resolver_t **resolver,
+	 avro_schema_t wschema, avro_schema_t rschema,
+	 avro_schema_t root_rschema)
+{
+	if (is_avro_int64(rschema)) {
+		*resolver = avro_resolver_create(wschema, root_rschema);
+		(*resolver)->parent.long_value = avro_resolver_long_value;
+	}
+	return 0;
+}
+
+
+static int
+avro_resolver_null_value(avro_consumer_t *consumer, void *user_data)
+{
+#if DEBUG_RESOLVER
+	avro_resolver_t  *resolver = (avro_resolver_t *) consumer;
+	avro_datum_t  ud_dest = user_data;
+	avro_datum_t  dest = avro_resolver_get_real_dest(resolver, ud_dest);
+	debug("Storing null into %p", dest);
+#else
+	AVRO_UNUSED(consumer);
+	AVRO_UNUSED(user_data);
+#endif
+	return 0;
+}
+
+static int
+try_null(avro_resolver_t **resolver,
+	 avro_schema_t wschema, avro_schema_t rschema,
+	 avro_schema_t root_rschema)
+{
+	if (is_avro_null(rschema)) {
+		*resolver = avro_resolver_create(wschema, root_rschema);
+		(*resolver)->parent.null_value = avro_resolver_null_value;
+	}
+	return 0;
+}
+
+
+static int
+avro_resolver_string_value(avro_consumer_t *consumer,
+			   const void *value, size_t value_len,
+			   void *user_data)
+{
+	AVRO_UNUSED(value_len);
+	avro_resolver_t  *resolver = (avro_resolver_t *) consumer;
+	avro_datum_t  ud_dest = user_data;
+	avro_datum_t  dest = avro_resolver_get_real_dest(resolver, ud_dest);
+	debug("Storing \"%s\" into %p", (const char *) value, dest);
+	return avro_givestring_set(dest, value, avro_alloc_free);
+}
+
+static int
+try_string(avro_resolver_t **resolver,
+	   avro_schema_t wschema, avro_schema_t rschema,
+	   avro_schema_t root_rschema)
+{
+	if (is_avro_string(rschema)) {
+		*resolver = avro_resolver_create(wschema, root_rschema);
+		(*resolver)->parent.string_value = avro_resolver_string_value;
+	}
+	return 0;
+}
+
+
+/*-----------------------------------------------------------------------
+ * arrays
+ */
+
+static int
+avro_resolver_array_start_block(avro_consumer_t *consumer,
+				int is_first_block,
+				unsigned int block_count,
+				void *user_data)
+{
+#if DEBUG_RESOLVER
+	if (is_first_block) {
+		avro_resolver_t  *resolver = (avro_resolver_t *) consumer;
+		avro_datum_t  ud_dest = user_data;
+		avro_datum_t  dest = avro_resolver_get_real_dest(resolver, ud_dest);
+		debug("Starting array %p", dest);
+	}
+#else
+	AVRO_UNUSED(consumer);
+	AVRO_UNUSED(user_data);
+	AVRO_UNUSED(is_first_block);
+#endif
+
+	AVRO_UNUSED(block_count);
+	return 0;
+}
+
+static int
+avro_resolver_array_element(avro_consumer_t *consumer,
+			    unsigned int index,
+			    avro_consumer_t **element_consumer,
+			    void **element_user_data,
+			    void *user_data)
+{
+	AVRO_UNUSED(index);
+
+	avro_resolver_t  *resolver = (avro_resolver_t *) consumer;
+	avro_datum_t  ud_dest = user_data;
+	avro_datum_t  dest = avro_resolver_get_real_dest(resolver, ud_dest);
+	debug("Adding element to array %p", dest);
+
+	/*
+	 * Allocate a new element datum and add it to the array.
+	 */
+	avro_schema_t  array_schema = avro_datum_get_schema(dest);
+	avro_schema_t  item_schema = avro_schema_array_items(array_schema);
+	avro_datum_t  element = avro_datum_from_schema(item_schema);
+	avro_array_append_datum(dest, element);
+	avro_datum_decref(element);
+
+	/*
+	 * Return the consumer that we allocated to process the array's
+	 * children.
+	 */
+	*element_consumer = resolver->child_resolvers[0];
+	*element_user_data = element;
+	return 0;
+}
+
+static int
+try_array(avro_resolver_t **resolver,
+	  avro_schema_t wschema, avro_schema_t rschema,
+	  avro_schema_t root_rschema)
+{
+	/*
+	 * First verify that the reader is an array.
+	 */
+
+	if (!is_avro_array(rschema)) {
+		return 0;
+	}
+
+	/*
+	 * Array schemas have to have compatible element schemas to be
+	 * compatible themselves.  Try to create an avro_resolver_t to
+	 * check the compatibility.
+	 */
+
+	avro_schema_t  witems = avro_schema_array_items(wschema);
+	avro_schema_t  ritems = avro_schema_array_items(rschema);
+
+	avro_consumer_t  *item_consumer = avro_resolver_new(witems, ritems);
+	if (!item_consumer) {
+		avro_prefix_error("Array values aren't compatible: ");
+		return EINVAL;
+	}
+
+	/*
+	 * The two schemas are compatible, so go ahead and create a
+	 * avro_resolver_t for the array.  Store the item schema's
+	 * resolver into the child_resolvers field.
+	 */
+
+	*resolver = avro_resolver_create(wschema, root_rschema);
+	(*resolver)->num_children = 1;
+	(*resolver)->child_resolvers = avro_calloc(1, sizeof(avro_consumer_t *));
+	(*resolver)->child_resolvers[0] = item_consumer;
+	(*resolver)->parent.array_start_block = avro_resolver_array_start_block;
+	(*resolver)->parent.array_element = avro_resolver_array_element;
+
+	return 0;
+}
+
+
+/*-----------------------------------------------------------------------
+ * enums
+ */
+
+static int
+avro_resolver_enum_value(avro_consumer_t *consumer, int value,
+			 void *user_data)
+{
+	AVRO_UNUSED(value);
+
+	avro_resolver_t  *resolver = (avro_resolver_t *) consumer;
+	avro_datum_t  ud_dest = user_data;
+	avro_datum_t  dest = avro_resolver_get_real_dest(resolver, ud_dest);
+
+	const char  *symbol_name = avro_schema_enum_get(resolver->parent.schema, value);
+	debug("Storing symbol %s into %p", symbol_name, dest);
+	return avro_enum_set_name(dest, symbol_name);
+}
+
+static int
+try_enum(avro_resolver_t **resolver,
+	 avro_schema_t wschema, avro_schema_t rschema,
+	 avro_schema_t root_rschema)
+{
+	/*
+	 * Enum schemas have to have the same name — but not the same
+	 * list of symbols — to be compatible.
+	 */
+
+	if (is_avro_enum(rschema)) {
+		const char  *wname = avro_schema_name(wschema);
+		const char  *rname = avro_schema_name(rschema);
+
+		if (!strcmp(wname, rname)) {
+			*resolver = avro_resolver_create(wschema, root_rschema);
+			(*resolver)->parent.enum_value = avro_resolver_enum_value;
+		}
+	}
+	return 0;
+}
+
+
+/*-----------------------------------------------------------------------
+ * fixed
+ */
+
+static int
+avro_resolver_fixed_value(avro_consumer_t *consumer,
+			  const void *value, size_t value_len,
+			  void *user_data)
+{
+	avro_resolver_t  *resolver = (avro_resolver_t *) consumer;
+	avro_datum_t  ud_dest = user_data;
+	avro_datum_t  dest = avro_resolver_get_real_dest(resolver, ud_dest);
+	debug("Storing (fixed) %zu bytes into %p", value_len, dest);
+	return avro_givefixed_set(dest, value, value_len, avro_alloc_free);
+}
+
+static int
+try_fixed(avro_resolver_t **resolver,
+	  avro_schema_t wschema, avro_schema_t rschema,
+	  avro_schema_t root_rschema)
+{
+	/*
+	 * Fixed schemas need the same name and size to be compatible.
+	 */
+
+	if (avro_schema_equal(wschema, rschema)) {
+		*resolver = avro_resolver_create(wschema, root_rschema);
+		(*resolver)->parent.fixed_value = avro_resolver_fixed_value;
+	}
+	return 0;
+}
+
+
+/*-----------------------------------------------------------------------
+ * maps
+ */
+
+static int
+avro_resolver_map_start_block(avro_consumer_t *consumer,
+			      int is_first_block,
+			      unsigned int block_count,
+			      void *user_data)
+{
+#if DEBUG_RESOLVER
+	if (is_first_block) {
+		avro_resolver_t  *resolver = (avro_resolver_t *) consumer;
+		avro_datum_t  ud_dest = user_data;
+		avro_datum_t  dest = avro_resolver_get_real_dest(resolver, ud_dest);
+		debug("Starting map %p", dest);
+	}
+#else
+	AVRO_UNUSED(consumer);
+	AVRO_UNUSED(user_data);
+	AVRO_UNUSED(is_first_block);
+#endif
+
+	AVRO_UNUSED(block_count);
+	return 0;
+}
+
+static int
+avro_resolver_map_element(avro_consumer_t *consumer,
+			  unsigned int index,
+			  const char *key,
+			  avro_consumer_t **value_consumer,
+			  void **value_user_data,
+			  void *user_data)
+{
+	AVRO_UNUSED(index);
+
+	avro_resolver_t  *resolver = (avro_resolver_t *) consumer;
+	avro_datum_t  ud_dest = user_data;
+	avro_datum_t  dest = avro_resolver_get_real_dest(resolver, ud_dest);
+	debug("Adding element to map %p", dest);
+
+	/*
+	 * Allocate a new element datum and add it to the map.
+	 */
+
+	avro_schema_t  map_schema = avro_datum_get_schema(dest);
+	avro_schema_t  value_schema = avro_schema_map_values(map_schema);
+	avro_datum_t  value = avro_datum_from_schema(value_schema);
+	avro_map_set(dest, key, value);
+	avro_datum_decref(value);
+
+	/*
+	 * Return the consumer that we allocated to process the map's
+	 * children.
+	 */
+
+	*value_consumer = resolver->child_resolvers[0];
+	*value_user_data = value;
+	return 0;
+}
+
+static int
+try_map(avro_resolver_t **resolver,
+	avro_schema_t wschema, avro_schema_t rschema,
+	avro_schema_t root_rschema)
+{
+	/*
+	 * First verify that the reader is a map.
+	 */
+
+	if (!is_avro_map(rschema)) {
+		return 0;
+	}
+
+	/*
+	 * Map schemas have to have compatible value schemas to be
+	 * compatible themselves.  Try to create an avro_resolver_t to
+	 * check the compatibility.
+	 */
+
+	avro_schema_t  wvalues = avro_schema_map_values(wschema);
+	avro_schema_t  rvalues = avro_schema_map_values(rschema);
+
+	avro_consumer_t  *value_consumer = avro_resolver_new(wvalues, rvalues);
+	if (!value_consumer) {
+		avro_prefix_error("Map values aren't compatible: ");
+		return EINVAL;
+	}
+
+	/*
+	 * The two schemas are compatible, so go ahead and create a
+	 * avro_resolver_t for the map.  Store the value schema's
+	 * resolver into the child_resolvers field.
+	 */
+
+	*resolver = avro_resolver_create(wschema, root_rschema);
+	(*resolver)->num_children = 1;
+	(*resolver)->child_resolvers = avro_calloc(1, sizeof(avro_consumer_t *));
+	(*resolver)->child_resolvers[0] = value_consumer;
+	(*resolver)->parent.map_start_block = avro_resolver_map_start_block;
+	(*resolver)->parent.map_element = avro_resolver_map_element;
+
+	return 0;
+}
+
+
+/*-----------------------------------------------------------------------
+ * records
+ */
+
+static int
+avro_resolver_record_start(avro_consumer_t *consumer,
+			   void *user_data)
+{
+#if DEBUG_RESOLVER
+	avro_resolver_t  *resolver = (avro_resolver_t *) consumer;
+	avro_datum_t  ud_dest = user_data;
+	avro_datum_t  dest = avro_resolver_get_real_dest(resolver, ud_dest);
+	debug("Starting record at %p", dest);
+#else
+	AVRO_UNUSED(consumer);
+	AVRO_UNUSED(user_data);
+#endif
+
+	/*
+	 * TODO: Eventually, we'll fill in default values for the extra
+	 * reader fields here.
+	 */
+
+	return 0;
+}
+
+static int
+avro_resolver_record_field(avro_consumer_t *consumer,
+			   unsigned int index,
+			   avro_consumer_t **field_consumer,
+			   void **field_user_data,
+			   void *user_data)
+{
+	avro_resolver_t  *resolver = (avro_resolver_t *) consumer;
+	avro_datum_t  ud_dest = user_data;
+	avro_datum_t  dest = avro_resolver_get_real_dest(resolver, ud_dest);
+
+	const char  *field_name =
+	    avro_schema_record_field_name(consumer->schema, index);
+
+	/*
+	 * Grab the resolver for this field of the writer record.  If
+	 * it's NULL, then this field doesn't exist in the reader
+	 * record, and should be skipped.
+	 */
+
+	debug("Retrieving resolver for writer field %i (%s)",
+	      index, field_name);
+
+	if (!resolver->child_resolvers[index]) {
+		debug("Reader doesn't have field %s, skipping", field_name);
+		return 0;
+	}
+
+	/*
+	 * TODO: Once we can retrieve record fields by index (quickly),
+	 * use the index_mapping.
+	 */
+
+	avro_datum_t  field = NULL;
+	avro_record_get(dest, field_name, &field);
+
+	*field_consumer = resolver->child_resolvers[index];
+	*field_user_data = field;
+	return 0;
+}
+
+static int
+try_record(avro_resolver_t **resolver,
+	   avro_schema_t wschema, avro_schema_t rschema,
+	   avro_schema_t root_rschema)
+{
+	/*
+	 * First verify that the reader is also a record, and has the
+	 * same name as the writer.
+	 */
+
+	if (!is_avro_record(rschema)) {
+		return 0;
+	}
+
+	const char  *wname = avro_schema_name(wschema);
+	const char  *rname = avro_schema_name(rschema);
+
+	if (strcmp(wname, rname)) {
+		return 0;
+	}
+
+	/*
+	 * Categorize the fields in the record schemas.  Fields that are
+	 * only in the writer are ignored.  Fields that are only in the
+	 * reader raise a schema mismatch error, unless the field has a
+	 * default value.  Fields that are in both are resolved
+	 * recursively.
+	 *
+	 * The child_resolver array will contain an avro_resolver_t for
+	 * each field in the writer schema.  To build this array, we
+	 * loop through the fields of the reader schema.  If that field
+	 * is also in the writer schema, we resolve them recursively,
+	 * and store the resolver into the array.  If the field isn't in
+	 * the writer schema, we raise an error.  (TODO: Eventually,
+	 * we'll handle default values here.)  After this loop finishes,
+	 * any NULLs in the child_resolver array will represent fields
+	 * in the writer but not the reader; these fields will be
+	 * skipped when processing the input.
+	 */
+
+	size_t  wfields = avro_schema_record_size(wschema);
+	size_t  rfields = avro_schema_record_size(rschema);
+
+	debug("Checking writer record schema %s", wname);
+
+	avro_consumer_t  **child_resolvers =
+	    avro_calloc(wfields, sizeof(avro_consumer_t *));
+	int  *index_mapping = avro_calloc(wfields, sizeof(int));
+
+	unsigned int  ri;
+	for (ri = 0; ri < rfields; ri++) {
+		avro_schema_t  rfield =
+		    avro_schema_record_field_get_by_index(rschema, ri);
+		const char  *field_name =
+		    avro_schema_record_field_name(rschema, ri);
+
+		debug("Resolving reader record field %u (%s)", ri, field_name);
+
+		/*
+		 * See if this field is also in the writer schema.
+		 */
+
+		int  wi = avro_schema_record_field_get_index(wschema, field_name);
+
+		if (wi == -1) {
+			/*
+			 * This field isn't in the writer schema —
+			 * that's an error!  TODO: Handle default
+			 * values!
+			 */
+
+			debug("Field %s isn't in writer", field_name);
+			avro_set_error("Reader field %s doesn't appear in writer",
+				       field_name);
+			goto error;
+		}
+
+		/*
+		 * Try to recursively resolve the schemas for this
+		 * field.  If they're not compatible, that's an error.
+		 */
+
+		avro_schema_t  wfield =
+		    avro_schema_record_field_get_by_index(wschema, wi);
+		avro_consumer_t  *field_resolver = avro_resolver_new(wfield, rfield);
+
+		if (!field_resolver) {
+			avro_prefix_error("Field %s isn't compatible: ", field_name);
+			goto error;
+		}
+
+		/*
+		 * Save the details for this field.
+		 */
+
+		debug("Found match for field %s (%u in reader, %d in writer)",
+		      field_name, ri, wi);
+		child_resolvers[wi] = field_resolver;
+		index_mapping[wi] = ri;
+	}
+
+	/*
+	 * We might not have found matches for all of the writer fields,
+	 * but that's okay — any extras will be ignored.
+	 */
+
+	*resolver = avro_resolver_create(wschema, root_rschema);
+	(*resolver)->num_children = wfields;
+	(*resolver)->child_resolvers = child_resolvers;
+	(*resolver)->index_mapping = index_mapping;
+	(*resolver)->parent.record_start = avro_resolver_record_start;
+	(*resolver)->parent.record_field = avro_resolver_record_field;
+	return 0;
+
+error:
+	/*
+	 * Clean up any consumer we might have already created.
+	 */
+
+	{
+		unsigned int  i;
+		for (i = 0; i < wfields; i++) {
+			if (child_resolvers[i]) {
+				avro_consumer_free(child_resolvers[i]);
+			}
+		}
+	}
+
+	avro_free(child_resolvers, wfields * sizeof(avro_consumer_t *));
+	avro_free(index_mapping, wfields * sizeof(int));
+	return EINVAL;
+}
+
+
+/*-----------------------------------------------------------------------
+ * union
+ */
+
+/*
+ * union_branch callback that try_union installs on the resolver for a
+ * writer union.  Given the index of the writer branch, it hands back
+ * the resolver that try_union stored for that branch.
+ *
+ *   consumer          the union resolver (really an avro_resolver_t)
+ *   discriminant      index of the writer union branch
+ *   branch_consumer   out: the resolver for that branch
+ *   branch_user_data  out: user data for the branch consumer; this is
+ *                     the same user_data pointer we were handed
+ *   user_data         passed through unchanged
+ *
+ * Returns 0 on success.  Returns EINVAL, with the error message set,
+ * if the discriminant does not name a branch of the writer union or
+ * if that branch is incompatible with the reader schema.
+ */
+static int
+avro_resolver_union_branch(avro_consumer_t *consumer,
+			   unsigned int discriminant,
+			   avro_consumer_t **branch_consumer,
+			   void **branch_user_data,
+			   void *user_data)
+{
+	avro_resolver_t  *resolver = (avro_resolver_t *) consumer;
+
+	debug("Retrieving resolver for writer branch %u", discriminant);
+
+	/*
+	 * The discriminant is supplied by whoever is driving this
+	 * consumer, so it cannot be trusted to be in range.  The
+	 * child_resolvers array has exactly num_children entries;
+	 * refuse anything past the end instead of reading beyond it.
+	 */
+
+	if (discriminant >= resolver->num_children) {
+		avro_set_error("Writer union branch %u is out of range",
+			       discriminant);
+		return EINVAL;
+	}
+
+	/*
+	 * Grab the resolver for this branch of the writer union.  If
+	 * it's NULL, then this branch is incompatible with the reader.
+	 */
+
+	if (!resolver->child_resolvers[discriminant]) {
+		avro_set_error("Writer union branch %u is incompatible "
+			       "with reader schema \"%s\"",
+			       discriminant, avro_schema_type_name(resolver->rschema));
+		return EINVAL;
+	}
+
+	/*
+	 * Return the branch's resolver.
+	 */
+
+	*branch_consumer = resolver->child_resolvers[discriminant];
+	*branch_user_data = user_data;
+	return 0;
+}
+
+/*
+ * Builds a resolver for a writer schema that is a union.
+ *
+ *   wschema  the writer schema (a union)
+ *   rschema  the reader schema (any type)
+ *
+ * Returns the new resolver's consumer interface on success.  Returns
+ * NULL if memory cannot be allocated or if no branch of the writer
+ * union can be resolved against the reader schema; any branch
+ * resolvers created along the way are freed before returning.
+ */
+static avro_consumer_t *
+try_union(avro_schema_t wschema, avro_schema_t rschema)
+{
+	/*
+	 * For a writer union, we recursively try to resolve each branch
+	 * against the reader schema.  This will work correctly whether
+	 * or not the reader is also a union — if the reader is a union,
+	 * then we'll resolve each (non-union) writer branch against the
+	 * reader union, which will be checked in our calls to
+	 * check_simple_writer below.  The net result is that we might
+	 * end up trying every combination of writer and reader
+	 * branches, when looking for compatible schemas.
+	 *
+	 * Regardless of what the reader schema is, for each writer
+	 * branch, we stash away the recursive avro_resolver_t into the
+	 * child_resolvers array.  A NULL entry in this array means that
+	 * that branch isn't compatible with the reader.  This isn't an
+	 * immediate schema resolution error, since we allow
+	 * incompatible branches in the types as long as that branch
+	 * never appears in the actual data.  We only return an error if
+	 * there are *no* branches that are compatible.
+	 */
+
+	size_t  num_branches = avro_schema_union_size(wschema);
+	debug("Checking %zu-branch writer union schema", num_branches);
+
+	avro_consumer_t  **child_resolvers =
+	    avro_calloc(num_branches, sizeof(avro_consumer_t *));
+	int  some_branch_compatible = 0;
+
+	/*
+	 * Bail out before touching the array if the allocation failed.
+	 * An empty union is allowed to come back NULL here; it falls
+	 * through to the "no compatible branches" error below.
+	 */
+
+	if (child_resolvers == NULL && num_branches > 0) {
+		avro_set_error("Cannot allocate union branch resolvers");
+		return NULL;
+	}
+
+	unsigned int  i;
+	for (i = 0; i < num_branches; i++) {
+		avro_schema_t  branch_schema =
+		    avro_schema_union_branch(wschema, i);
+
+		debug("Resolving writer union branch %u (%s)",
+		      i, avro_schema_type_name(branch_schema));
+
+		/*
+		 * Try to recursively resolve this branch of the writer
+		 * union.  Don't raise an error if this fails — it's
+		 * okay for some of the branches to not be compatible
+		 * with the reader, as long as those branches never
+		 * appear in the input.
+		 */
+
+		child_resolvers[i] = avro_resolver_new(branch_schema, rschema);
+		if (child_resolvers[i]) {
+			debug("Found match for writer union branch %u", i);
+			some_branch_compatible = 1;
+		} else {
+			debug("No match for writer union branch %u", i);
+		}
+	}
+
+	/*
+	 * As long as there's at least one branch that's compatible with
+	 * the reader, then we consider this schema resolution a
+	 * success.
+	 */
+
+	if (!some_branch_compatible) {
+		debug("No writer union branches match");
+		avro_set_error("No branches in the writer are compatible "
+			       "with reader schema %s",
+			       avro_schema_type_name(rschema));
+		goto error;
+	}
+
+	/*
+	 * The resolver takes ownership of the child_resolvers array.
+	 * If the resolver itself can't be created, fall into the
+	 * cleanup path so the branch resolvers aren't leaked.
+	 */
+
+	avro_resolver_t  *resolver = avro_resolver_create(wschema, rschema);
+	if (resolver == NULL) {
+		goto error;
+	}
+	resolver->num_children = num_branches;
+	resolver->child_resolvers = child_resolvers;
+	resolver->parent.union_branch = avro_resolver_union_branch;
+	return &resolver->parent;
+
+error:
+	/*
+	 * Clean up any consumer we might have already created.
+	 */
+
+	for (i = 0; i < num_branches; i++) {
+		if (child_resolvers[i]) {
+			avro_consumer_free(child_resolvers[i]);
+		}
+	}
+
+	avro_free(child_resolvers, num_branches * sizeof(avro_consumer_t *));
+	return NULL;
+}
+
+
+/*-----------------------------------------------------------------------
+ * schema type dispatcher
+ */
+
+/*
+ * Entry point for schema resolution: dispatches on the type of the
+ * writer schema to build a consumer that resolves data written with
+ * wschema against rschema.
+ *
+ * Both arguments must pass is_avro_schema.  A writer union is handed
+ * to try_union; every other known writer type goes through
+ * check_simple_writer with the matching type token.
+ *
+ * NOTE(review): check_simple_writer is defined outside this part of
+ * the file.  The "return NULL" that follows each use suggests the
+ * macro itself returns the new resolver from this function when the
+ * schemas are compatible, so that the NULL is only reached on a
+ * mismatch — confirm against the macro definition.
+ *
+ * Returns NULL if no resolver could be built.  For an unrecognized
+ * writer schema type, the error message is set to "Unknown schema
+ * type".
+ */
+avro_consumer_t *
+avro_resolver_new(avro_schema_t wschema, avro_schema_t rschema)
+{
+	check_param(NULL, is_avro_schema(wschema), "writer schema");
+	check_param(NULL, is_avro_schema(rschema), "reader schema");
+
+	switch (avro_typeof(wschema))
+	{
+		case AVRO_BOOLEAN:
+			check_simple_writer(wschema, rschema, boolean);
+			return NULL;
+
+		case AVRO_BYTES:
+			check_simple_writer(wschema, rschema, bytes);
+			return NULL;
+
+		case AVRO_DOUBLE:
+			check_simple_writer(wschema, rschema, double);
+			return NULL;
+
+		case AVRO_FLOAT:
+			check_simple_writer(wschema, rschema, float);
+			return NULL;
+
+		case AVRO_INT32:
+			check_simple_writer(wschema, rschema, int);
+			return NULL;
+
+		case AVRO_INT64:
+			check_simple_writer(wschema, rschema, long);
+			return NULL;
+
+		case AVRO_NULL:
+			check_simple_writer(wschema, rschema, null);
+			return NULL;
+
+		case AVRO_STRING:
+			check_simple_writer(wschema, rschema, string);
+			return NULL;
+
+		case AVRO_ARRAY:
+			check_simple_writer(wschema, rschema, array);
+			return NULL;
+
+		case AVRO_ENUM:
+			check_simple_writer(wschema, rschema, enum);
+			return NULL;
+
+		case AVRO_FIXED:
+			check_simple_writer(wschema, rschema, fixed);
+			return NULL;
+
+		case AVRO_MAP:
+			check_simple_writer(wschema, rschema, map);
+			return NULL;
+
+		case AVRO_RECORD:
+			check_simple_writer(wschema, rschema, record);
+			return NULL;
+
+		/* The only writer type that is not a "simple" writer. */
+		case AVRO_UNION:
+			return try_union(wschema, rschema);
+
+		default:
+			avro_set_error("Unknown schema type");
+			return NULL;
+	}
+
+	/* Every switch arm returns, so this line is never reached. */
+	return NULL;
+}

Modified: avro/trunk/lang/c/src/schema.c
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/schema.c?rev=1074612&r1=1074611&r2=1074612&view=diff
==============================================================================
--- avro/trunk/lang/c/src/schema.c (original)
+++ avro/trunk/lang/c/src/schema.c Fri Feb 25 16:17:56 2011
@@ -346,6 +346,14 @@ avro_schema_union_append(const avro_sche
 	return 0;
 }
 
+/*
+ * Returns the number of branches in a union schema.
+ *
+ * The result is an unsigned count, so it has no room for an error
+ * code.  If union_schema is not a valid union schema, we report zero
+ * branches rather than returning EINVAL, which a caller would read as
+ * a branch count of 22 and could use to walk off the end of the
+ * union.  This relies on check_param returning its first argument
+ * when the test fails, as its uses elsewhere in this file imply.
+ */
+size_t avro_schema_union_size(const avro_schema_t union_schema)
+{
+	check_param(0, is_avro_schema(union_schema), "union schema");
+	check_param(0, is_avro_union(union_schema), "union schema");
+	struct avro_union_schema_t *unionp = avro_schema_to_union(union_schema);
+	return unionp->branches->num_entries;
+}
+
 avro_schema_t avro_schema_union_branch(avro_schema_t unionp,
 				       int branch_index)
 {
@@ -532,6 +540,7 @@ avro_schema_record_field_append(const av
 		avro_set_error("Cannot allocate new record field");
 		return ENOMEM;
 	}
+	new_field->index = record->fields->num_entries;
 	new_field->name = avro_strdup(field_name);
 	new_field->type = avro_schema_incref(field_schema);
 	st_insert(record->fields, record->fields->num_entries,
@@ -609,6 +618,22 @@ avro_schema_t avro_schema_record_field_g
 	return val.field->type;
 }
 
+/*
+ * Looks up field_name in the record schema's by-name field table and
+ * returns the index stored on the matching field.
+ *
+ * Returns -1, after setting the error message, if the record has no
+ * field with that name.
+ */
+int avro_schema_record_field_get_index(const avro_schema_t schema,
+				       const char *field_name)
+{
+	/*
+	 * st_lookup writes the table entry out as an st_data_t; the
+	 * union lets us read that same value back as a field pointer.
+	 */
+	union {
+		st_data_t data;
+		struct avro_record_field_t *field;
+	} entry;
+
+	int  found = st_lookup(avro_schema_to_record(schema)->fields_byname,
+			       (st_data_t) field_name, &entry.data);
+	if (!found) {
+		avro_set_error("No field named %s in record", field_name);
+		return -1;
+	}
+
+	return entry.field->index;
+}
+
 const char *avro_schema_record_field_name(const avro_schema_t schema, int index)
 {
 	union {

Modified: avro/trunk/lang/c/src/schema.h
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/schema.h?rev=1074612&r1=1074611&r2=1074612&view=diff
==============================================================================
--- avro/trunk/lang/c/src/schema.h (original)
+++ avro/trunk/lang/c/src/schema.h Fri Feb 25 16:17:56 2011
@@ -22,6 +22,7 @@
 #include "st.h"
 
 struct avro_record_field_t {
+	int index;
 	char *name;
 	avro_schema_t type;
 	/*

Modified: avro/trunk/lang/c/tests/test_avro_data.c
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/tests/test_avro_data.c?rev=1074612&r1=1074611&r2=1074612&view=diff
==============================================================================
--- avro/trunk/lang/c/tests/test_avro_data.c (original)
+++ avro/trunk/lang/c/tests/test_avro_data.c Fri Feb 25 16:17:56 2011
@@ -114,6 +114,7 @@ write_read_check(avro_schema_t writers_s
 		    (reader, writers_schema, readers_schema, &datum_out)) {
 			fprintf(stderr, "Unable to read %s validate=%d\n", type,
 				validate);
+			fprintf(stderr, "  %s\n", avro_strerror());
 			exit(EXIT_FAILURE);
 		}
 		if (!avro_datum_equal(datum, datum_out)) {
@@ -263,9 +264,9 @@ static int test_double(void)
 static int test_float(void)
 {
 	int i;
-	avro_schema_t schema = avro_schema_double();
+	avro_schema_t schema = avro_schema_float();
 	for (i = 0; i < 100; i++) {
-		avro_datum_t datum = avro_double(rand_number(-1.0E10, 1.0E10));
+		avro_datum_t datum = avro_float(rand_number(-1.0E10, 1.0E10));
 		write_read_check(schema, NULL, datum, "float");
 		avro_datum_decref(datum);
 	}
@@ -306,12 +307,12 @@ static int test_null(void)
 static int test_record(void)
 {
 	avro_schema_t schema = avro_schema_record("person", NULL);
-	avro_datum_t datum = avro_record(schema);
-	avro_datum_t name_datum, age_datum;
-
 	avro_schema_record_field_append(schema, "name", avro_schema_string());
 	avro_schema_record_field_append(schema, "age", avro_schema_int());
 
+	avro_datum_t datum = avro_record(schema);
+	avro_datum_t name_datum, age_datum;
+
 	name_datum = avro_givestring("Joseph Campbell", NULL);
 	age_datum = avro_int32(83);
 

Modified: avro/trunk/lang/c/tests/test_avro_schema.c
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/tests/test_avro_schema.c?rev=1074612&r1=1074611&r2=1074612&view=diff
==============================================================================
--- avro/trunk/lang/c/tests/test_avro_schema.c (original)
+++ avro/trunk/lang/c/tests/test_avro_schema.c Fri Feb 25 16:17:56 2011
@@ -180,6 +180,16 @@ static int test_record(void)
 	avro_schema_record_field_append(schema, "name", avro_schema_string());
 	avro_schema_record_field_append(schema, "age", avro_schema_int());
 
+	if (avro_schema_record_field_get_index(schema, "name") != 0) {
+		fprintf(stderr, "Incorrect index for \"name\" field\n");
+		exit(EXIT_FAILURE);
+	}
+
+	if (avro_schema_record_field_get_index(schema, "unknown") != -1) {
+		fprintf(stderr, "Incorrect index for \"unknown\" field\n");
+		exit(EXIT_FAILURE);
+	}
+
 	avro_schema_t  name_field =
 		avro_schema_record_field_get(schema, "name");
 	if (!avro_schema_equal(name_field, avro_schema_string())) {



Mime
View raw message