qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject svn commit: r1480445 - in /qpid/proton/trunk: proton-c/src/messenger/messenger.c proton-c/src/messenger/store.c proton-c/src/messenger/store.h tests/python/proton_tests/messenger.py
Date Wed, 08 May 2013 20:39:43 GMT
Author: rhs
Date: Wed May  8 20:39:43 2013
New Revision: 1480445

URL: http://svn.apache.org/r1480445
Log:
PROTON-295: decoupled tracking of store entries from put/get of store entries, fixed tracking
of incoming entries to start when they are returned via get rather than when they are read
off of the wire

Modified:
    qpid/proton/trunk/proton-c/src/messenger/messenger.c
    qpid/proton/trunk/proton-c/src/messenger/store.c
    qpid/proton/trunk/proton-c/src/messenger/store.h
    qpid/proton/trunk/tests/python/proton_tests/messenger.py

Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1480445&r1=1480444&r2=1480445&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Wed May  8 20:39:43 2013
@@ -1182,7 +1182,7 @@ int pn_messenger_put(pn_messenger_t *mes
   if (!entry)
     return pn_error_format(messenger->error, PN_ERR, "store error");
 
-  messenger->outgoing_tracker = pn_tracker(OUTGOING, pni_entry_tracker(entry));
+  messenger->outgoing_tracker = pn_tracker(OUTGOING, pni_entry_track(entry));
   pn_buffer_t *buf = pni_entry_bytes(entry);
 
   while (true) {
@@ -1226,7 +1226,7 @@ pni_store_t *pn_tracker_store(pn_messeng
 pn_status_t pn_messenger_status(pn_messenger_t *messenger, pn_tracker_t tracker)
 {
   pni_store_t *store = pn_tracker_store(messenger, tracker);
-  pni_entry_t *e = pni_store_track(store, pn_tracker_sequence(tracker));
+  pni_entry_t *e = pni_store_entry(store, pn_tracker_sequence(tracker));
   if (e) {
     return pni_entry_get_status(e);
   } else {
@@ -1346,7 +1346,7 @@ int pn_messenger_get(pn_messenger_t *mes
   // XXX: need to drain credit before returning EOS
   if (!entry) return PN_EOS;
 
-  messenger->incoming_tracker = pn_tracker(INCOMING, pni_entry_tracker(entry));
+  messenger->incoming_tracker = pn_tracker(INCOMING, pni_entry_track(entry));
   pn_buffer_t *buf = pni_entry_bytes(entry);
   pn_bytes_t bytes = pn_buffer_bytes(buf);
   const char *encoded = bytes.start;

Modified: qpid/proton/trunk/proton-c/src/messenger/store.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/store.c?rev=1480445&r1=1480444&r2=1480445&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/store.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/store.c Wed May  8 20:39:43 2013
@@ -21,6 +21,7 @@
 
 #include <proton/messenger.h>
 #include <proton/engine.h>
+#include <proton/object.h>
 #include <assert.h>
 #ifndef __cplusplus
 #include <stdbool.h>
@@ -32,20 +33,15 @@
 
 typedef struct pni_stream_t pni_stream_t;
 
-typedef struct {
-  size_t capacity;
-  int window;
-  pn_sequence_t lwm;
-  pn_sequence_t hwm;
-  pni_entry_t **entries;
-} pni_queue_t;
-
 struct pni_store_t {
   size_t size;
-  pni_queue_t queue;
   pni_stream_t *streams;
   pni_entry_t *store_head;
   pni_entry_t *store_tail;
+  int window;
+  pn_sequence_t lwm;
+  pn_sequence_t hwm;
+  pn_hash_t *tracked;
 };
 
 struct pni_stream_t {
@@ -57,7 +53,6 @@ struct pni_stream_t {
 };
 
 struct pni_entry_t {
-  int refcount;
   pn_sequence_t id;
   pni_stream_t *stream;
   bool free;
@@ -71,169 +66,15 @@ struct pni_entry_t {
   void *context;
 };
 
-static void pni_entry_incref(pni_entry_t *entry)
-{
-  entry->refcount++;
-}
-
-void pni_entry_reclaim(pni_entry_t *entry)
+void pni_entry_finalize(void *object)
 {
+  pni_entry_t *entry = (pni_entry_t *) object;
   assert(entry->free);
   pn_delivery_t *d = entry->delivery;
   if (d) {
-    if (!pn_delivery_local_state(d)) {
-      pn_delivery_update(d, PN_ACCEPTED);
-    }
     pn_delivery_settle(d);
     pni_entry_set_delivery(entry, NULL);
   }
-  free(entry);
-}
-
-static void pni_entry_decref(pni_entry_t *entry)
-{
-  if (entry) {
-    assert(entry->refcount > 0);
-    entry->refcount--;
-    if (entry->refcount == 0) {
-      pni_entry_reclaim(entry);
-    }
-  }
-}
-
-void pni_queue_init(pni_queue_t *queue)
-{
-  queue->capacity = 1024;
-  queue->window = 0;
-  queue->lwm = 0;
-  queue->hwm = 0;
-  queue->entries = (pni_entry_t **) calloc(queue->capacity, sizeof(pni_entry_t *));
-}
-
-void pni_queue_tini(pni_queue_t *queue)
-{
-  for (int i = 0; i < queue->hwm - queue->lwm; i++) {
-    pni_entry_decref(queue->entries[i]);
-  }
-  free(queue->entries);
-}
-
-bool pni_queue_contains(pni_queue_t *queue, pn_sequence_t id)
-{
-  return (id - queue->lwm >= 0) && (queue->hwm - id > 0);
-}
-
-pni_entry_t *pni_queue_get(pni_queue_t *queue, pn_sequence_t id)
-{
-  if (pni_queue_contains(queue, id)) {
-    size_t offset = id - queue->lwm;
-    assert(offset >= 0 && offset < queue->capacity);
-    return queue->entries[offset];
-  } else {
-    return NULL;
-  }
-}
-
-void pni_queue_gc(pni_queue_t *queue)
-{
-  size_t count = queue->hwm - queue->lwm;
-  size_t delta = 0;
-
-  while (delta < count && !queue->entries[delta]) {
-    delta++;
-  }
-
-  memmove(queue->entries, queue->entries + delta, (count - delta)*sizeof(pni_entry_t
*));
-  queue->lwm += delta;
-}
-
-void pni_queue_del(pni_queue_t *queue, pni_entry_t *entry)
-{
-  pn_sequence_t id = entry->id;
-  if (pni_queue_contains(queue, id)) {
-    size_t offset = id - queue->lwm;
-    assert(offset >= 0 && offset < queue->capacity);
-    queue->entries[offset] = NULL;
-    pni_entry_decref(entry);
-  }
-}
-
-void pni_queue_slide(pni_queue_t *queue)
-{
-  if (queue->window >= 0) {
-    while (queue->hwm - queue->lwm > queue->window) {
-      pni_entry_t *e = pni_queue_get(queue, queue->lwm);
-      if (e) {
-        pni_queue_del(queue, e);
-      } else {
-        pni_queue_gc(queue);
-      }
-    }
-  }
-  pni_queue_gc(queue);
-}
-
-pn_sequence_t pni_queue_add(pni_queue_t *queue, pni_entry_t *entry)
-{
-  pn_sequence_t id = queue->hwm++;
-  entry->id = id;
-  size_t offset = id - queue->lwm;
-  PN_ENSUREZ(queue->entries, queue->capacity, offset + 1, pni_entry_t *);
-  assert(offset >= 0 && offset < queue->capacity);
-  queue->entries[offset] = entry;
-  pni_entry_incref(entry);
-  pni_queue_slide(queue);
-  return id;
-}
-
-int pni_queue_update(pni_queue_t *queue, pn_sequence_t id, pn_status_t status,
-                    int flags, bool settle, bool match)
-{
-  if (!pni_queue_contains(queue, id)) {
-    return 0;
-  }
-
-  size_t start;
-  if (PN_CUMULATIVE & flags) {
-    start = queue->lwm;
-  } else {
-    start = id;
-  }
-
-  for (pn_sequence_t i = start; i <= id; i++) {
-    pni_entry_t *e = pni_queue_get(queue, i);
-    if (e) {
-      pn_delivery_t *d = e->delivery;
-      if (d) {
-        if (!pn_delivery_local_state(d)) {
-          if (match) {
-            pn_delivery_update(d, pn_delivery_remote_state(d));
-          } else {
-            switch (status) {
-            case PN_STATUS_ACCEPTED:
-              pn_delivery_update(d, PN_ACCEPTED);
-              break;
-            case PN_STATUS_REJECTED:
-              pn_delivery_update(d, PN_REJECTED);
-              break;
-            default:
-              break;
-            }
-          }
-        }
-      }
-      if (settle) {
-        if (d) {
-          pn_delivery_settle(d);
-        }
-        pni_queue_del(queue, e);
-      }
-    }
-  }
-
-  pni_queue_gc(queue);
-
-  return 0;
 }
 
 pni_store_t *pni_store()
@@ -245,7 +86,10 @@ pni_store_t *pni_store()
   store->streams = NULL;
   store->store_head = NULL;
   store->store_tail = NULL;
-  pni_queue_init(&store->queue);
+  store->window = 0;
+  store->lwm = 0;
+  store->hwm = 0;
+  store->tracked = pn_hash(0, 0.75, PN_REFCOUNT);
 
   return store;
 }
@@ -314,7 +158,7 @@ void pni_entry_free(pni_entry_t *entry)
 
   pn_buffer_free(entry->bytes);
   entry->bytes = NULL;
-  pni_entry_decref(entry);
+  pn_decref(entry);
   store->size--;
 }
 
@@ -331,13 +175,13 @@ void pni_stream_free(pni_stream_t *strea
 void pni_store_free(pni_store_t *store)
 {
   if (!store) return;
+  pn_free(store->tracked);
   pni_stream_t *stream = store->streams;
   while (stream) {
     pni_stream_t *next = stream->next;
     pni_stream_free(stream);
     stream = next;
   }
-  pni_queue_tini(&store->queue);
   free(store);
 }
 
@@ -356,12 +200,13 @@ pni_stream_t *pni_stream_get(pni_store_t
 pni_entry_t *pni_store_put(pni_store_t *store, const char *address)
 {
   assert(store);
+  static pn_class_t clazz = {pni_entry_finalize};
+
   if (!address) address = "";
   pni_stream_t *stream = pni_stream_put(store, address);
   if (!stream) return NULL;
-  pni_entry_t *entry = (pni_entry_t *) malloc(sizeof(pni_entry_t));
+  pni_entry_t *entry = (pni_entry_t *) pn_new(sizeof(pni_entry_t), &clazz);
   if (!entry) return NULL;
-  entry->refcount = 0;
   entry->stream = stream;
   entry->free = false;
   entry->stream_next = NULL;
@@ -373,10 +218,6 @@ pni_entry_t *pni_store_put(pni_store_t *
   LL_ADD(stream, stream, entry);
   LL_ADD(store, store, entry);
   store->size++;
-
-  pni_entry_incref(entry);
-  pni_queue_add(&store->queue, entry);
-
   return entry;
 }
 
@@ -443,7 +284,7 @@ void *pni_entry_get_context(pni_entry_t 
 
 static pn_status_t disp2status(pn_disposition_t disp)
 {
-  if (!disp) return PN_STATUS_UNKNOWN;
+  if (!disp) return PN_STATUS_PENDING;
 
   switch (disp) {
   case PN_ACCEPTED:
@@ -463,42 +304,119 @@ void pni_entry_updated(pni_entry_t *entr
   assert(entry);
   pn_delivery_t *d = entry->delivery;
   if (d) {
-    if (pn_delivery_remote_state(d))
+    if (pn_delivery_remote_state(d)) {
       entry->status = disp2status(pn_delivery_remote_state(d));
-    else if (pn_delivery_settled(d))
+    } else if (pn_delivery_settled(d)) {
       entry->status = disp2status(pn_delivery_local_state(d));
-    else
+    } else {
       entry->status = PN_STATUS_PENDING;
+    }
   }
 }
 
-pn_sequence_t pni_entry_tracker(pni_entry_t *entry)
+pn_sequence_t pni_entry_id(pni_entry_t *entry)
 {
   assert(entry);
   return entry->id;
 }
 
-pni_entry_t *pni_store_track(pni_store_t *store, pn_sequence_t id)
+pni_entry_t *pni_store_entry(pni_store_t *store, pn_sequence_t id)
 {
   assert(store);
-  return pni_queue_get(&store->queue, id);
+  return (pni_entry_t *) pn_hash_get(store->tracked, id);
+}
+
+bool pni_store_tracking(pni_store_t *store, pn_sequence_t id)
+{
+  return (id - store->lwm >= 0) && (store->hwm - id > 0);
+}
+
+pn_sequence_t pni_entry_track(pni_entry_t *entry)
+{
+  assert(entry);
+
+  pni_store_t *store = entry->stream->store;
+  entry->id = store->hwm++;
+  pn_hash_put(store->tracked, entry->id, entry);
+
+  if (store->window >= 0) {
+    while (store->hwm - store->lwm > store->window) {
+      pni_entry_t *e = pni_store_entry(store, store->lwm);
+      if (e) {
+        pn_hash_del(store->tracked, store->lwm);
+      }
+      store->lwm++;
+    }
+  }
+
+  return entry->id;
 }
 
 int pni_store_update(pni_store_t *store, pn_sequence_t id, pn_status_t status,
                      int flags, bool settle, bool match)
 {
   assert(store);
-  return pni_queue_update(&store->queue, id, status, flags, settle, match);
+
+  if (!pni_store_tracking(store, id)) {
+    return 0;
+  }
+
+  size_t start;
+  if (PN_CUMULATIVE & flags) {
+    start = store->lwm;
+  } else {
+    start = id;
+  }
+
+  for (pn_sequence_t i = start; i <= id; i++) {
+    pni_entry_t *e = pni_store_entry(store, i);
+    if (e) {
+      pn_delivery_t *d = e->delivery;
+      if (d) {
+        if (!pn_delivery_local_state(d)) {
+          if (match) {
+            pn_delivery_update(d, pn_delivery_remote_state(d));
+          } else {
+            switch (status) {
+            case PN_STATUS_ACCEPTED:
+              pn_delivery_update(d, PN_ACCEPTED);
+              break;
+            case PN_STATUS_REJECTED:
+              pn_delivery_update(d, PN_REJECTED);
+              break;
+            default:
+              break;
+            }
+          }
+
+          pni_entry_updated(e);
+        }
+      }
+      if (settle) {
+        if (d) {
+          pn_delivery_settle(d);
+        }
+        pn_hash_del(store->tracked, e->id);
+      }
+    }
+  }
+
+  while (store->hwm - store->lwm > 0 &&
+         !pn_hash_get(store->tracked, store->lwm)) {
+    store->lwm++;
+  }
+
+  return 0;
 }
 
 int pni_store_get_window(pni_store_t *store)
 {
   assert(store);
-  return store->queue.window;
+  return store->window;
 }
 
 void pni_store_set_window(pni_store_t *store, int window)
 {
   assert(store);
-  store->queue.window = window;
+  store->window = window;
 }

Modified: qpid/proton/trunk/proton-c/src/messenger/store.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/store.h?rev=1480445&r1=1480444&r2=1480445&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/store.h (original)
+++ qpid/proton/trunk/proton-c/src/messenger/store.h Wed May  8 20:39:43 2013
@@ -43,8 +43,8 @@ void *pni_entry_get_context(pni_entry_t 
 void pni_entry_updated(pni_entry_t *entry);
 void pni_entry_free(pni_entry_t *entry);
 
-pn_sequence_t pni_entry_tracker(pni_entry_t *entry);
-pni_entry_t *pni_store_track(pni_store_t *store, pn_sequence_t id);
+pn_sequence_t pni_entry_track(pni_entry_t *entry);
+pni_entry_t *pni_store_entry(pni_store_t *store, pn_sequence_t id);
 int pni_store_update(pni_store_t *store, pn_sequence_t id, pn_status_t status,
                      int flags, bool settle, bool match);
 int pni_store_get_window(pni_store_t *store);

Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1480445&r1=1480444&r2=1480445&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Wed May  8 20:39:43 2013
@@ -265,6 +265,39 @@ class MessengerTest(Test):
     for t in trackers:
       assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
 
+  def testIncomingQueueBiggerThanWindow(self):
+    self.server.outgoing_window = 10
+    self.client.incoming_window = 10
+    self.start()
+
+    msg = Message()
+    msg.address = "amqp://0.0.0.0:12345"
+    msg.subject = "Hello World!"
+
+    for i in range(20):
+      self.client.put(msg)
+
+    while self.client.incoming < 20:
+      self.client.recv(20 - self.client.incoming)
+
+    trackers = []
+    while self.client.incoming:
+      t = self.client.get(msg)
+      assert self.client.status(t) is PENDING, (t, self.client.status(t))
+      trackers.append(t)
+
+    for t in trackers[:10]:
+      assert self.client.status(t) is None, (t, self.client.status(t))
+    for t in trackers[10:]:
+      assert self.client.status(t) is PENDING, (t, self.client.status(t))
+
+    self.client.accept()
+
+    for t in trackers[:10]:
+      assert self.client.status(t) is None, (t, self.client.status(t))
+    for t in trackers[10:]:
+      assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
+
   def test_proton222(self):
     self.start()
     msg = Message()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message