Return-Path: X-Original-To: apmail-avro-commits-archive@www.apache.org Delivered-To: apmail-avro-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E6AFA7A94 for ; Mon, 26 Sep 2011 12:36:53 +0000 (UTC) Received: (qmail 13801 invoked by uid 500); 26 Sep 2011 12:36:53 -0000 Delivered-To: apmail-avro-commits-archive@avro.apache.org Received: (qmail 13772 invoked by uid 500); 26 Sep 2011 12:36:53 -0000 Mailing-List: contact commits-help@avro.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@avro.apache.org Delivered-To: mailing list commits@avro.apache.org Received: (qmail 13765 invoked by uid 99); 26 Sep 2011 12:36:53 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 26 Sep 2011 12:36:53 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 26 Sep 2011 12:36:49 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id F301C2388ABB for ; Mon, 26 Sep 2011 12:36:28 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: svn commit: r1175832 [4/5] - in /avro/trunk: ./ lang/c/docs/ lang/c/examples/ lang/c/src/ lang/c/src/avro/ lang/c/tests/ Date: Mon, 26 Sep 2011 12:36:27 -0000 To: commits@avro.apache.org From: dcreager@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20110926123628.F301C2388ABB@eris.apache.org> Added: avro/trunk/lang/c/src/resolved-writer.c URL: http://svn.apache.org/viewvc/avro/trunk/lang/c/src/resolved-writer.c?rev=1175832&view=auto ============================================================================== --- avro/trunk/lang/c/src/resolved-writer.c (added) +++ avro/trunk/lang/c/src/resolved-writer.c Mon Sep 26 12:36:26 2011 @@ -0,0 +1,2895 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +#include +#include +#include +#include + +#include "avro_private.h" +#include "avro/allocation.h" +#include "avro/basics.h" +#include "avro/data.h" +#include "avro/errors.h" +#include "avro/refcount.h" +#include "avro/resolver.h" +#include "avro/schema.h" +#include "avro/value.h" +#include "st.h" + +#ifndef AVRO_RESOLVER_DEBUG +#define AVRO_RESOLVER_DEBUG 0 +#endif + +#if AVRO_RESOLVER_DEBUG +#include +#define DEBUG(...) \ + do { \ + fprintf(stderr, __VA_ARGS__); \ + fprintf(stderr, "\n"); \ + } while (0) +#else +#define DEBUG(...) /* don't print messages */ +#endif + + +typedef struct avro_resolved_writer avro_resolved_writer_t; + +struct avro_resolved_writer { + avro_value_iface_t parent; + + /** The reference count for this interface. */ + volatile int refcount; + + /** The writer schema. */ + avro_schema_t wschema; + + /** The reader schema. */ + avro_schema_t rschema; + + /* If the reader schema is a union, but the writer schema is + * not, this field indicates which branch of the reader union + * should be selected. */ + int reader_union_branch; + + /* The size of the value instances for this resolver. */ + size_t instance_size; + + /* A function to calculate the instance size once the overall + * top-level resolver (and all of its children) have been + * constructed. */ + void + (*calculate_size)(avro_resolved_writer_t *iface); + + /* A free function for this resolver interface */ + void + (*free_iface)(avro_resolved_writer_t *iface, st_table *freeing); + + /* An initialization function for instances of this resolver. */ + int + (*init)(const avro_resolved_writer_t *iface, void *self); + + /* A finalization function for instances of this resolver. */ + void + (*done)(const avro_resolved_writer_t *iface, void *self); + + /* Clear out any existing wrappers, if any */ + int + (*reset_wrappers)(const avro_resolved_writer_t *iface, void *self); +}; + +#define avro_resolved_writer_calculate_size(iface) \ + do { \ + if ((iface)->calculate_size != NULL) { \ + (iface)->calculate_size((iface)); \ + } \ + } while (0) +#define avro_resolved_writer_init(iface, self) \ + ((iface)->init == NULL? 0: (iface)->init((iface), (self))) +#define avro_resolved_writer_done(iface, self) \ + ((iface)->done == NULL? (void) 0: (iface)->done((iface), (self))) +#define avro_resolved_writer_reset_wrappers(iface, self) \ + ((iface)->reset_wrappers == NULL? 0: \ + (iface)->reset_wrappers((iface), (self))) + + +/* + * We assume that each instance type in this value contains an an + * avro_value_t as its first element, which is the current wrapped + * value. + */ + +void +avro_resolved_writer_set_dest(avro_value_t *resolved, + avro_value_t *dest) +{ + avro_value_t *self = resolved->self; + if (self->self != NULL) { + avro_value_decref(self); + } + avro_value_copy_ref(self, dest); +} + +void +avro_resolved_writer_clear_dest(avro_value_t *resolved) +{ + avro_value_t *self = resolved->self; + if (self->self != NULL) { + avro_value_decref(self); + } + self->iface = NULL; + self->self = NULL; +} + +int +avro_resolved_writer_new_value(avro_value_iface_t *viface, + avro_value_t *value) +{ + int rval; + avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + void *self = avro_malloc(iface->instance_size + sizeof(volatile int)); + if (self == NULL) { + value->iface = NULL; + value->self = NULL; + return ENOMEM; + } + + memset(self, 0, iface->instance_size + sizeof(volatile int)); + volatile int *refcount = self; + self += sizeof(volatile int); + + rval = avro_resolved_writer_init(iface, self); + if (rval != 0) { + avro_free(self, iface->instance_size + sizeof(volatile int)); + value->iface = NULL; + value->self = NULL; + return rval; + } + + *refcount = 1; + value->iface = avro_value_iface_incref(viface); + value->self = self; + return 0; +} + +static void +avro_resolved_writer_free_value(const avro_value_iface_t *viface, void *vself) +{ + avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + avro_value_t *self = vself; + + avro_resolved_writer_done(iface, vself); + if (self->self != NULL) { + avro_value_decref(self); + } + + vself -= sizeof(volatile int); + avro_free(vself, iface->instance_size + sizeof(volatile int)); +} + +static void +avro_resolved_writer_incref(avro_value_t *value) +{ + /* + * This only works if you pass in the top-level value. + */ + + volatile int *refcount = (value->self - sizeof(volatile int)); + avro_refcount_inc(refcount); +} + +static void +avro_resolved_writer_decref(avro_value_t *value) +{ + /* + * This only works if you pass in the top-level value. + */ + + volatile int *refcount = (value->self - sizeof(volatile int)); + if (avro_refcount_dec(refcount)) { + avro_resolved_writer_free_value(value->iface, value->self); + } +} + + +static avro_value_iface_t * +avro_resolved_writer_incref_iface(avro_value_iface_t *viface) +{ + avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + avro_refcount_inc(&iface->refcount); + return viface; +} + +static void +free_resolver(avro_resolved_writer_t *iface, st_table *freeing) +{ + /* First check if we've already started freeing this resolver. */ + if (st_lookup(freeing, (st_data_t) iface, NULL)) { + DEBUG("Already freed %p", iface); + return; + } + + /* Otherwise add this resolver to the freeing set, then free it. */ + st_insert(freeing, (st_data_t) iface, (st_data_t) NULL); + DEBUG("Freeing resolver %p (%s->%s)", iface, + avro_schema_type_name(iface->wschema), + avro_schema_type_name(iface->rschema)); + + iface->free_iface(iface, freeing); +} + +static void +avro_resolved_writer_calculate_size_(avro_resolved_writer_t *iface) +{ + /* Only calculate the size for any resolver once */ + iface->calculate_size = NULL; + + DEBUG("Calculating size for %s->%s", + avro_schema_type_name((iface)->wschema), + avro_schema_type_name((iface)->rschema)); + iface->instance_size = sizeof(avro_value_t); +} + +static void +avro_resolved_writer_free_iface(avro_resolved_writer_t *iface, st_table *freeing) +{ + AVRO_UNUSED(freeing); + avro_schema_decref(iface->wschema); + avro_schema_decref(iface->rschema); + avro_freet(avro_resolved_writer_t, iface); +} + +static void +avro_resolved_writer_decref_iface(avro_value_iface_t *viface) +{ + avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + DEBUG("Decref resolver %p (before=%d)", iface, iface->refcount); + if (avro_refcount_dec(&iface->refcount)) { + avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + + st_table *freeing = st_init_numtable(); + free_resolver(iface, freeing); + st_free_table(freeing); + } +} + + +static int +avro_resolved_writer_reset(const avro_value_iface_t *viface, void *vself) +{ + /* + * To reset a wrapped value, we first clear out any wrappers, + * and then have the wrapped value reset itself. + */ + + int rval; + avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + avro_value_t *self = vself; + check(rval, avro_resolved_writer_reset_wrappers(iface, vself)); + return avro_value_reset(self); +} + +static avro_type_t +avro_resolved_writer_get_type(const avro_value_iface_t *viface, const void *vself) +{ + AVRO_UNUSED(vself); + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + return avro_typeof(iface->wschema); +} + +static avro_schema_t +avro_resolved_writer_get_schema(const avro_value_iface_t *viface, const void *vself) +{ + AVRO_UNUSED(vself); + avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + return iface->wschema; +} + + +static avro_resolved_writer_t * +avro_resolved_writer_create(avro_schema_t wschema, avro_schema_t rschema) +{ + avro_resolved_writer_t *self = avro_new(avro_resolved_writer_t); + memset(self, 0, sizeof(avro_resolved_writer_t)); + + self->parent.incref_iface = avro_resolved_writer_incref_iface; + self->parent.decref_iface = avro_resolved_writer_decref_iface; + self->parent.incref = avro_resolved_writer_incref; + self->parent.decref = avro_resolved_writer_decref; + self->parent.reset = avro_resolved_writer_reset; + self->parent.get_type = avro_resolved_writer_get_type; + self->parent.get_schema = avro_resolved_writer_get_schema; + + self->refcount = 1; + self->wschema = avro_schema_incref(wschema); + self->rschema = avro_schema_incref(rschema); + self->reader_union_branch = -1; + self->calculate_size = avro_resolved_writer_calculate_size_; + self->free_iface = avro_resolved_writer_free_iface; + self->reset_wrappers = NULL; + return self; +} + +static inline int +avro_resolved_writer_get_real_dest(const avro_resolved_writer_t *iface, + const avro_value_t *dest, avro_value_t *real_dest) +{ + if (iface->reader_union_branch < 0) { + /* + * The reader schema isn't a union, so use the dest + * field as-is. + */ + + *real_dest = *dest; + return 0; + } + + DEBUG("Retrieving union branch %d for %s value", + iface->reader_union_branch, + avro_schema_type_name(iface->wschema)); + + return avro_value_set_branch(dest, iface->reader_union_branch, real_dest); +} + + +#define skip_links(schema) \ + while (is_avro_link(schema)) { \ + schema = avro_schema_link_target(schema); \ + } + + +/*----------------------------------------------------------------------- + * Memoized resolvers + */ + +typedef struct avro_resolved_link_writer avro_resolved_link_writer_t; + +typedef struct memoize_state_t { + avro_memoize_t mem; + avro_resolved_link_writer_t *links; +} memoize_state_t; + +static avro_resolved_writer_t * +avro_resolved_writer_new_memoized(memoize_state_t *state, + avro_schema_t wschema, avro_schema_t rschema); + + +/*----------------------------------------------------------------------- + * Reader unions + */ + +/* + * For each Avro type, we have to check whether the reader schema on its + * own is compatible, and also whether the reader is a union that + * contains a compatible type. The macros in this section help us + * perform both of these checks with less code. + */ + + +/** + * A helper macro that handles the case where neither writer nor reader + * are unions. Uses @ref check_func to see if the two schemas are + * compatible. + */ + +#define check_non_union(saved, wschema, rschema, check_func) \ +do { \ + avro_resolved_writer_t *self = NULL; \ + int rc = check_func(saved, &self, wschema, rschema, \ + rschema); \ + if (self) { \ + DEBUG("Non-union schemas %s (writer) " \ + "and %s (reader) match", \ + avro_schema_type_name(wschema), \ + avro_schema_type_name(rschema)); \ + \ + self->reader_union_branch = -1; \ + return self; \ + } \ + \ + if (rc) { \ + return NULL; \ + } \ +} while (0) + + +/** + * Helper macro that handles the case where the reader is a union, and + * the writer is not. Checks each branch of the reader union schema, + * looking for the first branch that is compatible with the writer + * schema. The @ref check_func argument should be a function that can + * check the compatiblity of each branch schema. + */ + +#define check_reader_union(saved, wschema, rschema, check_func) \ +do { \ + if (!is_avro_union(rschema)) { \ + break; \ + } \ + \ + DEBUG("Checking reader union schema"); \ + size_t num_branches = avro_schema_union_size(rschema); \ + unsigned int i; \ + \ + for (i = 0; i < num_branches; i++) { \ + avro_schema_t branch_schema = \ + avro_schema_union_branch(rschema, i); \ + skip_links(branch_schema); \ + \ + DEBUG("Trying branch %u %s%s%s->%s", i, \ + is_avro_link(wschema)? "[": "", \ + avro_schema_type_name(wschema), \ + is_avro_link(wschema)? "]": "", \ + avro_schema_type_name(branch_schema)); \ + \ + avro_resolved_writer_t *self = NULL; \ + int rc = check_func(saved, &self, \ + wschema, branch_schema, rschema); \ + if (self) { \ + DEBUG("Reader union branch %d (%s) " \ + "and writer %s match", \ + i, avro_schema_type_name(branch_schema), \ + avro_schema_type_name(wschema)); \ + self->reader_union_branch = i; \ + return self; \ + } else { \ + DEBUG("Reader union branch %d (%s) " \ + "doesn't match", \ + i, avro_schema_type_name(branch_schema)); \ + } \ + \ + if (rc) { \ + return NULL; \ + } \ + } \ + \ + DEBUG("No reader union branches match"); \ +} while (0) + +/** + * A helper macro that wraps together check_non_union and + * check_reader_union for a simple (non-union) writer schema type. + */ + +#define check_simple_writer(saved, wschema, rschema, type_name) \ +do { \ + check_non_union(saved, wschema, rschema, try_##type_name); \ + check_reader_union(saved, wschema, rschema, try_##type_name); \ + DEBUG("Writer %s doesn't match reader %s", \ + avro_schema_type_name(wschema), \ + avro_schema_type_name(rschema)); \ + avro_set_error("Cannot store " #type_name " into %s", \ + avro_schema_type_name(rschema)); \ + return NULL; \ +} while (0) + + +/*----------------------------------------------------------------------- + * Recursive schemas + */ + +/* + * Recursive schemas are handled specially; the value implementation for + * an AVRO_LINK schema is simply a wrapper around the value + * implementation for the link's target schema. The value methods all + * delegate to the wrapped implementation. + */ + +struct avro_resolved_link_writer { + avro_resolved_writer_t parent; + + /** + * A pointer to the “next” link resolver that we've had to + * create. We use this as we're creating the overall top-level + * resolver to keep track of which ones we have to fix up + * afterwards. + */ + avro_resolved_link_writer_t *next; + + /** The target's implementation. */ + avro_resolved_writer_t *target_resolver; +}; + +typedef struct avro_resolved_link_value { + avro_value_t wrapped; + avro_value_t target; +} avro_resolved_link_value_t; + +static void +avro_resolved_link_writer_calculate_size(avro_resolved_writer_t *iface) +{ + /* Only calculate the size for any resolver once */ + iface->calculate_size = NULL; + + DEBUG("Calculating size for [%s]->%s", + avro_schema_type_name((iface)->wschema), + avro_schema_type_name((iface)->rschema)); + iface->instance_size = sizeof(avro_resolved_link_value_t); +} + +static void +avro_resolved_link_writer_free_iface(avro_resolved_writer_t *iface, st_table *freeing) +{ + avro_resolved_link_writer_t *liface = + container_of(iface, avro_resolved_link_writer_t, parent); + if (liface->target_resolver != NULL) { + free_resolver(liface->target_resolver, freeing); + } + avro_schema_decref(iface->wschema); + avro_schema_decref(iface->rschema); + avro_freet(avro_resolved_link_writer_t, iface); +} + +static int +avro_resolved_link_writer_init(const avro_resolved_writer_t *iface, void *vself) +{ + int rval; + const avro_resolved_link_writer_t *liface = + container_of(iface, avro_resolved_link_writer_t, parent); + avro_resolved_link_value_t *self = vself; + size_t target_instance_size = liface->target_resolver->instance_size; + + self->target.iface = &liface->target_resolver->parent; + self->target.self = avro_malloc(target_instance_size); + if (self->target.self == NULL) { + return ENOMEM; + } + DEBUG("Allocated <%p:%zu> for link", self->target.self, target_instance_size); + + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + + rval = avro_resolved_writer_init(liface->target_resolver, self->target.self); + if (rval != 0) { + avro_free(self->target.self, target_instance_size); + } + return rval; +} + +static void +avro_resolved_link_writer_done(const avro_resolved_writer_t *iface, void *vself) +{ + const avro_resolved_link_writer_t *liface = + container_of(iface, avro_resolved_link_writer_t, parent); + avro_resolved_link_value_t *self = vself; + size_t target_instance_size = liface->target_resolver->instance_size; + DEBUG("Freeing <%p:%zu> for link", self->target.self, target_instance_size); + avro_resolved_writer_done(liface->target_resolver, self->target.self); + avro_free(self->target.self, target_instance_size); + self->target.iface = NULL; + self->target.self = NULL; +} + +static int +avro_resolved_link_writer_reset(const avro_resolved_writer_t *iface, void *vself) +{ + const avro_resolved_link_writer_t *liface = + container_of(iface, avro_resolved_link_writer_t, parent); + avro_resolved_link_value_t *self = vself; + return avro_resolved_writer_reset_wrappers + (liface->target_resolver, self->target.self); +} + +static avro_type_t +avro_resolved_link_writer_get_type(const avro_value_iface_t *iface, const void *vself) +{ + AVRO_UNUSED(iface); + const avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_get_type(&self->target); +} + +static avro_schema_t +avro_resolved_link_writer_get_schema(const avro_value_iface_t *iface, const void *vself) +{ + AVRO_UNUSED(iface); + const avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_get_schema(&self->target); +} + +static int +avro_resolved_link_writer_get_boolean(const avro_value_iface_t *iface, + const void *vself, int *out) +{ + AVRO_UNUSED(iface); + const avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_get_boolean(&self->target, out); +} + +static int +avro_resolved_link_writer_get_bytes(const avro_value_iface_t *iface, + const void *vself, const void **buf, size_t *size) +{ + AVRO_UNUSED(iface); + const avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_get_bytes(&self->target, buf, size); +} + +static int +avro_resolved_link_writer_grab_bytes(const avro_value_iface_t *iface, + const void *vself, avro_wrapped_buffer_t *dest) +{ + AVRO_UNUSED(iface); + const avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_grab_bytes(&self->target, dest); +} + +static int +avro_resolved_link_writer_get_double(const avro_value_iface_t *iface, + const void *vself, double *out) +{ + AVRO_UNUSED(iface); + const avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_get_double(&self->target, out); +} + +static int +avro_resolved_link_writer_get_float(const avro_value_iface_t *iface, + const void *vself, float *out) +{ + AVRO_UNUSED(iface); + const avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_get_float(&self->target, out); +} + +static int +avro_resolved_link_writer_get_int(const avro_value_iface_t *iface, + const void *vself, int32_t *out) +{ + AVRO_UNUSED(iface); + const avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_get_int(&self->target, out); +} + +static int +avro_resolved_link_writer_get_long(const avro_value_iface_t *iface, + const void *vself, int64_t *out) +{ + AVRO_UNUSED(iface); + const avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_get_long(&self->target, out); +} + +static int +avro_resolved_link_writer_get_null(const avro_value_iface_t *iface, const void *vself) +{ + AVRO_UNUSED(iface); + const avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_get_null(&self->target); +} + +static int +avro_resolved_link_writer_get_string(const avro_value_iface_t *iface, + const void *vself, const char **str, size_t *size) +{ + AVRO_UNUSED(iface); + const avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_get_string(&self->target, str, size); +} + +static int +avro_resolved_link_writer_grab_string(const avro_value_iface_t *iface, + const void *vself, avro_wrapped_buffer_t *dest) +{ + AVRO_UNUSED(iface); + const avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_grab_string(&self->target, dest); +} + +static int +avro_resolved_link_writer_get_enum(const avro_value_iface_t *iface, + const void *vself, int *out) +{ + AVRO_UNUSED(iface); + const avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_get_enum(&self->target, out); +} + +static int +avro_resolved_link_writer_get_fixed(const avro_value_iface_t *iface, + const void *vself, const void **buf, size_t *size) +{ + AVRO_UNUSED(iface); + const avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_get_fixed(&self->target, buf, size); +} + +static int +avro_resolved_link_writer_grab_fixed(const avro_value_iface_t *iface, + const void *vself, avro_wrapped_buffer_t *dest) +{ + AVRO_UNUSED(iface); + const avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_grab_fixed(&self->target, dest); +} + +static int +avro_resolved_link_writer_set_boolean(const avro_value_iface_t *iface, + void *vself, int val) +{ + AVRO_UNUSED(iface); + avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_set_boolean(&self->target, val); +} + +static int +avro_resolved_link_writer_set_bytes(const avro_value_iface_t *iface, + void *vself, void *buf, size_t size) +{ + AVRO_UNUSED(iface); + avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_set_bytes(&self->target, buf, size); +} + +static int +avro_resolved_link_writer_give_bytes(const avro_value_iface_t *iface, + void *vself, avro_wrapped_buffer_t *buf) +{ + AVRO_UNUSED(iface); + avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_give_bytes(&self->target, buf); +} + +static int +avro_resolved_link_writer_set_double(const avro_value_iface_t *iface, + void *vself, double val) +{ + AVRO_UNUSED(iface); + avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_set_double(&self->target, val); +} + +static int +avro_resolved_link_writer_set_float(const avro_value_iface_t *iface, + void *vself, float val) +{ + AVRO_UNUSED(iface); + avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_set_float(&self->target, val); +} + +static int +avro_resolved_link_writer_set_int(const avro_value_iface_t *iface, + void *vself, int32_t val) +{ + AVRO_UNUSED(iface); + avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_set_int(&self->target, val); +} + +static int +avro_resolved_link_writer_set_long(const avro_value_iface_t *iface, + void *vself, int64_t val) +{ + AVRO_UNUSED(iface); + avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_set_long(&self->target, val); +} + +static int +avro_resolved_link_writer_set_null(const avro_value_iface_t *iface, void *vself) +{ + AVRO_UNUSED(iface); + avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_set_null(&self->target); +} + +static int +avro_resolved_link_writer_set_string(const avro_value_iface_t *iface, + void *vself, char *str) +{ + AVRO_UNUSED(iface); + avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_set_string(&self->target, str); +} + +static int +avro_resolved_link_writer_set_string_len(const avro_value_iface_t *iface, + void *vself, char *str, size_t size) +{ + AVRO_UNUSED(iface); + avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_set_string_len(&self->target, str, size); +} + +static int +avro_resolved_link_writer_give_string_len(const avro_value_iface_t *iface, + void *vself, avro_wrapped_buffer_t *buf) +{ + AVRO_UNUSED(iface); + avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_give_string_len(&self->target, buf); +} + +static int +avro_resolved_link_writer_set_enum(const avro_value_iface_t *iface, + void *vself, int val) +{ + AVRO_UNUSED(iface); + avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_set_enum(&self->target, val); +} + +static int +avro_resolved_link_writer_set_fixed(const avro_value_iface_t *iface, + void *vself, void *buf, size_t size) +{ + AVRO_UNUSED(iface); + avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_set_fixed(&self->target, buf, size); +} + +static int +avro_resolved_link_writer_give_fixed(const avro_value_iface_t *iface, + void *vself, avro_wrapped_buffer_t *buf) +{ + AVRO_UNUSED(iface); + avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_give_fixed(&self->target, buf); +} + +static int +avro_resolved_link_writer_get_size(const avro_value_iface_t *iface, + const void *vself, size_t *size) +{ + AVRO_UNUSED(iface); + const avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_get_size(&self->target, size); +} + +static int +avro_resolved_link_writer_get_by_index(const avro_value_iface_t *iface, + const void *vself, size_t index, + avro_value_t *child, const char **name) +{ + AVRO_UNUSED(iface); + const avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_get_by_index(&self->target, index, child, name); +} + +static int +avro_resolved_link_writer_get_by_name(const avro_value_iface_t *iface, + const void *vself, const char *name, + avro_value_t *child, size_t *index) +{ + AVRO_UNUSED(iface); + const avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_get_by_name(&self->target, name, child, index); +} + +static int +avro_resolved_link_writer_get_discriminant(const avro_value_iface_t *iface, + const void *vself, int *out) +{ + AVRO_UNUSED(iface); + const avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_get_discriminant(&self->target, out); +} + +static int +avro_resolved_link_writer_get_current_branch(const avro_value_iface_t *iface, + const void *vself, avro_value_t *branch) +{ + AVRO_UNUSED(iface); + const avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_get_current_branch(&self->target, branch); +} + +static int +avro_resolved_link_writer_append(const avro_value_iface_t *iface, + void *vself, avro_value_t *child_out, + size_t *new_index) +{ + AVRO_UNUSED(iface); + avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_append(&self->target, child_out, new_index); +} + +static int +avro_resolved_link_writer_add(const avro_value_iface_t *iface, + void *vself, const char *key, + avro_value_t *child, size_t *index, int *is_new) +{ + AVRO_UNUSED(iface); + avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_add(&self->target, key, child, index, is_new); +} + +static int +avro_resolved_link_writer_set_branch(const avro_value_iface_t *iface, + void *vself, int discriminant, + avro_value_t *branch) +{ + AVRO_UNUSED(iface); + avro_resolved_link_value_t *self = vself; + avro_value_t *target_vself = self->target.self; + *target_vself = self->wrapped; + return avro_value_set_branch(&self->target, discriminant, branch); +} + +static avro_resolved_link_writer_t * +avro_resolved_link_writer_create(avro_schema_t wschema, avro_schema_t rschema) +{ + avro_resolved_writer_t *self = avro_new(avro_resolved_link_writer_t); + memset(self, 0, sizeof(avro_resolved_link_writer_t)); + + self->parent.incref_iface = avro_resolved_writer_incref_iface; + self->parent.decref_iface = avro_resolved_writer_decref_iface; + self->parent.incref = avro_resolved_writer_incref; + self->parent.decref = avro_resolved_writer_decref; + self->parent.reset = avro_resolved_writer_reset; + self->parent.get_type = avro_resolved_link_writer_get_type; + self->parent.get_schema = avro_resolved_link_writer_get_schema; + self->parent.get_size = avro_resolved_link_writer_get_size; + self->parent.get_by_index = avro_resolved_link_writer_get_by_index; + self->parent.get_by_name = avro_resolved_link_writer_get_by_name; + + self->refcount = 1; + self->wschema = avro_schema_incref(wschema); + self->rschema = avro_schema_incref(rschema); + self->reader_union_branch = -1; + self->calculate_size = avro_resolved_link_writer_calculate_size; + self->free_iface = avro_resolved_link_writer_free_iface; + self->init = avro_resolved_link_writer_init; + self->done = avro_resolved_link_writer_done; + self->reset_wrappers = avro_resolved_link_writer_reset; + + self->parent.get_boolean = avro_resolved_link_writer_get_boolean; + self->parent.get_bytes = avro_resolved_link_writer_get_bytes; + self->parent.grab_bytes = avro_resolved_link_writer_grab_bytes; + self->parent.get_double = avro_resolved_link_writer_get_double; + self->parent.get_float = avro_resolved_link_writer_get_float; + self->parent.get_int = avro_resolved_link_writer_get_int; + self->parent.get_long = avro_resolved_link_writer_get_long; + self->parent.get_null = avro_resolved_link_writer_get_null; + self->parent.get_string = avro_resolved_link_writer_get_string; + self->parent.grab_string = avro_resolved_link_writer_grab_string; + self->parent.get_enum = avro_resolved_link_writer_get_enum; + self->parent.get_fixed = avro_resolved_link_writer_get_fixed; + self->parent.grab_fixed = avro_resolved_link_writer_grab_fixed; + + self->parent.set_boolean = avro_resolved_link_writer_set_boolean; + self->parent.set_bytes = avro_resolved_link_writer_set_bytes; + self->parent.give_bytes = avro_resolved_link_writer_give_bytes; + self->parent.set_double = avro_resolved_link_writer_set_double; + self->parent.set_float = avro_resolved_link_writer_set_float; + self->parent.set_int = avro_resolved_link_writer_set_int; + self->parent.set_long = avro_resolved_link_writer_set_long; + self->parent.set_null = avro_resolved_link_writer_set_null; + self->parent.set_string = avro_resolved_link_writer_set_string; + self->parent.set_string_len = avro_resolved_link_writer_set_string_len; + self->parent.give_string_len = avro_resolved_link_writer_give_string_len; + self->parent.set_enum = avro_resolved_link_writer_set_enum; + self->parent.set_fixed = avro_resolved_link_writer_set_fixed; + self->parent.give_fixed = avro_resolved_link_writer_give_fixed; + + self->parent.get_size = avro_resolved_link_writer_get_size; + self->parent.get_by_index = avro_resolved_link_writer_get_by_index; + self->parent.get_by_name = avro_resolved_link_writer_get_by_name; + self->parent.get_discriminant = avro_resolved_link_writer_get_discriminant; + self->parent.get_current_branch = avro_resolved_link_writer_get_current_branch; + + self->parent.append = avro_resolved_link_writer_append; + self->parent.add = avro_resolved_link_writer_add; + self->parent.set_branch = avro_resolved_link_writer_set_branch; + + return container_of(self, avro_resolved_link_writer_t, parent); +} + +static int +try_link(memoize_state_t *state, avro_resolved_writer_t **self, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + AVRO_UNUSED(rschema); + + /* + * For link schemas, we create a special value implementation + * that allocates space for its wrapped value at runtime. This + * lets us handle recursive types without having to instantiate + * in infinite-size value. + */ + + avro_schema_t wtarget = avro_schema_link_target(wschema); + avro_resolved_link_writer_t *lself = + avro_resolved_link_writer_create(wtarget, root_rschema); + avro_memoize_set(&state->mem, wschema, root_rschema, lself); + + avro_resolved_writer_t *target_resolver = + avro_resolved_writer_new_memoized(state, wtarget, rschema); + if (target_resolver == NULL) { + avro_memoize_delete(&state->mem, wschema, root_rschema); + avro_value_iface_decref(&lself->parent.parent); + avro_prefix_error("Link target isn't compatible: "); + DEBUG("%s", avro_strerror()); + return EINVAL; + } + + lself->target_resolver = target_resolver; + lself->next = state->links; + state->links = lself; + + *self = &lself->parent; + return 0; +} + + +/*----------------------------------------------------------------------- + * boolean + */ + +static int +avro_resolved_writer_set_boolean(const avro_value_iface_t *viface, + void *vself, int val) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + avro_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest)); + DEBUG("Storing %s into %p", val? "TRUE": "FALSE", dest.self); + return avro_value_set_boolean(&dest, val); +} + +static int +try_boolean(memoize_state_t *state, avro_resolved_writer_t **self, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + if (is_avro_boolean(rschema)) { + *self = avro_resolved_writer_create(wschema, root_rschema); + avro_memoize_set(&state->mem, wschema, root_rschema, *self); + (*self)->parent.set_boolean = avro_resolved_writer_set_boolean; + } + return 0; +} + + +/*----------------------------------------------------------------------- + * bytes + */ + +static int +avro_resolved_writer_set_bytes(const avro_value_iface_t *viface, + void *vself, void *buf, size_t size) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + avro_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest)); + DEBUG("Storing <%p:%zu> into %p", buf, size, dest.self); + return avro_value_set_bytes(&dest, buf, size); +} + +static int +avro_resolved_writer_give_bytes(const avro_value_iface_t *viface, + void *vself, avro_wrapped_buffer_t *buf) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + avro_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest)); + DEBUG("Storing [%p] into %p", buf, dest.self); + return avro_value_give_bytes(&dest, buf); +} + +static int +try_bytes(memoize_state_t *state, avro_resolved_writer_t **self, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + if (is_avro_bytes(rschema)) { + *self = avro_resolved_writer_create(wschema, root_rschema); + avro_memoize_set(&state->mem, wschema, root_rschema, *self); + (*self)->parent.set_bytes = avro_resolved_writer_set_bytes; + (*self)->parent.give_bytes = avro_resolved_writer_give_bytes; + } + return 0; +} + + +/*----------------------------------------------------------------------- + * double + */ + +static int +avro_resolved_writer_set_double(const avro_value_iface_t *viface, + void *vself, double val) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + avro_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest)); + DEBUG("Storing %le into %p", val, dest.self); + return avro_value_set_double(&dest, val); +} + +static int +try_double(memoize_state_t *state, avro_resolved_writer_t **self, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + if (is_avro_double(rschema)) { + *self = avro_resolved_writer_create(wschema, root_rschema); + avro_memoize_set(&state->mem, wschema, root_rschema, *self); + (*self)->parent.set_double = avro_resolved_writer_set_double; + } + return 0; +} + + +/*----------------------------------------------------------------------- + * float + */ + +static int +avro_resolved_writer_set_float(const avro_value_iface_t *viface, + void *vself, float val) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + avro_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest)); + DEBUG("Storing %e into %p", val, dest.self); + return avro_value_set_float(&dest, val); +} + +static int +avro_resolved_writer_set_float_double(const avro_value_iface_t *viface, + void *vself, float val) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + avro_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest)); + DEBUG("Promoting float %e into double %p", val, dest.self); + return avro_value_set_double(&dest, val); +} + +static int +try_float(memoize_state_t *state, avro_resolved_writer_t **self, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + if (is_avro_float(rschema)) { + *self = avro_resolved_writer_create(wschema, root_rschema); + avro_memoize_set(&state->mem, wschema, root_rschema, *self); + (*self)->parent.set_float = avro_resolved_writer_set_float; + } + + else if (is_avro_double(rschema)) { + *self = avro_resolved_writer_create(wschema, root_rschema); + avro_memoize_set(&state->mem, wschema, root_rschema, *self); + (*self)->parent.set_float = avro_resolved_writer_set_float_double; + } + + return 0; +} + + +/*----------------------------------------------------------------------- + * int + */ + +static int +avro_resolved_writer_set_int(const avro_value_iface_t *viface, + void *vself, int32_t val) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + avro_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest)); + DEBUG("Storing %" PRId32 " into %p", val, dest.self); + return avro_value_set_int(&dest, val); +} + +static int +avro_resolved_writer_set_int_double(const avro_value_iface_t *viface, + void *vself, int32_t val) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + avro_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest)); + DEBUG("Promoting int %" PRId32 " into double %p", val, dest.self); + return avro_value_set_double(&dest, val); +} + +static int +avro_resolved_writer_set_int_float(const avro_value_iface_t *viface, + void *vself, int32_t val) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + avro_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest)); + DEBUG("Promoting int %" PRId32 " into float %p", val, dest.self); + return avro_value_set_float(&dest, val); +} + +static int +avro_resolved_writer_set_int_long(const avro_value_iface_t *viface, + void *vself, int32_t val) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + avro_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest)); + DEBUG("Promoting int %" PRId32 " into long %p", val, dest.self); + return avro_value_set_long(&dest, val); +} + +static int +try_int(memoize_state_t *state, avro_resolved_writer_t **self, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + if (is_avro_int32(rschema)) { + *self = avro_resolved_writer_create(wschema, root_rschema); + avro_memoize_set(&state->mem, wschema, root_rschema, *self); + (*self)->parent.set_int = avro_resolved_writer_set_int; + } + + else if (is_avro_int64(rschema)) { + *self = avro_resolved_writer_create(wschema, root_rschema); + avro_memoize_set(&state->mem, wschema, root_rschema, *self); + (*self)->parent.set_int = avro_resolved_writer_set_int_long; + } + + else if (is_avro_double(rschema)) { + *self = avro_resolved_writer_create(wschema, root_rschema); + avro_memoize_set(&state->mem, wschema, root_rschema, *self); + (*self)->parent.set_int = avro_resolved_writer_set_int_double; + } + + else if (is_avro_float(rschema)) { + *self = avro_resolved_writer_create(wschema, root_rschema); + avro_memoize_set(&state->mem, wschema, root_rschema, *self); + (*self)->parent.set_int = avro_resolved_writer_set_int_float; + } + + return 0; +} + + +/*----------------------------------------------------------------------- + * long + */ + +static int +avro_resolved_writer_set_long(const avro_value_iface_t *viface, + void *vself, int64_t val) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + avro_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest)); + DEBUG("Storing %" PRId64 " into %p", val, dest.self); + return avro_value_set_long(&dest, val); +} + +static int +avro_resolved_writer_set_long_double(const avro_value_iface_t *viface, + void *vself, int64_t val) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + avro_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest)); + DEBUG("Promoting long %" PRId64 " into double %p", val, dest.self); + return avro_value_set_double(&dest, val); +} + +static int +avro_resolved_writer_set_long_float(const avro_value_iface_t *viface, + void *vself, int64_t val) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + avro_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest)); + DEBUG("Promoting long %" PRId64 " into float %p", val, dest.self); + return avro_value_set_float(&dest, val); +} + +static int +try_long(memoize_state_t *state, avro_resolved_writer_t **self, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + if (is_avro_int64(rschema)) { + *self = avro_resolved_writer_create(wschema, root_rschema); + avro_memoize_set(&state->mem, wschema, root_rschema, *self); + (*self)->parent.set_long = avro_resolved_writer_set_long; + } + + else if (is_avro_double(rschema)) { + *self = avro_resolved_writer_create(wschema, root_rschema); + avro_memoize_set(&state->mem, wschema, root_rschema, *self); + (*self)->parent.set_long = avro_resolved_writer_set_long_double; + } + + else if (is_avro_float(rschema)) { + *self = avro_resolved_writer_create(wschema, root_rschema); + avro_memoize_set(&state->mem, wschema, root_rschema, *self); + (*self)->parent.set_long = avro_resolved_writer_set_long_float; + } + + return 0; +} + + +/*----------------------------------------------------------------------- + * null + */ + +static int +avro_resolved_writer_set_null(const avro_value_iface_t *viface, + void *vself) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + avro_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest)); + DEBUG("Storing NULL into %p", dest.self); + return avro_value_set_null(&dest); +} + +static int +try_null(memoize_state_t *state, avro_resolved_writer_t **self, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + if (is_avro_null(rschema)) { + *self = avro_resolved_writer_create(wschema, root_rschema); + avro_memoize_set(&state->mem, wschema, root_rschema, *self); + (*self)->parent.set_null = avro_resolved_writer_set_null; + } + return 0; +} + + +/*----------------------------------------------------------------------- + * string + */ + +static int +avro_resolved_writer_set_string(const avro_value_iface_t *viface, + void *vself, char *str) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + avro_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest)); + DEBUG("Storing \"%s\" into %p", str, dest.self); + return avro_value_set_string(&dest, str); +} + +static int +avro_resolved_writer_set_string_len(const avro_value_iface_t *viface, + void *vself, char *str, size_t size) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + avro_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest)); + DEBUG("Storing <%p:%zu> into %p", str, size, dest.self); + return avro_value_set_string_len(&dest, str, size); +} + +static int +avro_resolved_writer_give_string_len(const avro_value_iface_t *viface, + void *vself, avro_wrapped_buffer_t *buf) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + avro_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest)); + DEBUG("Storing [%p] into %p", buf, dest.self); + return avro_value_give_string_len(&dest, buf); +} + +static int +try_string(memoize_state_t *state, avro_resolved_writer_t **self, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + if (is_avro_string(rschema)) { + *self = avro_resolved_writer_create(wschema, root_rschema); + avro_memoize_set(&state->mem, wschema, root_rschema, *self); + (*self)->parent.set_string = avro_resolved_writer_set_string; + (*self)->parent.set_string_len = avro_resolved_writer_set_string_len; + (*self)->parent.give_string_len = avro_resolved_writer_give_string_len; + } + return 0; +} + + +/*----------------------------------------------------------------------- + * array + */ + +typedef struct avro_resolved_array_writer { + avro_resolved_writer_t parent; + avro_resolved_writer_t *child_resolver; +} avro_resolved_array_writer_t; + +typedef struct avro_resolved_array_value { + avro_value_t wrapped; + avro_raw_array_t children; +} avro_resolved_array_value_t; + +static void +avro_resolved_array_writer_calculate_size(avro_resolved_writer_t *iface) +{ + avro_resolved_array_writer_t *aiface = + container_of(iface, avro_resolved_array_writer_t, parent); + + /* Only calculate the size for any resolver once */ + iface->calculate_size = NULL; + + DEBUG("Calculating size for %s->%s", + avro_schema_type_name((iface)->wschema), + avro_schema_type_name((iface)->rschema)); + iface->instance_size = sizeof(avro_resolved_array_value_t); + + avro_resolved_writer_calculate_size(aiface->child_resolver); +} + +static void +avro_resolved_array_writer_free_iface(avro_resolved_writer_t *iface, st_table *freeing) +{ + avro_resolved_array_writer_t *aiface = + container_of(iface, avro_resolved_array_writer_t, parent); + free_resolver(aiface->child_resolver, freeing); + avro_schema_decref(iface->wschema); + avro_schema_decref(iface->rschema); + avro_freet(avro_resolved_array_writer_t, iface); +} + +static int +avro_resolved_array_writer_init(const avro_resolved_writer_t *iface, void *vself) +{ + const avro_resolved_array_writer_t *aiface = + container_of(iface, avro_resolved_array_writer_t, parent); + avro_resolved_array_value_t *self = vself; + size_t child_instance_size = aiface->child_resolver->instance_size; + DEBUG("Initializing child array (child_size=%zu)", child_instance_size); + avro_raw_array_init(&self->children, child_instance_size); + return 0; +} + +static void +avro_resolved_array_writer_free_elements(const avro_resolved_writer_t *child_iface, + avro_resolved_array_value_t *self) +{ + size_t i; + for (i = 0; i < avro_raw_array_size(&self->children); i++) { + void *child_self = avro_raw_array_get_raw(&self->children, i); + avro_resolved_writer_done(child_iface, child_self); + } +} + +static void +avro_resolved_array_writer_done(const avro_resolved_writer_t *iface, void *vself) +{ + const avro_resolved_array_writer_t *aiface = + container_of(iface, avro_resolved_array_writer_t, parent); + avro_resolved_array_value_t *self = vself; + avro_resolved_array_writer_free_elements(aiface->child_resolver, self); + avro_raw_array_done(&self->children); +} + +static int +avro_resolved_array_writer_reset(const avro_resolved_writer_t *iface, void *vself) +{ + const avro_resolved_array_writer_t *aiface = + container_of(iface, avro_resolved_array_writer_t, parent); + avro_resolved_array_value_t *self = vself; + + /* Clear out our cache of wrapped children */ + avro_resolved_array_writer_free_elements(aiface->child_resolver, self); + return 0; +} + +static int +avro_resolved_array_writer_get_size(const avro_value_iface_t *viface, + const void *vself, size_t *size) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + const avro_resolved_array_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, &self->wrapped, &dest)); + return avro_value_get_size(&dest, size); +} + +static int +avro_resolved_array_writer_append(const avro_value_iface_t *viface, + void *vself, avro_value_t *child_out, + size_t *new_index) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + const avro_resolved_array_writer_t *aiface = + container_of(iface, avro_resolved_array_writer_t, parent); + avro_resolved_array_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, &self->wrapped, &dest)); + + child_out->iface = &aiface->child_resolver->parent; + child_out->self = avro_raw_array_append(&self->children); + if (child_out->self == NULL) { + avro_set_error("Couldn't expand array"); + return ENOMEM; + } + + DEBUG("Appending to array %p", dest.self); + return avro_value_append(&dest, child_out->self, new_index); +} + +static avro_resolved_array_writer_t * +avro_resolved_array_writer_create(avro_schema_t wschema, avro_schema_t rschema) +{ + avro_resolved_writer_t *self = avro_new(avro_resolved_array_writer_t); + memset(self, 0, sizeof(avro_resolved_array_writer_t)); + + self->parent.incref_iface = avro_resolved_writer_incref_iface; + self->parent.decref_iface = avro_resolved_writer_decref_iface; + self->parent.incref = avro_resolved_writer_incref; + self->parent.decref = avro_resolved_writer_decref; + self->parent.reset = avro_resolved_writer_reset; + self->parent.get_type = avro_resolved_writer_get_type; + self->parent.get_schema = avro_resolved_writer_get_schema; + self->parent.get_size = avro_resolved_array_writer_get_size; + self->parent.append = avro_resolved_array_writer_append; + + self->refcount = 1; + self->wschema = avro_schema_incref(wschema); + self->rschema = avro_schema_incref(rschema); + self->reader_union_branch = -1; + self->calculate_size = avro_resolved_array_writer_calculate_size; + self->free_iface = avro_resolved_array_writer_free_iface; + self->init = avro_resolved_array_writer_init; + self->done = avro_resolved_array_writer_done; + self->reset_wrappers = avro_resolved_array_writer_reset; + return container_of(self, avro_resolved_array_writer_t, parent); +} + +static int +try_array(memoize_state_t *state, avro_resolved_writer_t **self, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + /* + * First verify that the reader is an array. + */ + + if (!is_avro_array(rschema)) { + return 0; + } + + /* + * Array schemas have to have compatible element schemas to be + * compatible themselves. Try to create an resolver to check + * the compatibility. + */ + + avro_resolved_array_writer_t *aself = + avro_resolved_array_writer_create(wschema, root_rschema); + avro_memoize_set(&state->mem, wschema, root_rschema, aself); + + avro_schema_t witems = avro_schema_array_items(wschema); + avro_schema_t ritems = avro_schema_array_items(rschema); + + avro_resolved_writer_t *item_resolver = + avro_resolved_writer_new_memoized(state, witems, ritems); + if (item_resolver == NULL) { + avro_memoize_delete(&state->mem, wschema, root_rschema); + avro_value_iface_decref(&aself->parent.parent); + avro_prefix_error("Array values aren't compatible: "); + return EINVAL; + } + + /* + * The two schemas are compatible. Store the item schema's + * resolver into the child_resolver field. + */ + + aself->child_resolver = item_resolver; + *self = &aself->parent; + return 0; +} + + +/*----------------------------------------------------------------------- + * enum + */ + +static int +avro_resolved_writer_set_enum(const avro_value_iface_t *viface, + void *vself, int val) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + avro_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest)); + DEBUG("Storing %d into %p", val, dest.self); + return avro_value_set_enum(&dest, val); +} + +static int +try_enum(memoize_state_t *state, avro_resolved_writer_t **self, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + /* + * Enum schemas have to have the same name — but not the same + * list of symbols — to be compatible. + */ + + if (is_avro_enum(rschema)) { + const char *wname = avro_schema_name(wschema); + const char *rname = avro_schema_name(rschema); + + if (strcmp(wname, rname) == 0) { + *self = avro_resolved_writer_create(wschema, root_rschema); + avro_memoize_set(&state->mem, wschema, root_rschema, *self); + (*self)->parent.set_enum = avro_resolved_writer_set_enum; + } + } + return 0; +} + + +/*----------------------------------------------------------------------- + * fixed + */ + +static int +avro_resolved_writer_set_fixed(const avro_value_iface_t *viface, + void *vself, void *buf, size_t size) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + avro_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest)); + DEBUG("Storing <%p:%zu> into (fixed) %p", buf, size, dest.self); + return avro_value_set_fixed(&dest, buf, size); +} + +static int +avro_resolved_writer_give_fixed(const avro_value_iface_t *viface, + void *vself, avro_wrapped_buffer_t *buf) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + avro_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, self, &dest)); + DEBUG("Storing [%p] into (fixed) %p", buf, dest.self); + return avro_value_give_fixed(&dest, buf); +} + +static int +try_fixed(memoize_state_t *state, avro_resolved_writer_t **self, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + /* + * Fixed schemas need the same name and size to be compatible. + */ + + if (avro_schema_equal(wschema, rschema)) { + *self = avro_resolved_writer_create(wschema, root_rschema); + avro_memoize_set(&state->mem, wschema, root_rschema, *self); + (*self)->parent.set_fixed = avro_resolved_writer_set_fixed; + (*self)->parent.give_fixed = avro_resolved_writer_give_fixed; + } + return 0; +} + + +/*----------------------------------------------------------------------- + * map + */ + +typedef struct avro_resolved_map_writer { + avro_resolved_writer_t parent; + avro_resolved_writer_t *child_resolver; +} avro_resolved_map_writer_t; + +typedef struct avro_resolved_map_value { + avro_value_t wrapped; + avro_raw_array_t children; +} avro_resolved_map_value_t; + +static void +avro_resolved_map_writer_calculate_size(avro_resolved_writer_t *iface) +{ + avro_resolved_map_writer_t *miface = + container_of(iface, avro_resolved_map_writer_t, parent); + + /* Only calculate the size for any resolver once */ + iface->calculate_size = NULL; + + DEBUG("Calculating size for %s->%s", + avro_schema_type_name((iface)->wschema), + avro_schema_type_name((iface)->rschema)); + iface->instance_size = sizeof(avro_resolved_map_value_t); + + avro_resolved_writer_calculate_size(miface->child_resolver); +} + +static void +avro_resolved_map_writer_free_iface(avro_resolved_writer_t *iface, st_table *freeing) +{ + avro_resolved_map_writer_t *miface = + container_of(iface, avro_resolved_map_writer_t, parent); + free_resolver(miface->child_resolver, freeing); + avro_schema_decref(iface->wschema); + avro_schema_decref(iface->rschema); + avro_freet(avro_resolved_map_writer_t, iface); +} + +static int +avro_resolved_map_writer_init(const avro_resolved_writer_t *iface, void *vself) +{ + const avro_resolved_map_writer_t *miface = + container_of(iface, avro_resolved_map_writer_t, parent); + avro_resolved_map_value_t *self = vself; + size_t child_instance_size = miface->child_resolver->instance_size; + DEBUG("Initializing child array for map (child_size=%zu)", child_instance_size); + avro_raw_array_init(&self->children, child_instance_size); + return 0; +} + +static void +avro_resolved_map_writer_free_elements(const avro_resolved_writer_t *child_iface, + avro_resolved_map_value_t *self) +{ + size_t i; + for (i = 0; i < avro_raw_array_size(&self->children); i++) { + void *child_self = avro_raw_array_get_raw(&self->children, i); + avro_resolved_writer_done(child_iface, child_self); + } +} + +static void +avro_resolved_map_writer_done(const avro_resolved_writer_t *iface, void *vself) +{ + const avro_resolved_map_writer_t *miface = + container_of(iface, avro_resolved_map_writer_t, parent); + avro_resolved_map_value_t *self = vself; + avro_resolved_map_writer_free_elements(miface->child_resolver, self); + avro_raw_array_done(&self->children); +} + +static int +avro_resolved_map_writer_reset(const avro_resolved_writer_t *iface, void *vself) +{ + const avro_resolved_map_writer_t *miface = + container_of(iface, avro_resolved_map_writer_t, parent); + avro_resolved_map_value_t *self = vself; + + /* Clear out our cache of wrapped children */ + avro_resolved_map_writer_free_elements(miface->child_resolver, self); + return 0; +} + +static int +avro_resolved_map_writer_get_size(const avro_value_iface_t *viface, + const void *vself, size_t *size) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + const avro_resolved_map_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, &self->wrapped, &dest)); + return avro_value_get_size(&dest, size); +} + +static int +avro_resolved_map_writer_add(const avro_value_iface_t *viface, + void *vself, const char *key, + avro_value_t *child, size_t *index, int *is_new) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + const avro_resolved_map_writer_t *miface = + container_of(iface, avro_resolved_map_writer_t, parent); + avro_resolved_map_value_t *self = vself; + avro_value_t dest; + check(rval, avro_resolved_writer_get_real_dest(iface, &self->wrapped, &dest)); + + /* + * This is a bit convoluted. We need to stash the wrapped child + * value somewhere in our children array. But we don't know + * where to put it until the wrapped map tells us whether this + * is a new value, and if not, which index the value should go + * in. + */ + + avro_value_t real_child; + size_t real_index; + int real_is_new; + + DEBUG("Adding %s to map %p", key, dest.self); + check(rval, avro_value_add(&dest, key, &real_child, &real_index, &real_is_new)); + + child->iface = &miface->child_resolver->parent; + if (real_is_new) { + child->self = avro_raw_array_append(&self->children); + DEBUG("Element is new (child resolver=%p)", child->self); + if (child->self == NULL) { + avro_set_error("Couldn't expand map"); + return ENOMEM; + } + check(rval, avro_resolved_writer_init + (miface->child_resolver, child->self)); + } else { + child->self = avro_raw_array_get_raw(&self->children, real_index); + DEBUG("Element is old (child resolver=%p)", child->self); + } + avro_value_t *child_vself = child->self; + *child_vself = real_child; + + if (index != NULL) { + *index = real_index; + } + if (is_new != NULL) { + *is_new = real_is_new; + } + return 0; +} + +static avro_resolved_map_writer_t * +avro_resolved_map_writer_create(avro_schema_t wschema, avro_schema_t rschema) +{ + avro_resolved_writer_t *self = avro_new(avro_resolved_map_writer_t); + memset(self, 0, sizeof(avro_resolved_map_writer_t)); + + self->parent.incref_iface = avro_resolved_writer_incref_iface; + self->parent.decref_iface = avro_resolved_writer_decref_iface; + self->parent.incref = avro_resolved_writer_incref; + self->parent.decref = avro_resolved_writer_decref; + self->parent.reset = avro_resolved_writer_reset; + self->parent.get_type = avro_resolved_writer_get_type; + self->parent.get_schema = avro_resolved_writer_get_schema; + self->parent.get_size = avro_resolved_map_writer_get_size; + self->parent.add = avro_resolved_map_writer_add; + + self->refcount = 1; + self->wschema = avro_schema_incref(wschema); + self->rschema = avro_schema_incref(rschema); + self->reader_union_branch = -1; + self->calculate_size = avro_resolved_map_writer_calculate_size; + self->free_iface = avro_resolved_map_writer_free_iface; + self->init = avro_resolved_map_writer_init; + self->done = avro_resolved_map_writer_done; + self->reset_wrappers = avro_resolved_map_writer_reset; + return container_of(self, avro_resolved_map_writer_t, parent); +} + +static int +try_map(memoize_state_t *state, avro_resolved_writer_t **self, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + /* + * First verify that the reader is an map. + */ + + if (!is_avro_map(rschema)) { + return 0; + } + + /* + * Map schemas have to have compatible element schemas to be + * compatible themselves. Try to create an resolver to check + * the compatibility. + */ + + avro_resolved_map_writer_t *mself = + avro_resolved_map_writer_create(wschema, root_rschema); + avro_memoize_set(&state->mem, wschema, root_rschema, mself); + + avro_schema_t witems = avro_schema_map_values(wschema); + avro_schema_t ritems = avro_schema_map_values(rschema); + + avro_resolved_writer_t *item_resolver = + avro_resolved_writer_new_memoized(state, witems, ritems); + if (item_resolver == NULL) { + avro_memoize_delete(&state->mem, wschema, root_rschema); + avro_value_iface_decref(&mself->parent.parent); + avro_prefix_error("Map values aren't compatible: "); + return EINVAL; + } + + /* + * The two schemas are compatible. Store the item schema's + * resolver into the child_resolver field. + */ + + mself->child_resolver = item_resolver; + *self = &mself->parent; + return 0; +} + + +/*----------------------------------------------------------------------- + * record + */ + +typedef struct avro_resolved_record_writer { + avro_resolved_writer_t parent; + size_t field_count; + size_t *field_offsets; + avro_resolved_writer_t **field_resolvers; + size_t *index_mapping; +} avro_resolved_record_writer_t; + +typedef struct avro_resolved_record_value { + avro_value_t wrapped; + /* The rest of the struct is taken up by the inline storage + * needed for each field. */ +} avro_resolved_record_value_t; + +/** Return a pointer to the given field within a record struct. */ +#define avro_resolved_record_field(iface, rec, index) \ + (((void *) (rec)) + (iface)->field_offsets[(index)]) + + +static void +avro_resolved_record_writer_calculate_size(avro_resolved_writer_t *iface) +{ + avro_resolved_record_writer_t *riface = + container_of(iface, avro_resolved_record_writer_t, parent); + + /* Only calculate the size for any resolver once */ + iface->calculate_size = NULL; + + DEBUG("Calculating size for %s->%s", + avro_schema_type_name((iface)->wschema), + avro_schema_type_name((iface)->rschema)); + + /* + * Once we've figured out which writer fields we actually need, + * calculate an offset for each one. + */ + + size_t wi; + size_t next_offset = sizeof(avro_resolved_record_value_t); + for (wi = 0; wi < riface->field_count; wi++) { + riface->field_offsets[wi] = next_offset; + if (riface->field_resolvers[wi] != NULL) { + avro_resolved_writer_calculate_size + (riface->field_resolvers[wi]); + size_t field_size = + riface->field_resolvers[wi]->instance_size; + DEBUG("Field %zu has size %zu", wi, field_size); + next_offset += field_size; + } else { + DEBUG("Field %zu is being skipped", wi); + } + } + + DEBUG("Record has size %zu", next_offset); + iface->instance_size = next_offset; +} + +static void +avro_resolved_record_writer_free_iface(avro_resolved_writer_t *iface, st_table *freeing) +{ + avro_resolved_record_writer_t *riface = + container_of(iface, avro_resolved_record_writer_t, parent); + + if (riface->field_offsets != NULL) { + avro_free(riface->field_offsets, + riface->field_count * sizeof(size_t)); + } + + if (riface->field_resolvers != NULL) { + size_t i; + for (i = 0; i < riface->field_count; i++) { + if (riface->field_resolvers[i] != NULL) { + DEBUG("Freeing field %zu %p", i, + riface->field_resolvers[i]); + free_resolver(riface->field_resolvers[i], freeing); + } + } + avro_free(riface->field_resolvers, + riface->field_count * sizeof(avro_resolved_writer_t *)); + } + + if (riface->index_mapping != NULL) { + avro_free(riface->index_mapping, + riface->field_count * sizeof(size_t)); + } + + avro_schema_decref(iface->wschema); + avro_schema_decref(iface->rschema); + avro_freet(avro_resolved_record_writer_t, iface); +} + +static int +avro_resolved_record_writer_init(const avro_resolved_writer_t *iface, void *vself) +{ + int rval; + const avro_resolved_record_writer_t *riface = + container_of(iface, avro_resolved_record_writer_t, parent); + avro_resolved_record_value_t *self = vself; + + /* Initialize each field */ + size_t i; + for (i = 0; i < riface->field_count; i++) { + if (riface->field_resolvers[i] != NULL) { + check(rval, avro_resolved_writer_init + (riface->field_resolvers[i], + avro_resolved_record_field(riface, self, i))); + } + } + + return 0; +} + +static void +avro_resolved_record_writer_done(const avro_resolved_writer_t *iface, void *vself) +{ + const avro_resolved_record_writer_t *riface = + container_of(iface, avro_resolved_record_writer_t, parent); + avro_resolved_record_value_t *self = vself; + + /* Finalize each field */ + size_t i; + for (i = 0; i < riface->field_count; i++) { + if (riface->field_resolvers[i] != NULL) { + avro_resolved_writer_done + (riface->field_resolvers[i], + avro_resolved_record_field(riface, self, i)); + } + } +} + +static int +avro_resolved_record_writer_reset(const avro_resolved_writer_t *iface, void *vself) +{ + int rval; + const avro_resolved_record_writer_t *riface = + container_of(iface, avro_resolved_record_writer_t, parent); + avro_resolved_record_value_t *self = vself; + + /* Reset each field */ + size_t i; + for (i = 0; i < riface->field_count; i++) { + if (riface->field_resolvers[i] != NULL) { + check(rval, avro_resolved_writer_reset_wrappers + (riface->field_resolvers[i], + avro_resolved_record_field(riface, self, i))); + } + } + + return 0; +} + +static int +avro_resolved_record_writer_get_size(const avro_value_iface_t *viface, + const void *vself, size_t *size) +{ + AVRO_UNUSED(vself); + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + const avro_resolved_record_writer_t *riface = + container_of(iface, avro_resolved_record_writer_t, parent); + *size = riface->field_count; + return 0; +} + +static int +avro_resolved_record_writer_get_by_index(const avro_value_iface_t *viface, + const void *vself, size_t index, + avro_value_t *child, const char **name) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + const avro_resolved_record_writer_t *riface = + container_of(iface, avro_resolved_record_writer_t, parent); + const avro_resolved_record_value_t *self = vself; + avro_value_t dest; + + DEBUG("Getting writer field %zu from record %p", index, dest.self); + if (riface->field_resolvers[index] == NULL) { + DEBUG("Reader doesn't have field, skipping"); + child->iface = NULL; + child->self = NULL; + return 0; + } + + check(rval, avro_resolved_writer_get_real_dest(iface, &self->wrapped, &dest)); + size_t reader_index = riface->index_mapping[index]; + DEBUG(" Reader field is %zu", reader_index); + child->iface = &riface->field_resolvers[index]->parent; + child->self = avro_resolved_record_field(riface, self, index); + + return avro_value_get_by_index(&dest, reader_index, child->self, name); +} + +static int +avro_resolved_record_writer_get_by_name(const avro_value_iface_t *viface, + const void *vself, const char *name, + avro_value_t *child, size_t *index) +{ + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + + int wi = avro_schema_record_field_get_index(iface->wschema, name); + if (wi == -1) { + avro_set_error("Record doesn't have field named %s", name); + return EINVAL; + } + + DEBUG("Writer field %s is at index %d", name, wi); + if (index != NULL) { + *index = wi; + } + return avro_resolved_record_writer_get_by_index(viface, vself, wi, child, NULL); +} + +static avro_resolved_record_writer_t * +avro_resolved_record_writer_create(avro_schema_t wschema, avro_schema_t rschema) +{ + avro_resolved_writer_t *self = avro_new(avro_resolved_record_writer_t); + memset(self, 0, sizeof(avro_resolved_record_writer_t)); + + self->parent.incref_iface = avro_resolved_writer_incref_iface; + self->parent.decref_iface = avro_resolved_writer_decref_iface; + self->parent.incref = avro_resolved_writer_incref; + self->parent.decref = avro_resolved_writer_decref; + self->parent.reset = avro_resolved_writer_reset; + self->parent.get_type = avro_resolved_writer_get_type; + self->parent.get_schema = avro_resolved_writer_get_schema; + self->parent.get_size = avro_resolved_record_writer_get_size; + self->parent.get_by_index = avro_resolved_record_writer_get_by_index; + self->parent.get_by_name = avro_resolved_record_writer_get_by_name; + + self->refcount = 1; + self->wschema = avro_schema_incref(wschema); + self->rschema = avro_schema_incref(rschema); + self->reader_union_branch = -1; + self->calculate_size = avro_resolved_record_writer_calculate_size; + self->free_iface = avro_resolved_record_writer_free_iface; + self->init = avro_resolved_record_writer_init; + self->done = avro_resolved_record_writer_done; + self->reset_wrappers = avro_resolved_record_writer_reset; + return container_of(self, avro_resolved_record_writer_t, parent); +} + +static int +try_record(memoize_state_t *state, avro_resolved_writer_t **self, + avro_schema_t wschema, avro_schema_t rschema, + avro_schema_t root_rschema) +{ + /* + * First verify that the reader is also a record, and has the + * same name as the writer. + */ + + if (!is_avro_record(rschema)) { + return 0; + } + + const char *wname = avro_schema_name(wschema); + const char *rname = avro_schema_name(rschema); + + if (strcmp(wname, rname) != 0) { + return 0; + } + + /* + * Categorize the fields in the record schemas. Fields that are + * only in the writer are ignored. Fields that are only in the + * reader raise a schema mismatch error, unless the field has a + * default value. Fields that are in both are resolved + * recursively. + * + * The field_resolvers array will contain an avro_value_iface_t + * for each field in the writer schema. To build this array, we + * loop through the fields of the reader schema. If that field + * is also in the writer schema, we resolve them recursively, + * and store the resolver into the array. If the field isn't in + * the writer schema, we raise an error. (TODO: Eventually, + * we'll handle default values here.) After this loop finishes, + * any NULLs in the field_resolvers array will represent fields + * in the writer but not the reader; these fields will be + * skipped when processing the input. + */ + + avro_resolved_record_writer_t *rself = + avro_resolved_record_writer_create(wschema, root_rschema); + avro_memoize_set(&state->mem, wschema, root_rschema, rself); + + size_t wfields = avro_schema_record_size(wschema); + size_t rfields = avro_schema_record_size(rschema); + + DEBUG("Checking writer record schema %s", wname); + + avro_resolved_writer_t **field_resolvers = + avro_calloc(wfields, sizeof(avro_resolved_writer_t *)); + size_t *field_offsets = avro_calloc(wfields, sizeof(size_t)); + size_t *index_mapping = avro_calloc(wfields, sizeof(size_t)); + + size_t ri; + for (ri = 0; ri < rfields; ri++) { + avro_schema_t rfield = + avro_schema_record_field_get_by_index(rschema, ri); + const char *field_name = + avro_schema_record_field_name(rschema, ri); + + DEBUG("Resolving reader record field %zu (%s)", ri, field_name); + + /* + * See if this field is also in the writer schema. + */ + + int wi = avro_schema_record_field_get_index(wschema, field_name); + + if (wi == -1) { + /* + * This field isn't in the writer schema — + * that's an error! TODO: Handle default + * values! + */ + + DEBUG("Field %s isn't in writer", field_name); + avro_set_error("Reader field %s doesn't appear in writer", + field_name); + goto error; + } + + /* + * Try to recursively resolve the schemas for this + * field. If they're not compatible, that's an error. + */ + + avro_schema_t wfield = + avro_schema_record_field_get_by_index(wschema, wi); + avro_resolved_writer_t *field_resolver = + avro_resolved_writer_new_memoized(state, wfield, rfield); + + if (field_resolver == NULL) { + avro_prefix_error("Field %s isn't compatible: ", field_name); + goto error; + } + + /* + * Save the details for this field. + */ + + DEBUG("Found match for field %s (%zu in reader, %d in writer)", + field_name, ri, wi); + field_resolvers[wi] = field_resolver; + index_mapping[wi] = ri; + } + + /* + * We might not have found matches for all of the writer fields, + * but that's okay — any extras will be ignored. + */ + + rself->field_count = wfields; + rself->field_offsets = field_offsets; + rself->field_resolvers = field_resolvers; + rself->index_mapping = index_mapping; + *self = &rself->parent; + return 0; + +error: + /* + * Clean up any resolver we might have already created. + */ + + avro_memoize_delete(&state->mem, wschema, root_rschema); + avro_value_iface_decref(&rself->parent.parent); + + { + unsigned int i; + for (i = 0; i < wfields; i++) { + if (field_resolvers[i]) { + avro_value_iface_decref(&field_resolvers[i]->parent); + } + } + } + + avro_free(field_resolvers, wfields * sizeof(avro_resolved_writer_t *)); + avro_free(field_offsets, wfields * sizeof(size_t)); + avro_free(index_mapping, wfields * sizeof(size_t)); + return EINVAL; +} + + +/*----------------------------------------------------------------------- + * union + */ + +typedef struct avro_resolved_union_writer { + avro_resolved_writer_t parent; + size_t branch_count; + avro_resolved_writer_t **branch_resolvers; +} avro_resolved_union_writer_t; + +typedef struct avro_resolved_union_value { + avro_value_t wrapped; + + /** The currently active branch of the union. -1 if no branch + * is selected. */ + int discriminant; + + /* The rest of the struct is taken up by the inline storage + * needed for the active branch. */ +} avro_resolved_union_value_t; + +/** Return a pointer to the active branch within a union struct. */ +#define avro_resolved_union_branch(_union) \ + (((void *) (_union)) + sizeof(avro_resolved_union_value_t)) + + +static void +avro_resolved_union_writer_calculate_size(avro_resolved_writer_t *iface) +{ + avro_resolved_union_writer_t *uiface = + container_of(iface, avro_resolved_union_writer_t, parent); + + /* Only calculate the size for any resolver once */ + iface->calculate_size = NULL; + + DEBUG("Calculating size for %s->%s", + avro_schema_type_name((iface)->wschema), + avro_schema_type_name((iface)->rschema)); + + size_t i; + size_t max_branch_size = 0; + for (i = 0; i < uiface->branch_count; i++) { + if (uiface->branch_resolvers[i] == NULL) { + DEBUG("No match for writer union branch %zu", i); + } else { + avro_resolved_writer_calculate_size + (uiface->branch_resolvers[i]); + size_t branch_size = + uiface->branch_resolvers[i]->instance_size; + DEBUG("Writer branch %zu has size %zu", i, branch_size); + if (branch_size > max_branch_size) { + max_branch_size = branch_size; + } + } + } + + DEBUG("Maximum branch size is %zu", max_branch_size); + iface->instance_size = + sizeof(avro_resolved_union_value_t) + max_branch_size; + DEBUG("Total union size is %zu", iface->instance_size); +} + +static void +avro_resolved_union_writer_free_iface(avro_resolved_writer_t *iface, st_table *freeing) +{ + avro_resolved_union_writer_t *uiface = + container_of(iface, avro_resolved_union_writer_t, parent); + + if (uiface->branch_resolvers != NULL) { + size_t i; + for (i = 0; i < uiface->branch_count; i++) { + if (uiface->branch_resolvers[i] != NULL) { + free_resolver(uiface->branch_resolvers[i], freeing); + } + } + avro_free(uiface->branch_resolvers, + uiface->branch_count * sizeof(avro_resolved_writer_t *)); + } + + avro_schema_decref(iface->wschema); + avro_schema_decref(iface->rschema); + avro_freet(avro_resolved_union_writer_t, iface); +} + +static int +avro_resolved_union_writer_init(const avro_resolved_writer_t *iface, void *vself) +{ + AVRO_UNUSED(iface); + avro_resolved_union_value_t *self = vself; + self->discriminant = -1; + return 0; +} + +static void +avro_resolved_union_writer_done(const avro_resolved_writer_t *iface, void *vself) +{ + const avro_resolved_union_writer_t *uiface = + container_of(iface, avro_resolved_union_writer_t, parent); + avro_resolved_union_value_t *self = vself; + if (self->discriminant >= 0) { + avro_resolved_writer_done + (uiface->branch_resolvers[self->discriminant], + avro_resolved_union_branch(self)); + self->discriminant = -1; + } +} + +static int +avro_resolved_union_writer_reset(const avro_resolved_writer_t *iface, void *vself) +{ + const avro_resolved_union_writer_t *uiface = + container_of(iface, avro_resolved_union_writer_t, parent); + avro_resolved_union_value_t *self = vself; + + /* Keep the same branch selected, for the common case that we're + * about to reuse it. */ + if (self->discriminant >= 0) { + return avro_resolved_writer_reset_wrappers + (uiface->branch_resolvers[self->discriminant], + avro_resolved_union_branch(self)); + } + + return 0; +} + +static int +avro_resolved_union_writer_set_branch(const avro_value_iface_t *viface, + void *vself, int discriminant, + avro_value_t *branch) +{ + int rval; + const avro_resolved_writer_t *iface = + container_of(viface, avro_resolved_writer_t, parent); + const avro_resolved_union_writer_t *uiface = + container_of(iface, avro_resolved_union_writer_t, parent); + avro_resolved_union_value_t *self = vself; + + DEBUG("Getting writer branch %d from union %p", discriminant, vself); + avro_resolved_writer_t *branch_resolver = + uiface->branch_resolvers[discriminant]; + if (branch_resolver == NULL) { + DEBUG("Reader doesn't have branch, skipping"); + avro_set_error("Writer union branch %d is incompatible " + "with reader schema \"%s\"", + discriminant, avro_schema_type_name(iface->rschema)); + return EINVAL; + } + + if (self->discriminant == discriminant) { + DEBUG("Writer branch %d already selected", discriminant); + } else { + if (self->discriminant >= 0) { + DEBUG("Finalizing old writer branch %d", self->discriminant); + avro_resolved_writer_done + (uiface->branch_resolvers[self->discriminant], + avro_resolved_union_branch(self)); + } + DEBUG("Initializing writer branch %d", discriminant); + check(rval, avro_resolved_writer_init + (uiface->branch_resolvers[discriminant], + avro_resolved_union_branch(self))); + self->discriminant = discriminant; + } + + branch->iface = &branch_resolver->parent; + branch->self = avro_resolved_union_branch(self); + avro_value_t *branch_vself = branch->self; + *branch_vself = self->wrapped; + return 0; +} + +static avro_resolved_union_writer_t * +avro_resolved_union_writer_create(avro_schema_t wschema, avro_schema_t rschema) +{ + avro_resolved_writer_t *self = avro_new(avro_resolved_union_writer_t); + memset(self, 0, sizeof(avro_resolved_union_writer_t)); + + self->parent.incref_iface = avro_resolved_writer_incref_iface; + self->parent.decref_iface = avro_resolved_writer_decref_iface; + self->parent.incref = avro_resolved_writer_incref; + self->parent.decref = avro_resolved_writer_decref; + self->parent.reset = avro_resolved_writer_reset; + self->parent.get_type = avro_resolved_writer_get_type; + self->parent.get_schema = avro_resolved_writer_get_schema; + self->parent.set_branch = avro_resolved_union_writer_set_branch; + + self->refcount = 1; + self->wschema = avro_schema_incref(wschema); + self->rschema = avro_schema_incref(rschema); + self->reader_union_branch = -1; + self->calculate_size = avro_resolved_union_writer_calculate_size; + self->free_iface = avro_resolved_union_writer_free_iface; + self->init = avro_resolved_union_writer_init; + self->done = avro_resolved_union_writer_done; + self->reset_wrappers = avro_resolved_union_writer_reset; + return container_of(self, avro_resolved_union_writer_t, parent); +} + +static avro_resolved_writer_t * +try_union(memoize_state_t *state, + avro_schema_t wschema, avro_schema_t rschema) +{ + /* + * For a writer union, we recursively try to resolve each branch + * against the reader schema. This will work correctly whether + * or not the reader is also a union — if the reader is a union, + * then we'll resolve each (non-union) writer branch against the + * reader union, which will be checked in our calls to + * check_simple_writer below. The net result is that we might + * end up trying every combination of writer and reader + * branches, when looking for compatible schemas. + * + * Regardless of what the reader schema is, for each writer + * branch, we stash away the recursive resolver into the + * branch_resolvers array. A NULL entry in this array means + * that that branch isn't compatible with the reader. This + * isn't an immediate schema resolution error, since we allow + * incompatible branches in the types as long as that branch + * never appears in the actual data. We only return an error if + * there are *no* branches that are compatible. + */ + + size_t branch_count = avro_schema_union_size(wschema); + DEBUG("Checking %zu-branch writer union schema", branch_count); + + avro_resolved_union_writer_t *uself = + avro_resolved_union_writer_create(wschema, rschema); + avro_memoize_set(&state->mem, wschema, rschema, uself); + + avro_resolved_writer_t **branch_resolvers = + avro_calloc(branch_count, sizeof(avro_resolved_writer_t *)); + int some_branch_compatible = 0; + + size_t i; + for (i = 0; i < branch_count; i++) { + avro_schema_t branch_schema = + avro_schema_union_branch(wschema, i); + + DEBUG("Resolving writer union branch %zu (%s)", i, + avro_schema_type_name(branch_schema)); + + /* + * Try to recursively resolve this branch of the writer + * union. Don't raise an error if this fails — it's + * okay for some of the branches to not be compatible + * with the reader, as long as those branches never + * appear in the input. + */ + + branch_resolvers[i] = + avro_resolved_writer_new_memoized(state, branch_schema, rschema); + if (branch_resolvers[i] == NULL) { + DEBUG("No match for writer union branch %zu", i); + } else { + DEBUG("Found match for writer union branch %zu", i); + some_branch_compatible = 1; + } + } + + /* + * As long as there's at least one branch that's compatible with + * the reader, then we consider this schema resolution a + * success. + */ + + if (!some_branch_compatible) { + DEBUG("No writer union branches match"); + avro_set_error("No branches in the writer are compatible " + "with reader schema %s", + avro_schema_type_name(rschema)); + goto error; + } + + uself->branch_count = branch_count; + uself->branch_resolvers = branch_resolvers; + return &uself->parent; + +error: + /* + * Clean up any resolver we might have already created. + */ + + avro_memoize_delete(&state->mem, wschema, rschema); + avro_value_iface_decref(&uself->parent.parent); + + { + unsigned int i; + for (i = 0; i < branch_count; i++) { + if (branch_resolvers[i]) { + avro_value_iface_decref(&branch_resolvers[i]->parent); + } + } + } + + avro_free(branch_resolvers, branch_count * sizeof(avro_resolved_writer_t *)); + return NULL; +} + + +/*----------------------------------------------------------------------- + * Schema type dispatcher + */ + +static avro_resolved_writer_t * +avro_resolved_writer_new_memoized(memoize_state_t *state, + avro_schema_t wschema, avro_schema_t rschema) +{ + check_param(NULL, is_avro_schema(wschema), "writer schema"); + check_param(NULL, is_avro_schema(rschema), "reader schema"); + + skip_links(rschema); + + /* + * First see if we've already matched these two schemas. If so, + * just return that resolver. + */ + + avro_resolved_writer_t *saved = NULL; + if (avro_memoize_get(&state->mem, wschema, rschema, (void **) &saved)) { + DEBUG("Already resolved %s%s%s->%s", + is_avro_link(wschema)? "[": "", + avro_schema_type_name(wschema), + is_avro_link(wschema)? "]": "", + avro_schema_type_name(rschema)); + return saved; + } else { + DEBUG("Resolving %s%s%s->%s", [... 124 lines stripped ...]