qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [41/89] [abbrv] [partial] qpid-proton git commit: PROTON-1728: Reorganize the source tree
Date Tue, 03 Jul 2018 22:13:30 GMT
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/core/dispatcher.c
----------------------------------------------------------------------
diff --git a/c/src/core/dispatcher.c b/c/src/core/dispatcher.c
new file mode 100644
index 0000000..87e4d97
--- /dev/null
+++ b/c/src/core/dispatcher.c
@@ -0,0 +1,156 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "dispatcher.h"
+
+#include "framing.h"
+#include "protocol.h"
+#include "engine-internal.h"
+
+#include "dispatch_actions.h"
+
+/*
+ * Fallback action used when the frame type is recognised but the performative
+ * code matches none of the dispatcher's cases. Logs the frame type and fails.
+ * channel, args and payload are unused; they exist to match pn_action_t.
+ */
+int pni_bad_frame(pn_transport_t *transport, uint8_t frame_type, uint16_t channel,
+                  pn_data_t *args, const pn_bytes_t *payload)
+{
+  pn_transport_logf(transport, "Error dispatching frame: type: %d: Unknown performative", frame_type);
+  return PN_ERR;
+}
+
+/*
+ * Fallback action used when the frame type itself is not one the dispatcher
+ * knows. Logs the offending type and fails. channel, args and payload are
+ * unused; they exist to match pn_action_t.
+ */
+int pni_bad_frame_type(pn_transport_t *transport, uint8_t frame_type, uint16_t channel,
+                       pn_data_t *args, const pn_bytes_t *payload)
+{
+  pn_transport_logf(transport, "Error dispatching frame: Unknown frame type: %d", frame_type);
+  return PN_ERR;
+}
+
+// The performative-to-handler mapping is hard-coded. A table based approach
+// would only be needed if performatives had to be added dynamically.
+//
+// Picks the handler for (frame_type, lcode) and invokes it with the remaining
+// arguments, returning whatever the handler returns.
+static inline int pni_dispatch_action(pn_transport_t* transport, uint64_t lcode, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
+{
+  pn_action_t *handler;
+
+  if (frame_type == AMQP_FRAME_TYPE) {
+    /* Regular AMQP frames; an unmatched code keeps the pni_bad_frame fallback */
+    handler = pni_bad_frame;
+    switch (lcode) {
+    case OPEN:            handler = pn_do_open; break;
+    case BEGIN:           handler = pn_do_begin; break;
+    case ATTACH:          handler = pn_do_attach; break;
+    case FLOW:            handler = pn_do_flow; break;
+    case TRANSFER:        handler = pn_do_transfer; break;
+    case DISPOSITION:     handler = pn_do_disposition; break;
+    case DETACH:          handler = pn_do_detach; break;
+    case END:             handler = pn_do_end; break;
+    case CLOSE:           handler = pn_do_close; break;
+    default:              break;
+    }
+  } else if (frame_type == SASL_FRAME_TYPE) {
+    /* SASL frames; an unmatched code keeps the pni_bad_frame fallback */
+    handler = pni_bad_frame;
+    switch (lcode) {
+    case SASL_MECHANISMS: handler = pn_do_mechanisms; break;
+    case SASL_INIT:       handler = pn_do_init; break;
+    case SASL_CHALLENGE:  handler = pn_do_challenge; break;
+    case SASL_RESPONSE:   handler = pn_do_response; break;
+    case SASL_OUTCOME:    handler = pn_do_outcome; break;
+    default:              break;
+    }
+  } else {
+    /* Neither AMQP nor SASL */
+    handler = pni_bad_frame_type;
+  }
+
+  return handler(transport, frame_type, channel, args, payload);
+}
+
+/*
+ * Decodes the body of one frame into args, extracts the numeric performative
+ * code from it and hands the frame to pni_dispatch_action().
+ *
+ * Returns 0 for an empty frame, the negative decode result or the scan error
+ * on failure, PN_ERR when no code could be scanned, otherwise the handler's
+ * return value. args is cleared only after a handler has actually been run.
+ */
+static int pni_dispatch_frame(pn_transport_t * transport, pn_data_t *args, pn_frame_t frame)
+{
+  // Null frames carry nothing to dispatch: trace them if asked, then ignore
+  if (frame.size == 0) {
+    if (transport->trace & PN_TRACE_FRM) {
+      pn_transport_logf(transport, "%u <- (EMPTY FRAME)", frame.channel);
+    }
+    return 0;
+  }
+
+  ssize_t decoded = pn_data_decode(args, frame.payload, frame.size);
+  if (decoded < 0) {
+    // Log the error text followed by a quoted dump of the raw frame bytes
+    pn_string_format(transport->scratch,
+                     "Error decoding frame: %s %s\n", pn_code(decoded),
+                     pn_error_text(pn_data_error(args)));
+    pn_quote(transport->scratch, frame.payload, frame.size);
+    pn_transport_log(transport, pn_string_get(transport->scratch));
+    return decoded;
+  }
+
+  // XXX: assuming numeric -
+  // if we get a symbol we should map it to the numeric value and dispatch on that
+  bool have_code;
+  uint64_t performative;
+  int scan_err = pn_data_scan(args, "D?L.", &have_code, &performative);
+  if (scan_err) {
+    pn_transport_log(transport, "Scan error");
+    return scan_err;
+  }
+  if (!have_code) {
+    pn_transport_log(transport, "Error dispatching frame");
+    return PN_ERR;
+  }
+
+  // Whatever follows the decoded performative is the frame's payload
+  size_t body_size = frame.size - decoded;
+  const char *body = body_size ? frame.payload + decoded : NULL;
+  pn_bytes_t payload = {body_size, body};
+
+  pn_do_trace(transport, frame.channel, IN, args, body, body_size);
+
+  int err = pni_dispatch_action(transport, performative, frame.type, frame.channel, args, &payload);
+  pn_data_clear(args);
+  return err;
+}
+
+/*
+ * Reads frames from bytes[0..available) and dispatches each one.
+ *
+ * Stops when the input is exhausted, when *halt becomes true, when a complete
+ * frame is not yet available, or after the first frame if batch is false.
+ * Returns the number of bytes consumed, the negative pn_read_frame() result
+ * for a malformed frame, or the non-zero result of a failed dispatch.
+ */
+ssize_t pn_dispatcher_input(pn_transport_t *transport, const char *bytes, size_t available, bool batch, bool *halt)
+{
+  size_t consumed = 0;
+
+  while (available && !*halt) {
+    pn_frame_t frame;
+    ssize_t n = pn_read_frame(&frame, bytes + consumed, available, transport->local_max_frame);
+
+    if (n < 0) {
+      pn_do_error(transport, "amqp:connection:framing-error", "malformed frame");
+      return n;
+    }
+    if (n == 0) {
+      // No frame was read from the remaining input
+      break;
+    }
+
+    consumed += n;
+    available -= n;
+    transport->input_frames_ct += 1;
+
+    int e = pni_dispatch_frame(transport, transport->args, frame);
+    if (e) return e;
+
+    if (!batch) break;
+  }
+
+  return consumed;
+}
+
+/*
+ * Copies up to size bytes from the front of the transport's output buffer into
+ * bytes, trims that many bytes off the front of the buffer, and returns the
+ * count.
+ */
+ssize_t pn_dispatcher_output(pn_transport_t *transport, char *bytes, size_t size)
+{
+  // XXX: need to check for errors
+  int copied = pn_buffer_get(transport->output_buffer, 0, size, bytes);
+  pn_buffer_trim(transport->output_buffer, copied, 0);
+  return copied;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/core/dispatcher.h
----------------------------------------------------------------------
diff --git a/c/src/core/dispatcher.h b/c/src/core/dispatcher.h
new file mode 100644
index 0000000..29881b5
--- /dev/null
+++ b/c/src/core/dispatcher.h
@@ -0,0 +1,37 @@
+#ifndef _PROTON_DISPATCHER_H
+#define _PROTON_DISPATCHER_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef __cplusplus
+#include <stdbool.h>
+#endif
+
+#include "proton/codec.h"
+#include "proton/types.h"
+
+/*
+ * Signature of a frame handler: receives the transport, the frame type and
+ * channel, the decoded frame arguments, and the payload bytes. Returns an int
+ * status that the dispatcher propagates to its caller.
+ */
+typedef int (pn_action_t)(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
+
+/*
+ * Reads and dispatches frames from bytes[0..available). Processes a single
+ * frame unless batch is true, and stops as soon as *halt is true. Returns the
+ * number of bytes consumed, or a non-zero error code on failure.
+ */
+ssize_t pn_dispatcher_input(pn_transport_t* transport, const char* bytes, size_t available, bool batch, bool* halt);
+/*
+ * Copies up to size bytes of pending output from the transport into bytes and
+ * returns how many bytes were copied.
+ */
+ssize_t pn_dispatcher_output(pn_transport_t *transport, char *bytes, size_t size);
+
+#endif /* dispatcher.h */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/core/encoder.c
----------------------------------------------------------------------
diff --git a/c/src/core/encoder.c b/c/src/core/encoder.c
new file mode 100644
index 0000000..505db47
--- /dev/null
+++ b/c/src/core/encoder.c
@@ -0,0 +1,423 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/error.h>
+#include <proton/object.h>
+#include <proton/codec.h>
+#include "encodings.h"
+#include "encoder.h"
+
+#include <string.h>
+
+#include "data.h"
+
+/* State for one encoding pass over a pn_data_t tree. */
+struct pn_encoder_t {
+  char *output;        /* start of the destination buffer (0 during a size-only pass) */
+  char *position;      /* write cursor; advanced even when the data does not fit */
+  pn_error_t *error;   /* created in initialize, freed in finalize */
+  size_t size;         /* capacity of the destination buffer in bytes */
+  unsigned null_count; /* nulls seen in a described list but not yet written */
+};
+
+/* Class initializer: allocates the error object and zeroes the buffer state. */
+static void pn_encoder_initialize(void *obj)
+{
+  pn_encoder_t *self = (pn_encoder_t *) obj;
+
+  self->error = pn_error();
+  self->output = self->position = NULL;
+  self->size = 0;
+  self->null_count = 0;
+}
+
+/* Class finalizer: releases the error object created by the initializer. */
+static void pn_encoder_finalize(void *obj)
+{
+  pn_error_free(((pn_encoder_t *) obj)->error);
+}
+
+/* The encoder class provides no hashcode/compare/inspect callbacks.
+ * NOTE(review): presumably PN_CLASS(pn_encoder) picks these names up by token
+ * pasting - confirm against the PN_CLASS definition. */
+#define pn_encoder_hashcode NULL
+#define pn_encoder_compare NULL
+#define pn_encoder_inspect NULL
+
+/*
+ * Allocates a new encoder via pn_class_new().
+ *
+ * Declared with an explicit (void) parameter list so the definition is a real
+ * prototype and agrees with the declaration in encoder.h.
+ */
+pn_encoder_t *pn_encoder(void)
+{
+  static const pn_class_t clazz = PN_CLASS(pn_encoder);
+  return (pn_encoder_t *) pn_class_new(&clazz, sizeof(pn_encoder_t));
+}
+
+/*
+ * Maps a pn_type_t to its full-width (non-compact) encoding code.
+ *
+ * NOTE(review): the default branch returns the result of pn_error_format()
+ * through a uint8_t return type, so the error value is narrowed to 8 bits;
+ * callers cannot tell it apart from a code by sign - confirm this is intended.
+ */
+static uint8_t pn_type2code(pn_encoder_t *encoder, pn_type_t type)
+{
+  switch (type)
+  {
+  case PN_NULL: return PNE_NULL;
+  case PN_BOOL: return PNE_BOOLEAN;
+  case PN_UBYTE: return PNE_UBYTE;
+  case PN_BYTE: return PNE_BYTE;
+  case PN_USHORT: return PNE_USHORT;
+  case PN_SHORT: return PNE_SHORT;
+  case PN_UINT: return PNE_UINT;
+  case PN_INT: return PNE_INT;
+  case PN_CHAR: return PNE_UTF32;
+  case PN_FLOAT: return PNE_FLOAT;
+  case PN_LONG: return PNE_LONG;
+  case PN_TIMESTAMP: return PNE_MS64;
+  case PN_DOUBLE: return PNE_DOUBLE;
+  case PN_DECIMAL32: return PNE_DECIMAL32;
+  case PN_DECIMAL64: return PNE_DECIMAL64;
+  case PN_DECIMAL128: return PNE_DECIMAL128;
+  case PN_UUID: return PNE_UUID;
+  case PN_ULONG: return PNE_ULONG;
+  case PN_BINARY: return PNE_VBIN32;
+  case PN_STRING: return PNE_STR32_UTF8;
+  case PN_SYMBOL: return PNE_SYM32;
+  case PN_LIST: return PNE_LIST32;
+  case PN_ARRAY: return PNE_ARRAY32;
+  case PN_MAP: return PNE_MAP32;
+  case PN_DESCRIBED: return PNE_DESCRIPTOR;
+  default:
+    /* Not a type that can be encoded as a value: record the error */
+    return pn_error_format(encoder->error, PN_ERR, "not a value type: %u\n", type);
+  }
+}
+
+/*
+ * Chooses the encoding code for a node, preferring the compact form when the
+ * value allows it: integers that fit in one byte, booleans as TRUE/FALSE
+ * codes, and strings/symbols/binaries shorter than 256 bytes. Any other type
+ * is delegated to pn_type2code().
+ */
+static uint8_t pn_node2code(pn_encoder_t *encoder, pni_node_t *node)
+{
+  const pn_atom_t *atom = &node->atom;
+
+  switch (atom->type) {
+  case PN_LONG:
+    return (-128 <= atom->u.as_long && atom->u.as_long <= 127) ? PNE_SMALLLONG : PNE_LONG;
+  case PN_INT:
+    return (-128 <= atom->u.as_int && atom->u.as_int <= 127) ? PNE_SMALLINT : PNE_INT;
+  case PN_ULONG:
+    return (atom->u.as_ulong < 256) ? PNE_SMALLULONG : PNE_ULONG;
+  case PN_UINT:
+    return (atom->u.as_uint < 256) ? PNE_SMALLUINT : PNE_UINT;
+  case PN_BOOL:
+    return atom->u.as_bool ? PNE_TRUE : PNE_FALSE;
+  case PN_STRING:
+    return (atom->u.as_bytes.size < 256) ? PNE_STR8_UTF8 : PNE_STR32_UTF8;
+  case PN_SYMBOL:
+    return (atom->u.as_bytes.size < 256) ? PNE_SYM8 : PNE_SYM32;
+  case PN_BINARY:
+    return (atom->u.as_bytes.size < 256) ? PNE_VBIN8 : PNE_VBIN32;
+  default:
+    return pn_type2code(encoder, atom->type);
+  }
+}
+
+/* Bytes left between the write cursor and the end of the output buffer, or 0
+ * once the cursor has reached or passed the end. */
+static size_t pn_encoder_remaining(pn_encoder_t *encoder) {
+  char *limit = encoder->output + encoder->size;
+  return (limit > encoder->position) ? (size_t)(limit - encoder->position) : 0;
+}
+
+/* Writes one byte if it fits; the cursor advances by 1 either way so the
+ * required size can still be computed after an overflow. */
+static inline void pn_encoder_writef8(pn_encoder_t *encoder, uint8_t value)
+{
+  if (pn_encoder_remaining(encoder) >= 1) {
+    *encoder->position = value;
+  }
+  encoder->position += 1;
+}
+
+/* Writes a 16-bit value most-significant byte first if it fits; the cursor
+ * advances by 2 either way. */
+static inline void pn_encoder_writef16(pn_encoder_t *encoder, uint16_t value)
+{
+  if (pn_encoder_remaining(encoder) >= 2) {
+    char *out = encoder->position;
+    out[0] = 0xFF & (value >> 8);
+    out[1] = 0xFF & value;
+  }
+  encoder->position += 2;
+}
+
+/* Writes a 32-bit value most-significant byte first if it fits; the cursor
+ * advances by 4 either way. */
+static inline void pn_encoder_writef32(pn_encoder_t *encoder, uint32_t value)
+{
+  if (pn_encoder_remaining(encoder) >= 4) {
+    char *out = encoder->position;
+    for (int i = 0; i < 4; i++) {
+      out[i] = 0xFF & (value >> (24 - 8 * i));
+    }
+  }
+  encoder->position += 4;
+}
+
+/* Writes a 64-bit value most-significant byte first if it fits; the cursor
+ * advances by 8 either way. */
+static inline void pn_encoder_writef64(pn_encoder_t *encoder, uint64_t value) {
+  if (pn_encoder_remaining(encoder) >= 8) {
+    char *out = encoder->position;
+    for (int i = 0; i < 8; i++) {
+      out[i] = 0xFF & (value >> (56 - 8 * i));
+    }
+  }
+  encoder->position += 8;
+}
+
+/* Copies a 16-byte value verbatim if it fits; the cursor advances by 16
+ * either way. */
+static inline void pn_encoder_writef128(pn_encoder_t *encoder, char *value) {
+  const size_t width = 16;
+  if (pn_encoder_remaining(encoder) >= width) {
+    memmove(encoder->position, value, width);
+  }
+  encoder->position += width;
+}
+
+/*
+ * Writes a variable-width value with a 1-byte length prefix followed by the
+ * bytes themselves. The cursor always advances by the full encoded length,
+ * even when the bytes do not fit, so the required size can still be computed.
+ */
+static inline void pn_encoder_writev8(pn_encoder_t *encoder, const pn_bytes_t *value)
+{
+  pn_encoder_writef8(encoder, value->size);
+  /* Skip the copy for empty values: memmove() requires valid pointers even
+   * for a zero length, and neither value->start nor the cursor is guaranteed
+   * to be valid here (e.g. during a size-only pass with a null output). */
+  if (value->size > 0 && pn_encoder_remaining(encoder) >= value->size)
+    memmove(encoder->position, value->start, value->size);
+  encoder->position += value->size;
+}
+
+/*
+ * Writes a variable-width value with a 4-byte length prefix followed by the
+ * bytes themselves. The cursor always advances by the full encoded length,
+ * even when the bytes do not fit, so the required size can still be computed.
+ */
+static inline void pn_encoder_writev32(pn_encoder_t *encoder, const pn_bytes_t *value)
+{
+  pn_encoder_writef32(encoder, value->size);
+  /* Skip the copy for empty values: memmove() requires valid pointers even
+   * for a zero length, and neither value->start nor the cursor is guaranteed
+   * to be valid here (e.g. during a size-only pass with a null output). */
+  if (value->size > 0 && pn_encoder_remaining(encoder) >= value->size)
+    memmove(encoder->position, value->start, value->size);
+  encoder->position += value->size;
+}
+
+/* True if node is an element of an array, as opposed to the array's
+ * descriptor (the first child of a described array). */
+static bool pn_is_in_array(pn_data_t *data, pni_node_t *parent, pni_node_t *node) {
+  if (!parent || parent->atom.type != PN_ARRAY) {
+    return false;               /* Not inside an array at all */
+  }
+  bool is_descriptor = parent->described && !node->prev;
+  return !is_descriptor;
+}
+
+/** True if node is the first real element of an array (the descriptor of a
+ * described array does not count).
+ *@pre pn_is_in_array(data, parent, node)
+ */
+static bool pn_is_first_in_array(pn_data_t *data, pni_node_t *parent, pni_node_t *node) {
+  if (node->prev) {
+    /* Has a predecessor: first element only if that predecessor is the descriptor */
+    if (!parent->described) return false;
+    return !pn_data_node(data, node->prev)->prev;
+  }
+  /* No predecessor: first element unless this node is itself the descriptor */
+  return !parent->described;
+}
+
+/** True if node's parent is a described list.
+ *  - Trailing nulls can be omitted for such nodes
+ */
+static bool pn_is_in_described_list(pn_data_t *data, pni_node_t *parent, pni_node_t *node) {
+  if (!parent) return false;
+  return parent->atom.type == PN_LIST && parent->described;
+}
+
+/* Union used to reinterpret floating-point values as same-width integers
+ * (f -> i, d -> l) before they are written out byte by byte. */
+typedef union {
+  uint32_t i;
+  uint32_t a[2];
+  uint64_t l;
+  float f;
+  double d;
+} conv_t;
+
+/*
+ * Traversal "enter" callback: emits the encoding of one node at the encoder's
+ * current position.
+ *
+ * ctx is the pn_encoder_t. Returns 0 on success, or the result of
+ * pn_error_format() on data->error when the chosen code is not handled.
+ * For compound nodes (array/list/map) the size field is skipped here and
+ * backfilled by pni_encoder_exit().
+ */
+static int pni_encoder_enter(void *ctx, pn_data_t *data, pni_node_t *node)
+{
+  pn_encoder_t *encoder = (pn_encoder_t *) ctx;
+  pni_node_t *parent = pn_data_node(data, node->parent);
+  pn_atom_t *atom = &node->atom;
+  uint8_t code;
+  conv_t c;
+
+  /** In an array we don't write the code before each element, only the first. */
+  if (pn_is_in_array(data, parent, node)) {
+    // Array elements always use the full-width code of the array's element type
+    code = pn_type2code(encoder, parent->type);
+    if (pn_is_first_in_array(data, parent, node)) {
+      pn_encoder_writef8(encoder, code);
+    }
+  } else {
+    code = pn_node2code(encoder, node);
+    // Omit trailing nulls for described lists
+    if (pn_is_in_described_list(data, parent, node)) {
+      if (code==PNE_NULL) {
+        // Defer the null: it is only written if a non-null sibling follows
+        encoder->null_count++;
+      } else {
+        // Output pending nulls, then the nodes code
+        for (unsigned i = 0; i<encoder->null_count; i++) {
+          pn_encoder_writef8(encoder, PNE_NULL);
+        }
+        encoder->null_count = 0;
+        pn_encoder_writef8(encoder, code);
+      }
+    } else {
+      pn_encoder_writef8(encoder, code);
+    }
+  }
+
+  // Write the value that follows the code (nothing for codes that carry no data)
+  switch (code) {
+  case PNE_DESCRIPTOR:
+  case PNE_NULL:
+  case PNE_TRUE:
+  case PNE_FALSE: return 0;
+  case PNE_BOOLEAN: pn_encoder_writef8(encoder, atom->u.as_bool); return 0;
+  case PNE_UBYTE: pn_encoder_writef8(encoder, atom->u.as_ubyte); return 0;
+  case PNE_BYTE: pn_encoder_writef8(encoder, atom->u.as_byte); return 0;
+  case PNE_USHORT: pn_encoder_writef16(encoder, atom->u.as_ushort); return 0;
+  case PNE_SHORT: pn_encoder_writef16(encoder, atom->u.as_short); return 0;
+  case PNE_UINT0: return 0;
+  case PNE_SMALLUINT: pn_encoder_writef8(encoder, atom->u.as_uint); return 0;
+  case PNE_UINT: pn_encoder_writef32(encoder, atom->u.as_uint); return 0;
+  case PNE_SMALLINT: pn_encoder_writef8(encoder, atom->u.as_int); return 0;
+  case PNE_INT: pn_encoder_writef32(encoder, atom->u.as_int); return 0;
+  case PNE_UTF32: pn_encoder_writef32(encoder, atom->u.as_char); return 0;
+  case PNE_ULONG: pn_encoder_writef64(encoder, atom->u.as_ulong); return 0;
+  case PNE_SMALLULONG: pn_encoder_writef8(encoder, atom->u.as_ulong); return 0;
+  case PNE_LONG: pn_encoder_writef64(encoder, atom->u.as_long); return 0;
+  case PNE_SMALLLONG: pn_encoder_writef8(encoder, atom->u.as_long); return 0;
+  case PNE_MS64: pn_encoder_writef64(encoder, atom->u.as_timestamp); return 0;
+  case PNE_FLOAT: c.f = atom->u.as_float; pn_encoder_writef32(encoder, c.i); return 0;
+  case PNE_DOUBLE: c.d = atom->u.as_double; pn_encoder_writef64(encoder, c.l); return 0;
+  case PNE_DECIMAL32: pn_encoder_writef32(encoder, atom->u.as_decimal32); return 0;
+  case PNE_DECIMAL64: pn_encoder_writef64(encoder, atom->u.as_decimal64); return 0;
+  case PNE_DECIMAL128: pn_encoder_writef128(encoder, atom->u.as_decimal128.bytes); return 0;
+  case PNE_UUID: pn_encoder_writef128(encoder, atom->u.as_uuid.bytes); return 0;
+  case PNE_VBIN8: pn_encoder_writev8(encoder, &atom->u.as_bytes); return 0;
+  case PNE_VBIN32: pn_encoder_writev32(encoder, &atom->u.as_bytes); return 0;
+  case PNE_STR8_UTF8: pn_encoder_writev8(encoder, &atom->u.as_bytes); return 0;
+  case PNE_STR32_UTF8: pn_encoder_writev32(encoder, &atom->u.as_bytes); return 0;
+  case PNE_SYM8: pn_encoder_writev8(encoder, &atom->u.as_bytes); return 0;
+  case PNE_SYM32: pn_encoder_writev32(encoder, &atom->u.as_bytes); return 0;
+  case PNE_ARRAY32:
+    // Remember where the size field lives so exit can fill it in
+    node->start = encoder->position;
+    node->small = false;
+    // we'll backfill the size on exit
+    encoder->position += 4;
+    // Element count excludes the descriptor child of a described array
+    pn_encoder_writef32(encoder, node->described ? node->children - 1 : node->children);
+    if (node->described)
+      pn_encoder_writef8(encoder, 0);
+    return 0;
+  case PNE_LIST32:
+  case PNE_MAP32:
+    // Remember where the size field lives so exit can fill it in
+    node->start = encoder->position;
+    node->small = false;
+    // we'll backfill the size later
+    encoder->position += 4;
+    pn_encoder_writef32(encoder, node->children);
+    return 0;
+  default:
+    return pn_error_format(data->error, PN_ERR, "unrecognized encoding: %u", code);
+  }
+}
+
+#include <stdio.h>
+
+/*
+ * Traversal "exit" callback: finishes a compound node once all its children
+ * have been visited. For arrays, lists and maps it backfills the size field
+ * reserved by pni_encoder_enter() and, if trailing nulls were suppressed,
+ * rewrites the element count. Other node types need no work. Always returns 0.
+ */
+static int pni_encoder_exit(void *ctx, pn_data_t *data, pni_node_t *node)
+{
+  pn_encoder_t *encoder = (pn_encoder_t *) ctx;
+  char *pos;
+
+  // Special case 0 length list
+  // (no children, or every child was a suppressed null): rewind to the code
+  // byte written on enter and replace the whole encoding with PNE_LIST0
+  if (node->atom.type==PN_LIST && node->children-encoder->null_count==0) {
+    encoder->position = node->start-1; // position of list opcode
+    pn_encoder_writef8(encoder, PNE_LIST0);
+    encoder->null_count = 0;
+    return 0;
+  }
+
+  switch (node->atom.type) {
+  case PN_ARRAY:
+    // An array with no elements never had its element code written on enter
+    // (that happens with the first element), so write it here
+    if ((node->described && node->children == 1) || (!node->described && node->children == 0)) {
+      pn_encoder_writef8(encoder, pn_type2code(encoder, node->type));
+    }
+  // Fallthrough
+  case PN_LIST:
+  case PN_MAP:
+    // Temporarily move the cursor back to the reserved size field
+    pos = encoder->position;
+    encoder->position = node->start;
+    // NOTE(review): pni_encoder_enter() only ever sets node->small to false in
+    // this file, so the small branch looks unused here - confirm
+    if (node->small) {
+      // backfill size
+      size_t size = pos - node->start - 1;
+      pn_encoder_writef8(encoder, size);
+      // Adjust count
+      if (encoder->null_count) {
+        pn_encoder_writef8(encoder, node->children-encoder->null_count);
+      }
+    } else {
+      // backfill size
+      size_t size = pos - node->start - 4;
+      pn_encoder_writef32(encoder, size);
+      // Adjust count
+      if (encoder->null_count) {
+        pn_encoder_writef32(encoder, node->children-encoder->null_count);
+      }
+    }
+    // Restore the cursor to the end of the encoded node
+    encoder->position = pos;
+    encoder->null_count = 0;
+    return 0;
+  default:
+    return 0;
+  }
+}
+
+/*
+ * Encodes src into dst[0..size).
+ *
+ * Returns the number of bytes written, the traversal's error code if it
+ * fails, or PN_OVERFLOW (also recorded on src's error) when the encoded form
+ * needs more than size bytes.
+ */
+ssize_t pn_encoder_encode(pn_encoder_t *encoder, pn_data_t *src, char *dst, size_t size)
+{
+  encoder->output = dst;
+  encoder->position = dst;
+  encoder->size = size;
+
+  int rc = pni_data_traverse(src, pni_encoder_enter, pni_encoder_exit, encoder);
+  if (rc) return rc;
+
+  // The cursor keeps advancing past the end of dst, so this is the size needed
+  size_t needed = encoder->position - encoder->output;
+  if (needed <= size) {
+    return (ssize_t)needed;
+  }
+
+  pn_error_format(pn_data_error(src), PN_OVERFLOW, "not enough space to encode");
+  return PN_OVERFLOW;
+}
+
+/*
+ * Computes how many bytes src needs when encoded, without writing anything:
+ * the traversal runs against a zero-sized buffer and only the cursor moves.
+ * The current point of src is saved before and restored after the traversal.
+ * Returns the byte count, or the traversal's error code.
+ */
+ssize_t pn_encoder_size(pn_encoder_t *encoder, pn_data_t *src)
+{
+  encoder->output = 0;
+  encoder->position = 0;
+  encoder->size = 0;
+
+  pn_handle_t saved_point = pn_data_point(src);
+  int rc = pni_data_traverse(src, pni_encoder_enter, pni_encoder_exit, encoder);
+  pn_data_restore(src, saved_point);
+
+  return rc ? rc : encoder->position - encoder->output;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/core/encoder.h
----------------------------------------------------------------------
diff --git a/c/src/core/encoder.h b/c/src/core/encoder.h
new file mode 100644
index 0000000..20876cb
--- /dev/null
+++ b/c/src/core/encoder.h
@@ -0,0 +1,31 @@
+#ifndef _PROTON_ENCODER_H
+#define _PROTON_ENCODER_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/* Opaque encoder that serialises a pn_data_t tree into a byte buffer. */
+typedef struct pn_encoder_t pn_encoder_t;
+
+/* Creates a new encoder. */
+pn_encoder_t *pn_encoder(void);
+/* Encodes src into dst (capacity size bytes). Returns the number of bytes
+ * written, or a negative error code such as PN_OVERFLOW if dst is too small. */
+ssize_t pn_encoder_encode(pn_encoder_t *encoder, pn_data_t *src, char *dst, size_t size);
+/* Returns the number of bytes src would occupy when encoded, or an error code. */
+ssize_t pn_encoder_size(pn_encoder_t *encoder, pn_data_t *src);
+
+#endif /* encoder.h */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/core/engine-internal.h
----------------------------------------------------------------------
diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h
new file mode 100644
index 0000000..b1c6123
--- /dev/null
+++ b/c/src/core/engine-internal.h
@@ -0,0 +1,381 @@
+#ifndef _PROTON_ENGINE_INTERNAL_H
+#define _PROTON_ENGINE_INTERNAL_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/object.h>
+#include <proton/engine.h>
+#include <proton/types.h>
+
+#include "buffer.h"
+#include "dispatcher.h"
+#include "util.h"
+
+/* Discriminator for the kind of object a pn_endpoint_t is embedded in. */
+typedef enum pn_endpoint_type_t {CONNECTION, SESSION, SENDER, RECEIVER} pn_endpoint_type_t;
+
+typedef struct pn_endpoint_t pn_endpoint_t;
+
+/* A condition: a name, a description and an additional info data object. */
+struct pn_condition_t {
+  pn_string_t *name;
+  pn_string_t *description;
+  pn_data_t *info;
+};
+
+/*
+ * State common to connections, sessions and links; embedded as the first
+ * member ("endpoint") of each of those structs in this header.
+ */
+struct pn_endpoint_t {
+  pn_endpoint_type_t type;
+  pn_state_t state;
+  pn_error_t *error;
+  pn_condition_t condition;         // local condition
+  pn_condition_t remote_condition;  // condition reported by the peer
+  // Links for two intrusive doubly-linked lists; pn_connection_t holds the
+  // matching endpoint_head/tail and transport_head/tail pointers
+  pn_endpoint_t *endpoint_next;
+  pn_endpoint_t *endpoint_prev;
+  pn_endpoint_t *transport_next;
+  pn_endpoint_t *transport_prev;
+  int refcount; // when this hits zero we generate a final event
+  bool modified;
+  bool freed;
+  bool referenced;
+};
+
+/* Per-delivery transport bookkeeping; embedded in pn_delivery_t as "state". */
+typedef struct {
+  pn_sequence_t id;
+  bool sending;
+  bool sent;
+  bool init;
+} pn_delivery_state_t;
+
+/* A hash of deliveries plus a "next" sequence counter; a session keeps one
+ * for incoming and one for outgoing deliveries (see pn_session_state_t). */
+typedef struct {
+  pn_sequence_t next;
+  pn_hash_t *deliveries;
+} pn_delivery_map_t;
+
+/* Per-link transport bookkeeping; embedded in pn_link_t as "state". */
+typedef struct {
+  // XXX: stop using negative numbers
+  uint32_t local_handle;
+  uint32_t remote_handle;
+  pn_sequence_t delivery_count;
+  pn_sequence_t link_credit;
+} pn_link_state_t;
+
+/* Per-session transport bookkeeping; embedded in pn_session_t as "state". */
+typedef struct {
+  // XXX: stop using negative numbers
+  uint16_t local_channel;
+  uint16_t remote_channel;
+  bool incoming_init;
+  pn_delivery_map_t incoming;
+  pn_delivery_map_t outgoing;
+  pn_sequence_t incoming_transfer_count;
+  pn_sequence_t incoming_window;
+  pn_sequence_t remote_incoming_window;
+  pn_sequence_t outgoing_transfer_count;
+  pn_sequence_t outgoing_window;
+  pn_hash_t *local_handles;
+  pn_hash_t *remote_handles;
+
+  // NOTE(review): the disp_* fields presumably describe a pending disposition
+  // (code, settled flag, first/last delivery range) - confirm against users
+  uint64_t disp_code;
+  bool disp_settled;
+  bool disp_type;
+  pn_sequence_t disp_first;
+  pn_sequence_t disp_last;
+  bool disp;
+} pn_session_state_t;
+
+/*
+ * Table of callbacks implementing one I/O layer of a transport. Each callback
+ * receives the transport; most also receive the index of the layer being run.
+ */
+typedef struct pn_io_layer_t {
+  ssize_t (*process_input)(struct pn_transport_t *transport, unsigned int layer, const char *, size_t);
+  ssize_t (*process_output)(struct pn_transport_t *transport, unsigned int layer, char *, size_t);
+  void (*handle_error)(struct pn_transport_t* transport, unsigned int layer);
+  pn_timestamp_t (*process_tick)(struct pn_transport_t *transport, unsigned int layer, pn_timestamp_t);
+  size_t (*buffered_output)(struct pn_transport_t *transport);  // how much output is held
+} pn_io_layer_t;
+
+// I/O layer implementations defined in other translation units
+extern const pn_io_layer_t pni_passthru_layer;
+extern const pn_io_layer_t ssl_layer;
+extern const pn_io_layer_t sasl_header_layer;
+extern const pn_io_layer_t sasl_write_header_layer;
+
+// Bit flag defines for the protocol layers
+// (values are distinct bits so they can be OR-ed into pn_io_layer_flags_t)
+typedef uint8_t pn_io_layer_flags_t;
+#define LAYER_NONE     0
+#define LAYER_AMQP1    1
+#define LAYER_AMQPSASL 2
+#define LAYER_AMQPSSL  4
+#define LAYER_SSL      8
+
+// Opaque per-transport SASL and SSL state
+typedef struct pni_sasl_t pni_sasl_t;
+typedef struct pni_ssl_t pni_ssl_t;
+
+/*
+ * Internal state of a transport: negotiated limits, the stack of I/O layers,
+ * idle/keepalive timers, channel maps, scratch objects used while decoding
+ * and encoding frames, I/O buffers, statistics and status flags.
+ */
+struct pn_transport_t {
+  pn_tracer_t tracer;
+  pni_sasl_t *sasl;
+  pni_ssl_t *ssl;
+  pn_connection_t *connection;  // reference counted
+  char *remote_container;
+  char *remote_hostname;
+  pn_data_t *remote_offered_capabilities;
+  pn_data_t *remote_desired_capabilities;
+  pn_data_t *remote_properties;
+  pn_data_t *disp_data;
+  //#define PN_DEFAULT_MAX_FRAME_SIZE (16*1024)
+/* This is wrong and bad  we should really use a sensible starting size not unlimited */
+#define PN_DEFAULT_MAX_FRAME_SIZE (0)  /* for now, allow unlimited size */
+  uint32_t   local_max_frame;
+  uint32_t   remote_max_frame;
+  pn_condition_t remote_condition;
+  pn_condition_t condition;
+  pn_error_t *error;
+
+#define PN_IO_LAYER_CT 3
+  const pn_io_layer_t *io_layers[PN_IO_LAYER_CT];
+
+  /* dead remote detection */
+  pn_millis_t local_idle_timeout;
+  pn_millis_t remote_idle_timeout;
+  pn_timestamp_t dead_remote_deadline;
+  uint64_t last_bytes_input;
+
+  /* keepalive */
+  pn_timestamp_t keepalive_deadline;
+  uint64_t last_bytes_output;
+
+  pn_hash_t *local_channels;
+  pn_hash_t *remote_channels;
+
+
+  /* scratch area */
+  pn_string_t *scratch;
+  pn_data_t *args;  // decoded arguments of the frame currently being dispatched
+  pn_data_t *output_args;
+  pn_buffer_t *frame;  // frame under construction
+
+  // Temporary - ??
+  pn_buffer_t *output_buffer;
+
+  /* statistics */
+  uint64_t bytes_input;
+  uint64_t bytes_output;
+  uint64_t output_frames_ct;
+  uint64_t input_frames_ct;
+
+  /* output buffered for send */
+  #define PN_TRANSPORT_INITIAL_BUFFER_SIZE (16*1024)
+  size_t output_size;
+  size_t output_pending;
+  char *output_buf;
+
+  /* input from peer */
+  size_t input_size;
+  size_t input_pending;
+  char *input_buf;
+
+  pn_record_t *context;
+
+  pn_trace_t trace;
+
+  /*
+   * The maximum channel number can be constrained in several ways:
+   *   1. an unchangeable limit imposed by this library code
+   *   2. a limit imposed by the remote peer when the connection is opened,
+   *      which this app must honor
+   *   3. a limit imposed by this app, which may be raised and lowered
+   *      until the OPEN frame is sent.
+   * These constraints are all summed up in channel_max, below.
+   */
+  #define PN_IMPL_CHANNEL_MAX  32767
+  uint16_t local_channel_max;
+  uint16_t remote_channel_max;
+  uint16_t channel_max;
+
+  pn_io_layer_flags_t allowed_layers;
+  pn_io_layer_flags_t present_layers;
+
+  bool freed;
+  bool open_sent;
+  bool open_rcvd;
+  bool close_sent;
+  bool close_rcvd;
+  bool tail_closed;      // input stream closed by driver
+  bool head_closed;
+  bool done_processing; // if true, don't call pn_process again
+  bool posted_idle_timeout;
+  bool server;
+  bool halt;
+  bool auth_required;
+  bool authenticated;
+  bool encryption_required;
+
+  bool referenced;
+};
+
+/*
+ * Internal state of a connection. Embeds its own endpoint and holds the heads
+ * and tails of the intrusive endpoint, transport, work and tpwork lists.
+ */
+struct pn_connection_t {
+  pn_endpoint_t endpoint;
+  pn_endpoint_t *endpoint_head;
+  pn_endpoint_t *endpoint_tail;
+  pn_endpoint_t *transport_head;  // reference counted
+  pn_endpoint_t *transport_tail;
+  pn_list_t *sessions;
+  pn_list_t *freed;
+  pn_transport_t *transport;
+  pn_delivery_t *work_head;
+  pn_delivery_t *work_tail;
+  pn_delivery_t *tpwork_head;  // reference counted
+  pn_delivery_t *tpwork_tail;
+  pn_string_t *container;
+  pn_string_t *hostname;
+  pn_string_t *auth_user;
+  pn_string_t *auth_password;
+  pn_data_t *offered_capabilities;
+  pn_data_t *desired_capabilities;
+  pn_data_t *properties;
+  pn_collector_t *collector;
+  pn_record_t *context;
+  pn_list_t *delivery_pool;
+  struct pn_connection_driver_t *driver;
+};
+
+/* Internal state of a session: its endpoint, owning connection, links,
+ * byte/delivery counters and transport-level state. */
+struct pn_session_t {
+  pn_endpoint_t endpoint;
+  pn_connection_t *connection;  // reference counted
+  pn_list_t *links;
+  pn_list_t *freed;
+  pn_record_t *context;
+  size_t incoming_capacity;
+  pn_sequence_t incoming_bytes;
+  pn_sequence_t outgoing_bytes;
+  pn_sequence_t incoming_deliveries;
+  pn_sequence_t outgoing_deliveries;
+  pn_sequence_t outgoing_window;
+  pn_session_state_t state;
+};
+
+/* Description of a terminus; pn_link_t holds four of these (local and remote
+ * source and target). */
+struct pn_terminus_t {
+  pn_string_t *address;
+  pn_data_t *properties;
+  pn_data_t *capabilities;
+  pn_data_t *outcomes;
+  pn_data_t *filter;
+  pn_durability_t durability;
+  pn_expiry_policy_t expiry_policy;
+  pn_seconds_t timeout;
+  pn_terminus_type_t type;
+  pn_distribution_mode_t distribution_mode;
+  bool dynamic;
+};
+
+/*
+ * Internal state of a link: its endpoint, local and remote termini, owning
+ * session, the list of unsettled deliveries, credit counters and settle modes.
+ */
+struct pn_link_t {
+  pn_endpoint_t endpoint;
+  pn_terminus_t source;
+  pn_terminus_t target;
+  pn_terminus_t remote_source;
+  pn_terminus_t remote_target;
+  pn_link_state_t state;
+  pn_string_t *name;
+  pn_session_t *session;  // reference counted
+  pn_delivery_t *unsettled_head;
+  pn_delivery_t *unsettled_tail;
+  pn_delivery_t *current;
+  pn_record_t *context;
+  size_t unsettled_count;
+  uint64_t max_message_size;
+  uint64_t remote_max_message_size;
+  pn_sequence_t available;
+  pn_sequence_t credit;
+  pn_sequence_t queued;
+  int drained; // number of drained credits
+  uint8_t snd_settle_mode;
+  uint8_t rcv_settle_mode;
+  uint8_t remote_snd_settle_mode;
+  uint8_t remote_rcv_settle_mode;
+  bool drain_flag_mode; // receiver only
+  bool drain;
+  bool detached;
+};
+
+/* Disposition of a delivery; pn_delivery_t holds one local and one remote. */
+struct pn_disposition_t {
+  pn_condition_t condition;
+  uint64_t type;
+  pn_data_t *data;
+  pn_data_t *annotations;
+  uint64_t section_offset;
+  uint32_t section_number;
+  bool failed;
+  bool undeliverable;
+  bool settled;
+};
+
+/*
+ * Internal state of a delivery: local and remote dispositions, owning link,
+ * tag and content buffers, and the next/prev links for the unsettled, work
+ * and tpwork intrusive lists.
+ */
+struct pn_delivery_t {
+  pn_disposition_t local;
+  pn_disposition_t remote;
+  pn_link_t *link;  // reference counted
+  pn_buffer_t *tag;
+  pn_delivery_t *unsettled_next;
+  pn_delivery_t *unsettled_prev;
+  pn_delivery_t *work_next;
+  pn_delivery_t *work_prev;
+  pn_delivery_t *tpwork_next;
+  pn_delivery_t *tpwork_prev;
+  pn_delivery_state_t state;
+  pn_buffer_t *bytes;
+  pn_record_t *context;
+  bool updated;
+  bool settled; // tracks whether we're in the unsettled list or not
+  bool work;
+  bool tpwork;
+  bool done;
+  bool referenced;
+  bool aborted;
+};
+
+/*
+ * Replace the local (resp. remote) part of the state word OLD with NEW while
+ * keeping the bits selected by PN_REMOTE_MASK (resp. PN_LOCAL_MASK).
+ * OLD must be an lvalue and is evaluated twice, so it must have no side
+ * effects. The whole expansion is parenthesised so the macro behaves as a
+ * single expression wherever it is used.
+ */
+#define PN_SET_LOCAL(OLD, NEW)                                          \
+  ((OLD) = ((OLD) & PN_REMOTE_MASK) | (NEW))
+
+#define PN_SET_REMOTE(OLD, NEW)                                         \
+  ((OLD) = ((OLD) & PN_LOCAL_MASK) | (NEW))
+
+/* Engine-internal helpers shared between translation units; they are defined
+ * outside this header. */
+void pn_link_dump(pn_link_t *link);
+
+void pn_dump(pn_connection_t *conn);
+void pn_transport_sasl_init(pn_transport_t *transport);
+
+void pn_condition_init(pn_condition_t *condition);
+void pn_condition_tini(pn_condition_t *condition);
+void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint, bool emit);
+void pn_real_settle(pn_delivery_t *delivery);  // will free delivery if link is freed
+void pn_clear_tpwork(pn_delivery_t *delivery);
+void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery);
+void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint);
+void pn_connection_bound(pn_connection_t *conn);
+void pn_connection_unbound(pn_connection_t *conn);
+/* Takes a condition name and a printf-style description (fmt, ...). */
+int pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...);
+void pn_set_error_layer(pn_transport_t *transport);
+void pn_session_unbound(pn_session_t* ssn);
+void pn_link_unbound(pn_link_t* link);
+void pn_ep_incref(pn_endpoint_t *endpoint);
+void pn_ep_decref(pn_endpoint_t *endpoint);
+
+int pn_post_frame(pn_transport_t *transport, uint8_t type, uint16_t ch, const char *fmt, ...);
+
+/* Direction of a traced frame: IN for received, OUT for sent. */
+typedef enum {IN, OUT} pn_dir_t;
+
+void pn_do_trace(pn_transport_t *transport, uint16_t ch, pn_dir_t dir,
+                 pn_data_t *args, const char *payload, size_t size);
+
+#endif /* engine-internal.h */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message