qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tr...@apache.org
Subject svn commit: r1442413 [1/2] - in /qpid/trunk/qpid/extras/nexus: ./ include/qpid/nexus/ src/ tests/
Date Mon, 04 Feb 2013 22:46:33 GMT
Author: tross
Date: Mon Feb  4 22:46:32 2013
New Revision: 1442413

URL: http://svn.apache.org/viewvc?rev=1442413&view=rev
Log:
QPID-4538
Work-in-progress - Major cleanup in the "message.h" API

Added:
    qpid/trunk/qpid/extras/nexus/include/qpid/nexus/buffer.h
      - copied, changed from r1439422, qpid/trunk/qpid/extras/nexus/include/qpid/nexus/message.h
    qpid/trunk/qpid/extras/nexus/src/buffer.c
      - copied, changed from r1439422, qpid/trunk/qpid/extras/nexus/src/message.c
    qpid/trunk/qpid/extras/nexus/src/message_private.h
      - copied, changed from r1434735, qpid/trunk/qpid/extras/nexus/src/server_private.h
Modified:
    qpid/trunk/qpid/extras/nexus/CMakeLists.txt
    qpid/trunk/qpid/extras/nexus/include/qpid/nexus/alloc.h
    qpid/trunk/qpid/extras/nexus/include/qpid/nexus/ctools.h
    qpid/trunk/qpid/extras/nexus/include/qpid/nexus/message.h
    qpid/trunk/qpid/extras/nexus/src/container.c
    qpid/trunk/qpid/extras/nexus/src/iterator.c
    qpid/trunk/qpid/extras/nexus/src/message.c
    qpid/trunk/qpid/extras/nexus/src/timer.c
    qpid/trunk/qpid/extras/nexus/tests/alloc_test.c
    qpid/trunk/qpid/extras/nexus/tests/message_test.c
    qpid/trunk/qpid/extras/nexus/tests/timer_test.c

Modified: qpid/trunk/qpid/extras/nexus/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/CMakeLists.txt?rev=1442413&r1=1442412&r2=1442413&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/nexus/CMakeLists.txt (original)
+++ qpid/trunk/qpid/extras/nexus/CMakeLists.txt Mon Feb  4 22:46:32 2013
@@ -65,6 +65,7 @@ set(CATCH_UNDEFINED "-Wl,--no-undefined"
 set(server_SOURCES
     src/alloc.c
     src/auth.c
+    src/buffer.c
     src/container.c
     src/hash.c
     src/iterator.c

Modified: qpid/trunk/qpid/extras/nexus/include/qpid/nexus/alloc.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/include/qpid/nexus/alloc.h?rev=1442413&r1=1442412&r2=1442413&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/nexus/include/qpid/nexus/alloc.h (original)
+++ qpid/trunk/qpid/extras/nexus/include/qpid/nexus/alloc.h Mon Feb  4 22:46:32 2013
@@ -57,14 +57,14 @@ void nx_dealloc(nx_alloc_type_desc_t *de
     T *new_##T();        \
     void free_##T(T *p)
 
-#define ALLOC_DEFINE_CONFIG(T,C)                                    \
-    nx_alloc_type_desc_t __desc_##T = {#T, sizeof(T), C, 0, 0, 0};  \
+#define ALLOC_DEFINE_CONFIG(T,S,C)                                  \
+    nx_alloc_type_desc_t __desc_##T = {#T, S, C, 0, 0, 0};          \
     __thread nx_alloc_pool_t *__local_pool_##T = 0;                 \
     T *new_##T() { return (T*) nx_alloc(&__desc_##T, &__local_pool_##T); }  \
     void free_##T(T *p) { nx_dealloc(&__desc_##T, &__local_pool_##T, (void*) p); } \
     nx_alloc_stats_t *alloc_stats_##T() { return __desc_##T.stats; }
 
-#define ALLOC_DEFINE(T) ALLOC_DEFINE_CONFIG(T, 0)
+#define ALLOC_DEFINE(T) ALLOC_DEFINE_CONFIG(T, sizeof(T), 0)
 
 
 #endif

Copied: qpid/trunk/qpid/extras/nexus/include/qpid/nexus/buffer.h (from r1439422, qpid/trunk/qpid/extras/nexus/include/qpid/nexus/message.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/include/qpid/nexus/buffer.h?p2=qpid/trunk/qpid/extras/nexus/include/qpid/nexus/buffer.h&p1=qpid/trunk/qpid/extras/nexus/include/qpid/nexus/message.h&r1=1439422&r2=1442413&rev=1442413&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/nexus/include/qpid/nexus/message.h (original)
+++ qpid/trunk/qpid/extras/nexus/include/qpid/nexus/buffer.h Mon Feb  4 22:46:32 2013
@@ -1,5 +1,5 @@
-#ifndef __nexus_message_h__
-#define __nexus_message_h__ 1
+#ifndef __nexus_buffer_h__
+#define __nexus_buffer_h__ 1
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,144 +19,57 @@
  * under the License.
  */
 
-#include <proton/engine.h>
 #include <qpid/nexus/ctools.h>
-#include <qpid/nexus/iterator.h>
 
-typedef struct nx_message_t nx_message_t;
-typedef struct nx_buffer_t  nx_buffer_t;
+typedef struct nx_buffer_t nx_buffer_t;
 
 DEQ_DECLARE(nx_buffer_t, nx_buffer_list_t);
-DEQ_DECLARE(nx_message_t, nx_message_list_t);
-
-typedef struct {
-    nx_buffer_t *buffer;  // Buffer that contains the first octet of the field, null if the field is not present
-    size_t       offset;  // Offset in the buffer to the first octet
-    size_t       length;  // Length of the field or zero if unneeded
-    int          parsed;  // non-zero iff the buffer chain has been parsed to find this field
-} nx_field_location_t;
-
-
-// TODO - consider using pointers to nx_field_location_t below to save memory
-struct nx_message_t {
-    DEQ_LINKS(nx_message_t);
-    nx_buffer_list_t     buffers;                         // The buffer chain containing the message
-    pn_delivery_t       *in_delivery;                     // The delivery on which the message arrived
-    pn_delivery_t       *out_delivery;                    // The delivery on which the message was last sent
-    nx_field_location_t  section_message_header;          // The message header list
-    nx_field_location_t  section_delivery_annotation;     // The delivery annotation map
-    nx_field_location_t  section_message_annotation;      // The message annotation map
-    nx_field_location_t  section_message_properties;      // The message properties list
-    nx_field_location_t  section_application_properties;  // The application properties list
-    nx_field_location_t  section_body;                    // The message body: Data
-    nx_field_location_t  section_footer;                  // The footer
-    nx_field_location_t  field_user_id;                   // The string value of the user-id
-    nx_field_location_t  field_to;                        // The string value of the to field
-    nx_field_location_t  body;                            // The body of the message
-    nx_field_location_t  compose_length;
-    nx_field_location_t  compose_count;
-    uint32_t             length;
-    uint32_t             count;
-};
 
 struct nx_buffer_t {
     DEQ_LINKS(nx_buffer_t);
     unsigned int size;
 };
 
-typedef struct {
-    size_t        buffer_size;
-    unsigned long buffer_preallocation_count;
-    unsigned long buffer_rebalancing_batch_count;
-    unsigned long buffer_local_storage_max;
-    unsigned long buffer_free_list_max;
-    unsigned long message_allocation_batch_count;
-    unsigned long message_rebalancing_batch_count;
-    unsigned long message_local_storage_max;
-} nx_allocator_config_t;
-
-const nx_allocator_config_t *nx_allocator_default_config(void);
-
-void nx_allocator_initialize(const nx_allocator_config_t *config);
-void nx_allocator_finalize(void);
-
-//
-// Functions for per-thread allocators.
-//
-nx_message_t *nx_allocate_message(void);
-nx_buffer_t  *nx_allocate_buffer(void);
-void          nx_free_message(nx_message_t *msg);
-void          nx_free_buffer(nx_buffer_t *buf);
-
-
-typedef enum {
-    NX_DEPTH_NONE,
-    NX_DEPTH_HEADER,
-    NX_DEPTH_DELIVERY_ANNOTATIONS,
-    NX_DEPTH_MESSAGE_ANNOTATIONS,
-    NX_DEPTH_MESSAGE_PROPERTIES,     // Needed for 'user-id' and 'to'
-    NX_DEPTH_APPLICATION_PROPERTIES,
-    NX_DEPTH_BODY,
-    NX_DEPTH_ALL
-} nx_message_depth_t;
-
-//
-// Functions for received messages
-//
-nx_message_t *nx_message_receive(pn_delivery_t *delivery);
-int nx_message_check(nx_message_t *msg, nx_message_depth_t depth);
-nx_field_iterator_t *nx_message_field_to(nx_message_t *msg);
-nx_field_iterator_t *nx_message_body(nx_message_t *msg);
-
-//
-// Functions for composed messages
-//
-
-// Convenience Functions
-void nx_message_compose_1(nx_message_t *msg, const char *to, nx_buffer_t *buf_chain);
-
-// Raw Functions
-void nx_message_begin_header(nx_message_t *msg);
-void nx_message_end_header(nx_message_t *msg);
-
-void nx_message_begin_delivery_annotations(nx_message_t *msg);
-void nx_message_end_delivery_annotations(nx_message_t *msg);
-
-void nx_message_begin_message_annotations(nx_message_t *msg);
-void nx_message_end_message_annotations(nx_message_t *msg);
-
-void nx_message_begin_message_properties(nx_message_t *msg);
-void nx_message_end_message_properties(nx_message_t *msg);
-
-void nx_message_begin_application_properties(nx_message_t *msg);
-void nx_message_end_application_properties(nx_message_t *msg);
-
-void nx_message_append_body_data(nx_message_t *msg, nx_buffer_t *buf_chain);
-
-void nx_message_begin_body_sequence(nx_message_t *msg);
-void nx_message_end_body_sequence(nx_message_t *msg);
-
-void nx_message_begin_footer(nx_message_t *msg);
-void nx_message_end_footer(nx_message_t *msg);
-
-void nx_message_insert_null(nx_message_t *msg);
-void nx_message_insert_boolean(nx_message_t *msg, int value);
-void nx_message_insert_ubyte(nx_message_t *msg, uint8_t value);
-void nx_message_insert_uint(nx_message_t *msg, uint32_t value);
-void nx_message_insert_ulong(nx_message_t *msg, uint64_t value);
-void nx_message_insert_binary(nx_message_t *msg, const uint8_t *start, size_t len);
-void nx_message_insert_string(nx_message_t *msg, const char *start);
-void nx_message_insert_uuid(nx_message_t *msg, const uint8_t *value);
-void nx_message_insert_symbol(nx_message_t *msg, const char *start, size_t len);
-void nx_message_insert_timestamp(nx_message_t *msg, uint64_t value);
-
-//
-// Functions for buffers
-//
-unsigned char *nx_buffer_base(nx_buffer_t *buf);      // Pointer to the first octet in the buffer
-unsigned char *nx_buffer_cursor(nx_buffer_t *buf);    // Pointer to the first free octet in the buffer
-size_t         nx_buffer_capacity(nx_buffer_t *buf);  // Size of free space in the buffer in octets
-size_t         nx_buffer_size(nx_buffer_t *buf);      // Number of octets in the buffer
-void           nx_buffer_insert(nx_buffer_t *buf, size_t len);  // Notify the buffer that 'len' octets were written at cursor
+/**
+ */
+nx_buffer_t *nx_allocate_buffer(void);
+
+/**
+ * @param buf A pointer to an allocated buffer
+ */
+void nx_free_buffer(nx_buffer_t *buf);
+
+/**
+ * @param buf A pointer to an allocated buffer
+ * @return A pointer to the first octet in the buffer
+ */
+unsigned char *nx_buffer_base(nx_buffer_t *buf);
+
+/**
+ * @param buf A pointer to an allocated buffer
+ * @return A pointer to the first free octet in the buffer, the insert point for new data.
+ */
+unsigned char *nx_buffer_cursor(nx_buffer_t *buf);
+
+/**
+ * @param buf A pointer to an allocated buffer
+ * @return The number of octets in the buffer's free space, how many octets may be inserted.
+ */
+size_t nx_buffer_capacity(nx_buffer_t *buf);
+
+/**
+ * @param buf A pointer to an allocated buffer
+ * @return The number of octets of data in the buffer
+ */
+size_t nx_buffer_size(nx_buffer_t *buf);
+
+/**
+ * Notify the buffer that octets have been inserted at the buffer's cursor.  This will advance the
+ * cursor by len octets.
+ *
+ * @param buf A pointer to an allocated buffer
+ * @param len The number of octets that have been appended to the buffer
+ */
+void nx_buffer_insert(nx_buffer_t *buf, size_t len);
 
 #endif

Modified: qpid/trunk/qpid/extras/nexus/include/qpid/nexus/ctools.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/include/qpid/nexus/ctools.h?rev=1442413&r1=1442412&r2=1442413&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/nexus/include/qpid/nexus/ctools.h (original)
+++ qpid/trunk/qpid/extras/nexus/include/qpid/nexus/ctools.h Mon Feb  4 22:46:32 2013
@@ -37,11 +37,11 @@
 
 #define DEQ_LINKS(t) t *prev; t *next
 
-#define DEQ_INIT(d) do { d.head = 0; d.tail = 0; d.scratch = 0; d.size = 0; } while (0)
+#define DEQ_INIT(d) do { (d).head = 0; (d).tail = 0; (d).scratch = 0; (d).size = 0; } while (0)
 #define DEQ_ITEM_INIT(i) do { (i)->next = 0; (i)->prev = 0; } while(0)
-#define DEQ_HEAD(d) (d.head)
-#define DEQ_TAIL(d) (d.tail)
-#define DEQ_SIZE(d) (d.size)
+#define DEQ_HEAD(d) ((d).head)
+#define DEQ_TAIL(d) ((d).tail)
+#define DEQ_SIZE(d) ((d).size)
 #define DEQ_NEXT(i) (i)->next
 #define DEQ_PREV(i) (i)->prev
 
@@ -49,67 +49,67 @@
 do {                              \
     CT_ASSERT((i)->next == 0);    \
     CT_ASSERT((i)->prev == 0);    \
-    if (d.head) {                 \
-        (i)->next = d.head;       \
-        d.head->prev = i;         \
+    if ((d).head) {               \
+        (i)->next = (d).head;     \
+        (d).head->prev = i;       \
     } else {                      \
-        d.tail = i;               \
+        (d).tail = i;             \
         (i)->next = 0;            \
-        CT_ASSERT(d.size == 0);   \
+        CT_ASSERT((d).size == 0); \
     }                             \
     (i)->prev = 0;                \
-    d.head = i;                   \
-    d.size++;                     \
+    (d).head = i;                 \
+    (d).size++;                   \
 } while (0)
 
 #define DEQ_INSERT_TAIL(d,i)      \
 do {                              \
     CT_ASSERT((i)->next == 0);    \
     CT_ASSERT((i)->prev == 0);    \
-    if (d.tail) {                 \
-        (i)->prev = d.tail;       \
-        d.tail->next = i;         \
+    if ((d).tail) {               \
+        (i)->prev = (d).tail;     \
+        (d).tail->next = i;       \
     } else {                      \
-        d.head = i;               \
+        (d).head = i;             \
         (i)->prev = 0;            \
-        CT_ASSERT(d.size == 0);   \
+        CT_ASSERT((d).size == 0); \
     }                             \
     (i)->next = 0;                \
-    d.tail = i;                   \
-    d.size++;                     \
+    (d).tail = i;                 \
+    (d).size++;                   \
 } while (0)
 
-#define DEQ_REMOVE_HEAD(d)      \
-do {                            \
-    CT_ASSERT(d.head);          \
-    if (d.head) {               \
-        d.scratch = d.head;     \
-        d.head = d.head->next;  \
-        if (d.head == 0) {      \
-            d.tail = 0;         \
-            CT_ASSERT(d.size == 1); \
+#define DEQ_REMOVE_HEAD(d)        \
+do {                              \
+    CT_ASSERT((d).head);          \
+    if ((d).head) {               \
+        (d).scratch = (d).head;   \
+        (d).head = (d).head->next;  \
+        if ((d).head == 0) {      \
+            (d).tail = 0;         \
+            CT_ASSERT((d).size == 1); \
         } else                  \
-            d.head->prev = 0;   \
-        d.size--;               \
-        d.scratch->next = 0;    \
-        d.scratch->prev = 0;    \
+            (d).head->prev = 0; \
+        (d).size--;             \
+        (d).scratch->next = 0;  \
+        (d).scratch->prev = 0;  \
     }                           \
 } while (0)
 
 #define DEQ_REMOVE_TAIL(d)      \
 do {                            \
-    CT_ASSERT(d.tail);          \
-    if (d.tail) {               \
-        d.scratch = d.tail;     \
-        d.tail = d.tail->prev;  \
-        if (d.tail == 0) {      \
-            d.head = 0;         \
-            CT_ASSERT(d.size == 1); \
+    CT_ASSERT((d).tail);        \
+    if ((d).tail) {             \
+        (d).scratch = (d).tail; \
+        (d).tail = (d).tail->prev;  \
+        if ((d).tail == 0) {    \
+            (d).head = 0;       \
+            CT_ASSERT((d).size == 1); \
         } else                  \
-            d.tail->next = 0;   \
-        d.size--;               \
-        d.scratch->next = 0;    \
-        d.scratch->prev = 0;    \
+            (d).tail->next = 0; \
+        (d).size--;             \
+        (d).scratch->next = 0;  \
+        (d).scratch->prev = 0;  \
     }                           \
 } while (0)
 
@@ -120,11 +120,11 @@ do {                            \
     if ((a)->next)              \
         (a)->next->prev = (i);  \
     else                        \
-        d.tail = (i);           \
+        (d).tail = (i);         \
     (i)->next = (a)->next;      \
     (i)->prev = (a);            \
     (a)->next = (i);            \
-    d.size++;                   \
+    (d).size++;                 \
 } while (0)
 
 #define DEQ_REMOVE(d,i)                        \
@@ -132,15 +132,15 @@ do {                                    
     if ((i)->next)                             \
         (i)->next->prev = (i)->prev;           \
     else                                       \
-        d.tail = (i)->prev;                    \
+        (d).tail = (i)->prev;                  \
     if ((i)->prev)                             \
         (i)->prev->next = (i)->next;           \
     else                                       \
-        d.head = (i)->next;                    \
-    d.size--;                                  \
+        (d).head = (i)->next;                  \
+    (d).size--;                                \
     (i)->next = 0;                             \
     (i)->prev = 0;                             \
-    CT_ASSERT(d.size || (!d.head && !d.tail)); \
+    CT_ASSERT((d).size || (!(d).head && !(d).tail)); \
 } while (0)
 
 #endif

Modified: qpid/trunk/qpid/extras/nexus/include/qpid/nexus/message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/include/qpid/nexus/message.h?rev=1442413&r1=1442412&r2=1442413&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/nexus/include/qpid/nexus/message.h (original)
+++ qpid/trunk/qpid/extras/nexus/include/qpid/nexus/message.h Mon Feb  4 22:46:32 2013
@@ -21,99 +21,105 @@
 
 #include <proton/engine.h>
 #include <qpid/nexus/ctools.h>
+#include <qpid/nexus/alloc.h>
 #include <qpid/nexus/iterator.h>
+#include <qpid/nexus/buffer.h>
+
+// Callback for status change (confirmed persistent, loaded-in-memory, etc.)
 
 typedef struct nx_message_t nx_message_t;
-typedef struct nx_buffer_t  nx_buffer_t;
 
-DEQ_DECLARE(nx_buffer_t, nx_buffer_list_t);
 DEQ_DECLARE(nx_message_t, nx_message_list_t);
 
-typedef struct {
-    nx_buffer_t *buffer;  // Buffer that contains the first octet of the field, null if the field is not present
-    size_t       offset;  // Offset in the buffer to the first octet
-    size_t       length;  // Length of the field or zero if unneeded
-    int          parsed;  // non-zero iff the buffer chain has been parsed to find this field
-} nx_field_location_t;
-
-
-// TODO - consider using pointers to nx_field_location_t below to save memory
 struct nx_message_t {
     DEQ_LINKS(nx_message_t);
-    nx_buffer_list_t     buffers;                         // The buffer chain containing the message
-    pn_delivery_t       *in_delivery;                     // The delivery on which the message arrived
-    pn_delivery_t       *out_delivery;                    // The delivery on which the message was last sent
-    nx_field_location_t  section_message_header;          // The message header list
-    nx_field_location_t  section_delivery_annotation;     // The delivery annotation map
-    nx_field_location_t  section_message_annotation;      // The message annotation map
-    nx_field_location_t  section_message_properties;      // The message properties list
-    nx_field_location_t  section_application_properties;  // The application properties list
-    nx_field_location_t  section_body;                    // The message body: Data
-    nx_field_location_t  section_footer;                  // The footer
-    nx_field_location_t  field_user_id;                   // The string value of the user-id
-    nx_field_location_t  field_to;                        // The string value of the to field
-    nx_field_location_t  body;                            // The body of the message
-    nx_field_location_t  compose_length;
-    nx_field_location_t  compose_count;
-    uint32_t             length;
-    uint32_t             count;
+    // Private members not listed here.
 };
 
-struct nx_buffer_t {
-    DEQ_LINKS(nx_buffer_t);
-    unsigned int size;
-};
-
-typedef struct {
-    size_t        buffer_size;
-    unsigned long buffer_preallocation_count;
-    unsigned long buffer_rebalancing_batch_count;
-    unsigned long buffer_local_storage_max;
-    unsigned long buffer_free_list_max;
-    unsigned long message_allocation_batch_count;
-    unsigned long message_rebalancing_batch_count;
-    unsigned long message_local_storage_max;
-} nx_allocator_config_t;
-
-const nx_allocator_config_t *nx_allocator_default_config(void);
-
-void nx_allocator_initialize(const nx_allocator_config_t *config);
-void nx_allocator_finalize(void);
-
-//
-// Functions for per-thread allocators.
-//
-nx_message_t *nx_allocate_message(void);
-nx_buffer_t  *nx_allocate_buffer(void);
-void          nx_free_message(nx_message_t *msg);
-void          nx_free_buffer(nx_buffer_t *buf);
-
-
 typedef enum {
     NX_DEPTH_NONE,
     NX_DEPTH_HEADER,
     NX_DEPTH_DELIVERY_ANNOTATIONS,
     NX_DEPTH_MESSAGE_ANNOTATIONS,
-    NX_DEPTH_MESSAGE_PROPERTIES,     // Needed for 'user-id' and 'to'
+    NX_DEPTH_PROPERTIES,
     NX_DEPTH_APPLICATION_PROPERTIES,
     NX_DEPTH_BODY,
     NX_DEPTH_ALL
 } nx_message_depth_t;
 
+
+typedef enum {
+    //
+    // Message Sections
+    //
+    NX_FIELD_HEADER,
+    NX_FIELD_DELIVERY_ANNOTATION,
+    NX_FIELD_MESSAGE_ANNOTATION,
+    NX_FIELD_PROPERTIES,
+    NX_FIELD_APPLICATION_PROPERTIES,
+    NX_FIELD_BODY,
+    NX_FIELD_FOOTER,
+
+    //
+    // Fields of the Header Section
+    //
+    NX_FIELD_DURABLE,
+    NX_FIELD_PRIORITY,
+    NX_FIELD_TTL,
+    NX_FIELD_FIRST_ACQUIRER,
+    NX_FIELD_DELIVERY_COUNT,
+
+    //
+    // Fields of the Properties Section
+    //
+    NX_FIELD_MESSAGE_ID,
+    NX_FIELD_USER_ID,
+    NX_FIELD_TO,
+    NX_FIELD_SUBJECT,
+    NX_FIELD_REPLY_TO,
+    NX_FIELD_CORRELATION_ID,
+    NX_FIELD_CONTENT_TYPE,
+    NX_FIELD_CONTENT_ENCODING,
+    NX_FIELD_ABSOLUTE_EXPIRY_TIME,
+    NX_FIELD_CREATION_TIME,
+    NX_FIELD_GROUP_ID,
+    NX_FIELD_GROUP_SEQUENCE,
+    NX_FIELD_REPLY_TO_GROUP_ID
+} nx_message_field_t;
+
+//
+// Functions for allocation
+//
+nx_message_t *nx_allocate_message(void);
+void nx_free_message(nx_message_t *qm);
+nx_message_t *nx_message_copy(nx_message_t *qm);
+int nx_message_persistent(nx_message_t *qm);
+int nx_message_in_memory(nx_message_t *qm);
+
+void nx_message_set_out_delivery(nx_message_t *msg, pn_delivery_t *delivery);
+pn_delivery_t *nx_message_out_delivery(nx_message_t *msg);
+void nx_message_set_in_delivery(nx_message_t *msg, pn_delivery_t *delivery);
+pn_delivery_t *nx_message_in_delivery(nx_message_t *msg);
+
 //
 // Functions for received messages
 //
 nx_message_t *nx_message_receive(pn_delivery_t *delivery);
+void nx_message_send(nx_message_t *msg, pn_link_t *link);
+
 int nx_message_check(nx_message_t *msg, nx_message_depth_t depth);
-nx_field_iterator_t *nx_message_field_to(nx_message_t *msg);
-nx_field_iterator_t *nx_message_body(nx_message_t *msg);
+nx_field_iterator_t *nx_message_field(nx_message_t *msg, nx_message_field_t field);
+
+pn_delivery_t *nx_message_inbound_delivery(nx_message_t *qm);
 
 //
 // Functions for composed messages
 //
 
 // Convenience Functions
-void nx_message_compose_1(nx_message_t *msg, const char *to, nx_buffer_t *buf_chain);
+void nx_message_compose_1(nx_message_t *msg, const char *to, nx_buffer_list_t *buffers);
+void nx_message_copy_header(nx_message_t *msg); // Copy received header into send-header (prior to adding annotations)
+void nx_message_copy_message_annotations(nx_message_t *msg);
 
 // Raw Functions
 void nx_message_begin_header(nx_message_t *msg);
@@ -131,7 +137,7 @@ void nx_message_end_message_properties(n
 void nx_message_begin_application_properties(nx_message_t *msg);
 void nx_message_end_application_properties(nx_message_t *msg);
 
-void nx_message_append_body_data(nx_message_t *msg, nx_buffer_t *buf_chain);
+void nx_message_append_body_data(nx_message_t *msg, nx_buffer_list_t *buffers);
 
 void nx_message_begin_body_sequence(nx_message_t *msg);
 void nx_message_end_body_sequence(nx_message_t *msg);
@@ -149,14 +155,9 @@ void nx_message_insert_string(nx_message
 void nx_message_insert_uuid(nx_message_t *msg, const uint8_t *value);
 void nx_message_insert_symbol(nx_message_t *msg, const char *start, size_t len);
 void nx_message_insert_timestamp(nx_message_t *msg, uint64_t value);
-
-//
-// Functions for buffers
-//
-unsigned char *nx_buffer_base(nx_buffer_t *buf);      // Pointer to the first octet in the buffer
-unsigned char *nx_buffer_cursor(nx_buffer_t *buf);    // Pointer to the first free octet in the buffer
-size_t         nx_buffer_capacity(nx_buffer_t *buf);  // Size of free space in the buffer in octets
-size_t         nx_buffer_size(nx_buffer_t *buf);      // Number of octets in the buffer
-void           nx_buffer_insert(nx_buffer_t *buf, size_t len);  // Notify the buffer that 'len' octets were written at cursor
+void nx_message_begin_list(nx_message_t* msg);
+void nx_message_end_list(nx_message_t* msg);
+void nx_message_begin_map(nx_message_t* msg);
+void nx_message_end_map(nx_message_t* msg);
 
 #endif

Copied: qpid/trunk/qpid/extras/nexus/src/buffer.c (from r1439422, qpid/trunk/qpid/extras/nexus/src/message.c)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/src/buffer.c?p2=qpid/trunk/qpid/extras/nexus/src/buffer.c&p1=qpid/trunk/qpid/extras/nexus/src/message.c&r1=1439422&r2=1442413&rev=1442413&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/nexus/src/message.c (original)
+++ qpid/trunk/qpid/extras/nexus/src/buffer.c Mon Feb  4 22:46:32 2013
@@ -17,1118 +17,30 @@
  * under the License.
  */
 
-#include <qpid/nexus/message.h>
-#include <qpid/nexus/ctools.h>
-#include <qpid/nexus/threading.h>
-#include <string.h>
-#include <stdio.h>
+#include <qpid/nexus/buffer.h>
+#include <qpid/nexus/alloc.h>
 
+#ifndef NEXUS_BUFFER_SIZE
+#define NEXUS_BUFFER_SIZE 512
+#endif
 
-//
-// Per-Thread allocator
-//
-typedef struct nx_allocator_t {
-    nx_message_list_t  message_free_list;
-    nx_buffer_list_t   buffer_free_list;
-} nx_allocator_t;
-
-//
-// Global allocator (protected by a global lock)
-//
-typedef struct {
-    nx_message_list_t  message_free_list;
-    nx_buffer_list_t   buffer_free_list;
-    sys_mutex_t       *lock;
-} nx_global_allocator_t;
-
-static nx_global_allocator_t  global;
-static nx_allocator_config_t  default_config;
-static const nx_allocator_config_t *config;
-
-
-static nx_allocator_t *nx_get_allocator(void)
-{
-    static __thread nx_allocator_t *alloc = 0;
-
-    if (!alloc) {
-        alloc = NEW(nx_allocator_t);
-
-        if (!alloc)
-            return 0;
-
-        DEQ_INIT(alloc->message_free_list);
-        DEQ_INIT(alloc->buffer_free_list);
-    }
-
-    return alloc;
-}
-
-
-static void advance(unsigned char **cursor, nx_buffer_t **buffer, int consume)
-{
-    unsigned char *local_cursor = *cursor;
-    nx_buffer_t   *local_buffer = *buffer;
-
-    int remaining = nx_buffer_size(local_buffer) - (local_cursor - nx_buffer_base(local_buffer));
-    while (consume > 0) {
-        if (consume < remaining) {
-            local_cursor += consume;
-            consume = 0;
-        } else {
-            consume -= remaining;
-            local_buffer = local_buffer->next;
-            if (local_buffer == 0){
-                local_cursor = 0;
-                break;
-            }
-            local_cursor = nx_buffer_base(local_buffer);
-            remaining = nx_buffer_size(local_buffer) - (local_cursor - nx_buffer_base(local_buffer));
-        }
-    }
-
-    *cursor = local_cursor;
-    *buffer = local_buffer;
-}
-
-
-static unsigned char next_octet(unsigned char **cursor, nx_buffer_t **buffer)
-{
-    unsigned char result = **cursor;
-    advance(cursor, buffer, 1);
-    return result;
-}
-
-
-static int traverse_field(unsigned char **cursor, nx_buffer_t **buffer, nx_field_location_t *field)
-{
-    unsigned char tag = next_octet(cursor, buffer);
-    if (!(*cursor)) return 0;
-    int consume = 0;
-    switch (tag & 0xF0) {
-    case 0x40 : consume = 0;  break;
-    case 0x50 : consume = 1;  break;
-    case 0x60 : consume = 2;  break;
-    case 0x70 : consume = 4;  break;
-    case 0x80 : consume = 8;  break;
-    case 0x90 : consume = 16; break;
-
-    case 0xB0 :
-    case 0xD0 :
-    case 0xF0 :
-        consume |= ((int) next_octet(cursor, buffer)) << 24;
-        if (!(*cursor)) return 0;
-        consume |= ((int) next_octet(cursor, buffer)) << 16;
-        if (!(*cursor)) return 0;
-        consume |= ((int) next_octet(cursor, buffer)) << 8;
-        if (!(*cursor)) return 0;
-        // Fall through to the next case...
-
-    case 0xA0 :
-    case 0xC0 :
-    case 0xE0 :
-        consume |= (int) next_octet(cursor, buffer);
-        if (!(*cursor)) return 0;
-        break;
-    }
-
-    if (field) {
-        field->buffer = *buffer;
-        field->offset = *cursor - nx_buffer_base(*buffer);
-        field->length = consume;
-        field->parsed = 1;
-    }
-
-    advance(cursor, buffer, consume);
-    return 1;
-}
-
-
-static int start_list(unsigned char **cursor, nx_buffer_t **buffer)
-{
-    unsigned char tag = next_octet(cursor, buffer);
-    if (!(*cursor)) return 0;
-    int length = 0;
-    int count  = 0;
-
-    switch (tag) {
-    case 0x45 :     // list0
-        break;
-    case 0xd0 :     // list32
-        length |= ((int) next_octet(cursor, buffer)) << 24;
-        if (!(*cursor)) return 0;
-        length |= ((int) next_octet(cursor, buffer)) << 16;
-        if (!(*cursor)) return 0;
-        length |= ((int) next_octet(cursor, buffer)) << 8;
-        if (!(*cursor)) return 0;
-        length |=  (int) next_octet(cursor, buffer);
-        if (!(*cursor)) return 0;
-
-        count |= ((int) next_octet(cursor, buffer)) << 24;
-        if (!(*cursor)) return 0;
-        count |= ((int) next_octet(cursor, buffer)) << 16;
-        if (!(*cursor)) return 0;
-        count |= ((int) next_octet(cursor, buffer)) << 8;
-        if (!(*cursor)) return 0;
-        count |=  (int) next_octet(cursor, buffer);
-        if (!(*cursor)) return 0;
-
-        break;
-
-    case 0xc0 :     // list8
-        length |= (int) next_octet(cursor, buffer);
-        if (!(*cursor)) return 0;
-
-        count |= (int) next_octet(cursor, buffer);
-        if (!(*cursor)) return 0;
-        break;
-    }
-
-    return count;
-}
-
-
-//
-// Check the buffer chain, starting at cursor to see if it matches the pattern.
-// If the pattern matches, check the next tag to see if it's in the set of expected
-// tags.  If not, return zero.  If so, set the location descriptor to the good
-// tag and advance the cursor (and buffer, if needed) to the end of the matched section.
-//
-// If there is no match, don't advance the cursor.
-//
-// Return 0 if the pattern matches but the following tag is unexpected
-// Return 0 if the pattern matches and the location already has a pointer (duplicate section)
-// Return 1 if the pattern matches and we've advanced the cursor/buffer
-// Return 1 if the pattern does not match
-//
-static int nx_check_and_advance(nx_buffer_t         **buffer,
-                                unsigned char       **cursor,
-                                unsigned char        *pattern,
-                                int                   pattern_length,
-                                unsigned char        *expected_tags,
-                                nx_field_location_t  *location)
-{
-    nx_buffer_t   *test_buffer = *buffer;
-    unsigned char *test_cursor = *cursor;
-
-    if (!test_cursor)
-        return 1; // no match
-
-    unsigned char *end_of_buffer = nx_buffer_base(test_buffer) + nx_buffer_size(test_buffer);
-    int idx = 0;
-
-    while (idx < pattern_length && *test_cursor == pattern[idx]) {
-        idx++;
-        test_cursor++;
-        if (test_cursor == end_of_buffer) {
-            test_buffer = test_buffer->next;
-            if (test_buffer == 0)
-                return 1; // Pattern didn't match
-            test_cursor = nx_buffer_base(test_buffer);
-            end_of_buffer = test_cursor + nx_buffer_size(test_buffer);
-        }
-    }
-
-    if (idx < pattern_length)
-        return 1; // Pattern didn't match
-
-    //
-    // Pattern matched, check the tag
-    //
-    while (*expected_tags && *test_cursor != *expected_tags)
-        expected_tags++;
-    if (*expected_tags == 0)
-        return 0;  // Unexpected tag
-
-    if (location->parsed)
-        return 0;  // Duplicate section
-
-    //
-    // Pattern matched and tag is expected.  Mark the beginning of the section.
-    //
-    location->parsed = 1;
-    location->buffer = test_buffer;
-    location->offset = test_cursor - nx_buffer_base(test_buffer);
-    location->length = 0;
-
-    //
-    // Advance the pointers to consume the whole section.
-    //
-    int consume = 0;
-    unsigned char tag = next_octet(&test_cursor, &test_buffer);
-    if (!test_cursor) return 0;
-    switch (tag) {
-    case 0x45 : // list0
-        break;
-
-    case 0xd0 : // list32
-    case 0xd1 : // map32
-    case 0xb0 : // vbin32
-        consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 24;
-        if (!test_cursor) return 0;
-        consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 16;
-        if (!test_cursor) return 0;
-        consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 8;
-        if (!test_cursor) return 0;
-        // Fall through to the next case...
-
-    case 0xc0 : // list8
-    case 0xc1 : // map8
-    case 0xa0 : // vbin8
-        consume |= (int) next_octet(&test_cursor, &test_buffer);
-        if (!test_cursor) return 0;
-        break;
-    }
-
-    if (consume)
-        advance(&test_cursor, &test_buffer, consume);
-
-    *cursor = test_cursor;
-    *buffer = test_buffer;
-    return 1;
-}
-
-
-static void nx_insert(nx_message_t *msg, const uint8_t *seq, size_t len)
-{
-    nx_buffer_t *buf = DEQ_TAIL(msg->buffers);
-
-    while (len > 0) {
-        if (buf == 0 || nx_buffer_capacity(buf) == 0) {
-            buf = nx_allocate_buffer();
-            if (buf == 0)
-                return;
-            DEQ_INSERT_TAIL(msg->buffers, buf);
-        }
-
-        size_t to_copy = nx_buffer_capacity(buf);
-        if (to_copy > len)
-            to_copy = len;
-        memcpy(nx_buffer_cursor(buf), seq, to_copy);
-        nx_buffer_insert(buf, to_copy);
-        len -= to_copy;
-        seq += to_copy;
-        msg->length += to_copy;
-    }
-}
-
-
-static void nx_insert_8(nx_message_t *msg, uint8_t value)
-{
-    nx_insert(msg, &value, 1);
-}
-
-
-static void nx_insert_32(nx_message_t *msg, uint32_t value)
-{
-    uint8_t buf[4];
-    buf[0] = (uint8_t) ((value & 0xFF000000) >> 24);
-    buf[1] = (uint8_t) ((value & 0x00FF0000) >> 16);
-    buf[2] = (uint8_t) ((value & 0x0000FF00) >> 8);
-    buf[3] = (uint8_t)  (value & 0x000000FF);
-    nx_insert(msg, buf, 4);
-}
-
-
-static void nx_insert_64(nx_message_t *msg, uint64_t value)
-{
-    uint8_t buf[8];
-    buf[0] = (uint8_t) ((value & 0xFF00000000000000L) >> 56);
-    buf[1] = (uint8_t) ((value & 0x00FF000000000000L) >> 48);
-    buf[2] = (uint8_t) ((value & 0x0000FF0000000000L) >> 40);
-    buf[3] = (uint8_t) ((value & 0x000000FF00000000L) >> 32);
-    buf[4] = (uint8_t) ((value & 0x00000000FF000000L) >> 24);
-    buf[5] = (uint8_t) ((value & 0x0000000000FF0000L) >> 16);
-    buf[6] = (uint8_t) ((value & 0x000000000000FF00L) >> 8);
-    buf[7] = (uint8_t)  (value & 0x00000000000000FFL);
-    nx_insert(msg, buf, 8);
-}
-
-
-static void nx_overwrite(nx_buffer_t **buf, size_t *cursor, uint8_t value)
-{
-    while (*buf) {
-        if (*cursor >= nx_buffer_size(*buf)) {
-            *buf = (*buf)->next;
-            *cursor = 0;
-        } else {
-            nx_buffer_base(*buf)[*cursor] = value;
-            (*cursor)++;
-            return;
-        }
-    }
-}
-
-
-static void nx_overwrite_32(nx_field_location_t *field, uint32_t value)
-{
-    nx_buffer_t *buf    = field->buffer;
-    size_t       cursor = field->offset;
-
-    nx_overwrite(&buf, &cursor, (uint8_t) ((value & 0xFF000000) >> 24));
-    nx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x00FF0000) >> 24));
-    nx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x0000FF00) >> 24));
-    nx_overwrite(&buf, &cursor, (uint8_t)  (value & 0x000000FF));
-}
-
-
-static void nx_start_list_performative(nx_message_t *msg, uint8_t code)
-{
-    //
-    // Insert the short-form performative tag
-    //
-    nx_insert(msg, (const uint8_t*) "\x00\x53", 2);
-    nx_insert_8(msg, code);
-
-    //
-    // Open the list with a list32 tag
-    //
-    nx_insert_8(msg, 0xd0);
-
-    //
-    // Mark the current location to later overwrite the length
-    //
-    msg->compose_length.buffer = DEQ_TAIL(msg->buffers);
-    msg->compose_length.offset = nx_buffer_size(msg->compose_length.buffer);
-    msg->compose_length.length = 4;
-    msg->compose_length.parsed = 1;
-
-    nx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4);
-
-    //
-    // Mark the current location to later overwrite the count
-    //
-    msg->compose_count.buffer = DEQ_TAIL(msg->buffers);
-    msg->compose_count.offset = nx_buffer_size(msg->compose_count.buffer);
-    msg->compose_count.length = 4;
-    msg->compose_count.parsed = 1;
-
-    nx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4);
-
-    msg->length = 4; // Include the length of the count field
-    msg->count = 0;
-}
-
-
-static void nx_end_list(nx_message_t *msg)
-{
-    nx_overwrite_32(&msg->compose_length, msg->length);
-    nx_overwrite_32(&msg->compose_count,  msg->count);
-}
-
-
-const nx_allocator_config_t *nx_allocator_default_config(void)
-{
-    default_config.buffer_size                     = 1024;
-    default_config.buffer_preallocation_count      = 512;
-    default_config.buffer_rebalancing_batch_count  = 16;
-    default_config.buffer_local_storage_max        = 64;
-    default_config.buffer_free_list_max            = 1000000;
-    default_config.message_allocation_batch_count  = 256;
-    default_config.message_rebalancing_batch_count = 64;
-    default_config.message_local_storage_max       = 256;
-
-    return &default_config;
-}
-
-
-void nx_allocator_initialize(const nx_allocator_config_t *c)
-{
-    config = c;
-
-    // Initialize the fields in the global structure.
-    DEQ_INIT(global.message_free_list);
-    DEQ_INIT(global.buffer_free_list);
-    global.lock = sys_mutex();
-
-    // Pre-allocate buffers according to the configuration
-    int          i;
-    nx_buffer_t *buf;
-
-    for (i = 0; i < config->buffer_preallocation_count; i++) {
-        buf = (nx_buffer_t*) malloc (sizeof(nx_buffer_t) + config->buffer_size);
-        DEQ_ITEM_INIT(buf);
-        DEQ_INSERT_TAIL(global.buffer_free_list, buf);
-    }
-}
-
-
-void nx_allocator_finalize(void)
-{
-    // TODO - Free buffers and messages
-}
-
-
-nx_message_t *nx_allocate_message(void)
-{
-    nx_allocator_t *alloc = nx_get_allocator();
-    nx_message_t   *msg;
-    int             i;
-
-    if (DEQ_SIZE(alloc->message_free_list) == 0) {
-        //
-        // The local free list is empty, rebalance a batch of objects from the global
-        // free list.
-        //
-        sys_mutex_lock(global.lock);
-        if (DEQ_SIZE(global.message_free_list) >= config->message_rebalancing_batch_count) {
-            for (i = 0; i < config->message_rebalancing_batch_count; i++) {
-                msg = DEQ_HEAD(global.message_free_list);
-                DEQ_REMOVE_HEAD(global.message_free_list);
-                DEQ_INSERT_TAIL(alloc->message_free_list, msg);
-            }
-        }
-        sys_mutex_unlock(global.lock);
-    }
-
-    if (DEQ_SIZE(alloc->message_free_list) == 0) {
-        //
-        // The local free list is still empty.  This means there were not enough objects on the
-        // global free list to make up a batch.  Allocate new objects from the heap and store
-        // them in the local free list.
-        //
-        nx_message_t *batch = NEW_ARRAY(nx_message_t, config->message_allocation_batch_count);
-        memset(batch, 0, sizeof(nx_message_t) * config->message_allocation_batch_count);
-        for (i = 0; i < config->message_allocation_batch_count; i++) {
-            DEQ_INSERT_TAIL(alloc->message_free_list, &batch[i]);
-        }
-    }
-
-    //
-    // If the local free list is still empty, we're out of memory.
-    //
-    if (DEQ_SIZE(alloc->message_free_list) == 0)
-        return 0;
-
-    msg = DEQ_HEAD(alloc->message_free_list);
-    DEQ_REMOVE_HEAD(alloc->message_free_list);
-
-    DEQ_INIT(msg->buffers);
-    msg->in_delivery = NULL;
-    msg->out_delivery = NULL;
-    msg->section_message_header.buffer = 0;
-    msg->section_message_header.parsed = 0;
-    msg->section_delivery_annotation.buffer = 0;
-    msg->section_delivery_annotation.parsed = 0;
-    msg->section_message_annotation.buffer = 0;
-    msg->section_message_annotation.parsed = 0;
-    msg->section_message_properties.buffer = 0;
-    msg->section_message_properties.parsed = 0;
-    msg->section_application_properties.buffer = 0;
-    msg->section_application_properties.parsed = 0;
-    msg->section_body.buffer = 0;
-    msg->section_body.parsed = 0;
-    msg->section_footer.buffer = 0;
-    msg->section_footer.parsed = 0;
-    msg->field_user_id.buffer = 0;
-    msg->field_user_id.parsed = 0;
-    msg->field_to.buffer = 0;
-    msg->field_to.parsed = 0;
-    msg->body.buffer = 0;
-    msg->body.parsed = 0;
-    return msg;
-}
+ALLOC_DECLARE(nx_buffer_t);
+ALLOC_DEFINE_CONFIG(nx_buffer_t, sizeof(nx_buffer_t) + NEXUS_BUFFER_SIZE, 0);
 
 
 nx_buffer_t *nx_allocate_buffer(void)
 {
-    nx_allocator_t *alloc = nx_get_allocator();
-    nx_buffer_t    *buf;
-    int             i;
-
-    if (DEQ_SIZE(alloc->buffer_free_list) == 0) {
-        sys_mutex_lock(global.lock);
-        if (DEQ_SIZE(global.buffer_free_list) >= config->buffer_rebalancing_batch_count) {
-            // Rebalance a batch of free descriptors to the local free list.
-            for (i = 0; i < config->buffer_rebalancing_batch_count; i++) {
-                buf = DEQ_HEAD(global.buffer_free_list);
-                DEQ_REMOVE_HEAD(global.buffer_free_list);
-                DEQ_INSERT_TAIL(alloc->buffer_free_list, buf);
-            }
-        }
-        sys_mutex_unlock(global.lock);
-    }
-
-    if (DEQ_SIZE(alloc->buffer_free_list) == 0) {
-        // Allocate a buffer from the heap
-        buf = (nx_buffer_t*) malloc (sizeof(nx_buffer_t) + config->buffer_size);
-        DEQ_ITEM_INIT(buf);
-        DEQ_INSERT_TAIL(alloc->buffer_free_list, buf);
-    }
-
-    if (DEQ_SIZE(alloc->buffer_free_list) == 0)
-        return 0;
-
-    buf = DEQ_HEAD(alloc->buffer_free_list);
-    DEQ_REMOVE_HEAD(alloc->buffer_free_list);
+    nx_buffer_t *buf = new_nx_buffer_t();
 
+    DEQ_ITEM_INIT(buf);
     buf->size = 0;
-
     return buf;
 }
 
 
-void nx_free_message(nx_message_t *msg)
-{
-    nx_allocator_t *alloc = nx_get_allocator();
-
-    // Free any buffers in the message
-    int          i;
-    nx_buffer_t *buf = DEQ_HEAD(msg->buffers);
-    while (buf) {
-        DEQ_REMOVE_HEAD(msg->buffers);
-        nx_free_buffer(buf);
-        buf = DEQ_HEAD(msg->buffers);
-    }
-
-    DEQ_INSERT_TAIL(alloc->message_free_list, msg);
-    if (DEQ_SIZE(alloc->message_free_list) > config->message_local_storage_max) {
-        //
-        // The local free list has exceeded the threshold for local storage.
-        // Rebalance a batch of free objects to the global free list.
-        //
-        sys_mutex_lock(global.lock);
-        for (i = 0; i < config->message_rebalancing_batch_count; i++) {
-            msg = DEQ_HEAD(alloc->message_free_list);
-            DEQ_REMOVE_HEAD(alloc->message_free_list);
-            DEQ_INSERT_TAIL(global.message_free_list, msg);
-        }
-        sys_mutex_unlock(global.lock);
-    }
-}
-
-
 void nx_free_buffer(nx_buffer_t *buf)
 {
-    nx_allocator_t *alloc = nx_get_allocator();
-    int             i;
-
-    DEQ_INSERT_TAIL(alloc->buffer_free_list, buf);
-    if (DEQ_SIZE(alloc->buffer_free_list) > config->buffer_local_storage_max) {
-        // Rebalance a batch of free descriptors to the global free list.
-        sys_mutex_lock(global.lock);
-        for (i = 0; i < config->buffer_rebalancing_batch_count; i++) {
-            buf = DEQ_HEAD(alloc->buffer_free_list);
-            DEQ_REMOVE_HEAD(alloc->buffer_free_list);
-            DEQ_INSERT_TAIL(global.buffer_free_list, buf);
-        }
-        sys_mutex_unlock(global.lock);
-    }
-}
-
-
-nx_message_t *nx_message_receive(pn_delivery_t *delivery)
-{
-    pn_link_t    *link = pn_delivery_link(delivery);
-    nx_message_t *msg  = (nx_message_t*) pn_delivery_get_context(delivery);
-    ssize_t       rc;
-    nx_buffer_t  *buf;
-
-    //
-    // If there is no message associated with the delivery, this is the first time
-    // we've received anything on this delivery.  Allocate a message descriptor and 
-    // link it and the delivery together.
-    //
-    if (!msg) {
-        msg = nx_allocate_message();
-        pn_delivery_set_context(delivery, (void*) msg);
-
-        //
-        // Record the incoming delivery only if it is not settled.  If it is 
-        // settled, there's no need to propagate disposition back to the sender.
-        //
-        if (!pn_delivery_settled(delivery))
-            msg->in_delivery = delivery;
-    }
-
-    //
-    // Get a reference to the tail buffer on the message.  This is the buffer into which
-    // we will store incoming message data.  If there is no buffer in the message, allocate
-    // an empty one and add it to the message.
-    //
-    buf = DEQ_TAIL(msg->buffers);
-    if (!buf) {
-        buf = nx_allocate_buffer();
-        DEQ_INSERT_TAIL(msg->buffers, buf);
-    }
-
-    while (1) {
-        //
-        // Try to receive enough data to fill the remaining space in the tail buffer.
-        //
-        rc = pn_link_recv(link, (char*) nx_buffer_cursor(buf), nx_buffer_capacity(buf));
-
-        //
-        // If we receive PN_EOS, we have come to the end of the message.
-        //
-        if (rc == PN_EOS) {
-            //
-            // If the last buffer in the list is empty, remove it and free it.  This
-            // will only happen if the size of the message content is an exact multiple
-            // of the buffer size.
-            //
-            if (nx_buffer_size(buf) == 0) {
-                DEQ_REMOVE_TAIL(msg->buffers);
-                nx_free_buffer(buf);
-            }
-            return msg;
-        }
-
-        if (rc > 0) {
-            //
-            // We have received a positive number of bytes for the message.  Advance
-            // the cursor in the buffer.
-            //
-            nx_buffer_insert(buf, rc);
-
-            //
-            // If the buffer is full, allocate a new empty buffer and append it to the
-            // tail of the message's list.
-            //
-            if (nx_buffer_capacity(buf) == 0) {
-                buf = nx_allocate_buffer();
-                DEQ_INSERT_TAIL(msg->buffers, buf);
-            }
-        } else
-            //
-            // We received zero bytes, and no PN_EOS.  This means that we've received
-            // all of the data available up to this point, but it does not constitute
-            // the entire message.  We'll be back later to finish it up.
-            //
-            break;
-    }
-
-    return NULL;
-}
-
-
-int nx_message_check(nx_message_t *msg, nx_message_depth_t depth)
-{
-
-#define LONG  10
-#define SHORT 3
-#define MSG_HDR_LONG                  (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70"
-#define MSG_HDR_SHORT                 (unsigned char*) "\x00\x53\x70"
-#define DELIVERY_ANNOTATION_LONG      (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71"
-#define DELIVERY_ANNOTATION_SHORT     (unsigned char*) "\x00\x53\x71"
-#define MESSAGE_ANNOTATION_LONG       (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72"
-#define MESSAGE_ANNOTATION_SHORT      (unsigned char*) "\x00\x53\x72"
-#define MESSAGE_PROPERTIES_LONG       (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73"
-#define MESSAGE_PROPERTIES_SHORT      (unsigned char*) "\x00\x53\x73"
-#define APPLICATION_PROPERTIES_LONG   (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74"
-#define APPLICATION_PROPERTIES_SHORT  (unsigned char*) "\x00\x53\x74"
-#define BODY_DATA_LONG                (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75"
-#define BODY_DATA_SHORT               (unsigned char*) "\x00\x53\x75"
-#define BODY_SEQUENCE_LONG            (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76"
-#define BODY_SEQUENCE_SHORT           (unsigned char*) "\x00\x53\x76"
-#define FOOTER_LONG                   (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78"
-#define FOOTER_SHORT                  (unsigned char*) "\x00\x53\x78"
-#define TAGS_LIST                     (unsigned char*) "\x45\xc0\xd0"
-#define TAGS_MAP                      (unsigned char*) "\xc1\xd1"
-#define TAGS_BINARY                   (unsigned char*) "\xa0\xb0"
-
-    nx_buffer_t   *buffer = DEQ_HEAD(msg->buffers);
-    unsigned char *cursor;
-
-    if (!buffer)
-        return 0; // Invalid - No data in the message
-
-    if (depth == NX_DEPTH_NONE)
-        return 1;
-
-    cursor = nx_buffer_base(buffer);
-
-    //
-    // MESSAGE HEADER
-    //
-    if (0 == nx_check_and_advance(&buffer, &cursor, MSG_HDR_LONG,  LONG,  TAGS_LIST, &msg->section_message_header))
-        return 0;
-    if (0 == nx_check_and_advance(&buffer, &cursor, MSG_HDR_SHORT, SHORT, TAGS_LIST, &msg->section_message_header))
-        return 0;
-
-    if (depth == NX_DEPTH_HEADER)
-        return 1;
-
-    //
-    // DELIVERY ANNOTATION
-    //
-    if (0 == nx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_LONG,  LONG,  TAGS_MAP,  &msg->section_delivery_annotation))
-        return 0;
-    if (0 == nx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_SHORT, SHORT, TAGS_MAP,  &msg->section_delivery_annotation))
-        return 0;
-
-    if (depth == NX_DEPTH_DELIVERY_ANNOTATIONS)
-        return 1;
-
-    //
-    // MESSAGE ANNOTATION
-    //
-    if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_LONG,  LONG,  TAGS_MAP,  &msg->section_message_annotation))
-        return 0;
-    if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_SHORT, SHORT, TAGS_MAP,  &msg->section_message_annotation))
-        return 0;
-
-    if (depth == NX_DEPTH_MESSAGE_ANNOTATIONS)
-        return 1;
-
-    //
-    // MESSAGE PROPERTIES
-    //
-    if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_PROPERTIES_LONG,  LONG,  TAGS_LIST, &msg->section_message_properties))
-        return 0;
-    if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_PROPERTIES_SHORT, SHORT, TAGS_LIST, &msg->section_message_properties))
-        return 0;
-
-    if (depth == NX_DEPTH_MESSAGE_PROPERTIES)
-        return 1;
-
-    //
-    // APPLICATION PROPERTIES
-    //
-    if (0 == nx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_LONG,  LONG,  TAGS_MAP, &msg->section_application_properties))
-        return 0;
-    if (0 == nx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_SHORT, SHORT, TAGS_MAP, &msg->section_application_properties))
-        return 0;
-
-    if (depth == NX_DEPTH_APPLICATION_PROPERTIES)
-        return 1;
-
-    //
-    // BODY  (Note that this function expects a single data section or a single AMQP sequence)
-    //
-    if (0 == nx_check_and_advance(&buffer, &cursor, BODY_DATA_LONG,      LONG,  TAGS_BINARY, &msg->section_body))
-        return 0;
-    if (0 == nx_check_and_advance(&buffer, &cursor, BODY_DATA_SHORT,     SHORT, TAGS_BINARY, &msg->section_body))
-        return 0;
-    if (0 == nx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_LONG,  LONG,  TAGS_LIST,   &msg->section_body))
-        return 0;
-    if (0 == nx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_SHORT, SHORT, TAGS_LIST,   &msg->section_body))
-        return 0;
-
-    if (depth == NX_DEPTH_BODY)
-        return 1;
-
-    //
-    // FOOTER
-    //
-    if (0 == nx_check_and_advance(&buffer, &cursor, FOOTER_LONG,  LONG,  TAGS_MAP, &msg->section_footer))
-        return 0;
-    if (0 == nx_check_and_advance(&buffer, &cursor, FOOTER_SHORT, SHORT, TAGS_MAP, &msg->section_footer))
-        return 0;
-
-    return 1;
-}
-
-
-nx_field_iterator_t *nx_message_field_to(nx_message_t *msg)
-{
-    while (1) {
-        if (msg->field_to.parsed)
-            return nx_field_iterator_buffer(msg->field_to.buffer, msg->field_to.offset, msg->field_to.length, ITER_VIEW_ALL);
-
-        if (msg->section_message_properties.parsed == 0)
-            break;
-
-        nx_buffer_t   *buffer = msg->section_message_properties.buffer;
-        unsigned char *cursor = nx_buffer_base(buffer) + msg->section_message_properties.offset;
-
-        int count = start_list(&cursor, &buffer);
-        int result;
-
-        if (count < 3)
-            break;
-
-        result = traverse_field(&cursor, &buffer, 0); // message_id
-        if (!result) return 0;
-        result = traverse_field(&cursor, &buffer, 0); // user_id
-        if (!result) return 0;
-        result = traverse_field(&cursor, &buffer, &msg->field_to); // to
-        if (!result) return 0;
-    }
-
-    return 0;
-}
-
-
-nx_field_iterator_t *nx_message_body(nx_message_t *msg)
-{
-    while (1) {
-        if (msg->body.parsed)
-            return nx_field_iterator_buffer(msg->body.buffer, msg->body.offset, msg->body.length, ITER_VIEW_ALL);
-
-        if (msg->section_body.parsed == 0)
-            break;
-
-        nx_buffer_t   *buffer = msg->section_body.buffer;
-        unsigned char *cursor = nx_buffer_base(buffer) + msg->section_body.offset;
-        int result;
-
-        result = traverse_field(&cursor, &buffer, &msg->body);
-        if (!result) return 0;
-    }
-
-    return 0;
-}
-
-
-void nx_message_compose_1(nx_message_t *msg, const char *to, nx_buffer_t *buf_chain)
-{
-    nx_message_begin_header(msg);
-    nx_message_insert_boolean(msg, 0);  // durable
-    //nx_message_insert_null(msg);        // priority
-    //nx_message_insert_null(msg);        // ttl
-    //nx_message_insert_boolean(msg, 0);  // first-acquirer
-    //nx_message_insert_uint(msg, 0);     // delivery-count
-    nx_message_end_header(msg);
-
-    nx_message_begin_message_properties(msg);
-    nx_message_insert_null(msg);          // message-id
-    nx_message_insert_null(msg);          // user-id
-    nx_message_insert_string(msg, to);    // to
-    //nx_message_insert_null(msg);          // subject
-    //nx_message_insert_null(msg);          // reply-to
-    //nx_message_insert_null(msg);          // correlation-id
-    //nx_message_insert_null(msg);          // content-type
-    //nx_message_insert_null(msg);          // content-encoding
-    //nx_message_insert_timestamp(msg, 0);  // absolute-expiry-time
-    //nx_message_insert_timestamp(msg, 0);  // creation-time
-    //nx_message_insert_null(msg);          // group-id
-    //nx_message_insert_uint(msg, 0);       // group-sequence
-    //nx_message_insert_null(msg);          // reply-to-group-id
-    nx_message_end_message_properties(msg);
-
-    if (buf_chain)
-        nx_message_append_body_data(msg, buf_chain);
-}
-
-
-void nx_message_begin_header(nx_message_t *msg)
-{
-    nx_start_list_performative(msg, 0x70);
-}
-
-
-void nx_message_end_header(nx_message_t *msg)
-{
-    nx_end_list(msg);
-}
-
-
-void nx_message_begin_delivery_annotations(nx_message_t *msg)
-{
-    assert(0); // Not Implemented
-}
-
-
-void nx_message_end_delivery_annotations(nx_message_t *msg)
-{
-    assert(0); // Not Implemented
-}
-
-
-void nx_message_begin_message_annotations(nx_message_t *msg)
-{
-    assert(0); // Not Implemented
-}
-
-
-void nx_message_end_message_annotations(nx_message_t *msg)
-{
-    assert(0); // Not Implemented
-}
-
-
-void nx_message_begin_message_properties(nx_message_t *msg)
-{
-    nx_start_list_performative(msg, 0x73);
-}
-
-
-void nx_message_end_message_properties(nx_message_t *msg)
-{
-    nx_end_list(msg);
-}
-
-
-void nx_message_begin_application_properties(nx_message_t *msg)
-{
-    assert(0); // Not Implemented
-}
-
-
-void nx_message_end_application_properties(nx_message_t *msg)
-{
-    assert(0); // Not Implemented
-}
-
-
-void nx_message_append_body_data(nx_message_t *msg, nx_buffer_t *buf_chain)
-{
-    uint32_t     len   = 0;
-    nx_buffer_t *buf   = buf_chain;
-    nx_buffer_t *last  = 0;
-    size_t       count = 0;
-
-    while (buf) {
-        len += nx_buffer_size(buf);
-        count++;
-        last = buf;
-        buf = DEQ_NEXT(buf);
-    }
-
-    nx_insert(msg, (const uint8_t*) "\x00\x53\x75", 3);
-    if (len < 256) {
-        nx_insert_8(msg, 0xa0);  // vbin8
-        nx_insert_8(msg, (uint8_t) len);
-    } else {
-        nx_insert_8(msg, 0xb0);  // vbin32
-        nx_insert_32(msg, len);
-    }
-
-    if (len > 0) {
-        buf_chain->prev         = msg->buffers.tail;
-        msg->buffers.tail->next = buf_chain;
-        msg->buffers.tail       = last;
-        msg->buffers.size      += count;
-    }
-}
-
-
-void nx_message_begin_body_sequence(nx_message_t *msg)
-{
-}
-
-
-void nx_message_end_body_sequence(nx_message_t *msg)
-{
-}
-
-
-void nx_message_begin_footer(nx_message_t *msg)
-{
-    assert(0); // Not Implemented
-}
-
-
-void nx_message_end_footer(nx_message_t *msg)
-{
-    assert(0); // Not Implemented
-}
-
-
-void nx_message_insert_null(nx_message_t *msg)
-{
-    nx_insert_8(msg, 0x40);
-    msg->count++;
-}
-
-
-void nx_message_insert_boolean(nx_message_t *msg, int value)
-{
-    if (value)
-        nx_insert(msg, (const uint8_t*) "\x56\x01", 2);
-    else
-        nx_insert(msg, (const uint8_t*) "\x56\x00", 2);
-    msg->count++;
-}
-
-
-void nx_message_insert_ubyte(nx_message_t *msg, uint8_t value)
-{
-    nx_insert_8(msg, 0x50);
-    nx_insert_8(msg, value);
-    msg->count++;
-}
-
-
-void nx_message_insert_uint(nx_message_t *msg, uint32_t value)
-{
-    if (value == 0) {
-        nx_insert_8(msg, 0x43);  // uint0
-    } else if (value < 256) {
-        nx_insert_8(msg, 0x52);  // smalluint
-        nx_insert_8(msg, (uint8_t) value);
-    } else {
-        nx_insert_8(msg, 0x70);  // uint
-        nx_insert_32(msg, value);
-    }
-    msg->count++;
-}
-
-
-void nx_message_insert_ulong(nx_message_t *msg, uint64_t value)
-{
-    if (value == 0) {
-        nx_insert_8(msg, 0x44);  // ulong0
-    } else if (value < 256) {
-        nx_insert_8(msg, 0x53);  // smallulong
-        nx_insert_8(msg, (uint8_t) value);
-    } else {
-        nx_insert_8(msg, 0x80);  // ulong
-        nx_insert_64(msg, value);
-    }
-    msg->count++;
-}
-
-
-void nx_message_insert_binary(nx_message_t *msg, const uint8_t *start, size_t len)
-{
-    if (len < 256) {
-        nx_insert_8(msg, 0xa0);  // vbin8
-        nx_insert_8(msg, (uint8_t) len);
-    } else {
-        nx_insert_8(msg, 0xb0);  // vbin32
-        nx_insert_32(msg, len);
-    }
-    nx_insert(msg, start, len);
-    msg->count++;
-}
-
-
-void nx_message_insert_string(nx_message_t *msg, const char *start)
-{
-    uint32_t len = strlen(start);
-
-    if (len < 256) {
-        nx_insert_8(msg, 0xa1);  // str8-utf8
-        nx_insert_8(msg, (uint8_t) len);
-        nx_insert(msg, (const uint8_t*) start, len);
-    } else {
-        nx_insert_8(msg, 0xb1);  // str32-utf8
-        nx_insert_32(msg, len);
-        nx_insert(msg, (const uint8_t*) start, len);
-    }
-    msg->count++;
-}
-
-
-void nx_message_insert_uuid(nx_message_t *msg, const uint8_t *value)
-{
-    nx_insert_8(msg, 0x98);  // uuid
-    nx_insert(msg, value, 16);
-    msg->count++;
-}
-
-
-void nx_message_insert_symbol(nx_message_t *msg, const char *start, size_t len)
-{
-    if (len < 256) {
-        nx_insert_8(msg, 0xa3);  // sym8
-        nx_insert_8(msg, (uint8_t) len);
-        nx_insert(msg, (const uint8_t*) start, len);
-    } else {
-        nx_insert_8(msg, 0xb3);  // sym32
-        nx_insert_32(msg, len);
-        nx_insert(msg, (const uint8_t*) start, len);
-    }
-    msg->count++;
-}
-
-
-void nx_message_insert_timestamp(nx_message_t *msg, uint64_t value)
-{
-    nx_insert_8(msg, 0x83);  // timestamp
-    nx_insert_64(msg, value);
-    msg->count++;
+    free_nx_buffer_t(buf);
 }
 
 
@@ -1146,7 +58,7 @@ unsigned char *nx_buffer_cursor(nx_buffe
 
 size_t nx_buffer_capacity(nx_buffer_t *buf)
 {
-    return config->buffer_size - buf->size;
+    return NEXUS_BUFFER_SIZE - buf->size;
 }
 
 
@@ -1159,6 +71,6 @@ size_t nx_buffer_size(nx_buffer_t *buf)
 void nx_buffer_insert(nx_buffer_t *buf, size_t len)
 {
     buf->size += len;
-    assert(buf->size <= config->buffer_size);
+    assert(buf->size <= NEXUS_BUFFER_SIZE);
 }
 

Modified: qpid/trunk/qpid/extras/nexus/src/container.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/src/container.c?rev=1442413&r1=1442412&r2=1442413&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/nexus/src/container.c (original)
+++ qpid/trunk/qpid/extras/nexus/src/container.c Mon Feb  4 22:46:32 2013
@@ -399,10 +399,6 @@ void nx_container_initialize(void)
 {
     nx_log(module, LOG_TRACE, "Container Initializing");
 
-    // TODO - move allocator init to server?
-    const nx_allocator_config_t *alloc_config = nx_allocator_default_config();
-    nx_allocator_initialize(alloc_config);
-
     node_type_map = hash(6,  4, 1);  // 64 buckets, item batches of 4
     node_map      = hash(10, 32, 0); // 1K buckets, item batches of 32
     lock          = sys_mutex();

Modified: qpid/trunk/qpid/extras/nexus/src/iterator.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/nexus/src/iterator.c?rev=1442413&r1=1442412&r2=1442413&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/nexus/src/iterator.c (original)
+++ qpid/trunk/qpid/extras/nexus/src/iterator.c Mon Feb  4 22:46:32 2013
@@ -18,9 +18,9 @@
  */
 
 #include <qpid/nexus/iterator.h>
-#include <qpid/nexus/message.h>
 #include <qpid/nexus/ctools.h>
 #include <qpid/nexus/alloc.h>
+#include "message_private.h"
 #include <stdio.h>
 #include <string.h>
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message