Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3239EFC9C for ; Wed, 8 May 2013 20:40:10 +0000 (UTC) Received: (qmail 50765 invoked by uid 500); 8 May 2013 20:40:10 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 50704 invoked by uid 500); 8 May 2013 20:40:10 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 50697 invoked by uid 99); 8 May 2013 20:40:10 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 May 2013 20:40:10 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 May 2013 20:40:06 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 116F22388962; Wed, 8 May 2013 20:39:44 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1480445 - in /qpid/proton/trunk: proton-c/src/messenger/messenger.c proton-c/src/messenger/store.c proton-c/src/messenger/store.h tests/python/proton_tests/messenger.py Date: Wed, 08 May 2013 20:39:43 -0000 To: commits@qpid.apache.org From: rhs@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130508203944.116F22388962@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rhs Date: Wed May 8 20:39:43 2013 New Revision: 1480445 URL: http://svn.apache.org/r1480445 Log: PROTON-295: decoupled tracking of store entries from put/get of store entries, fixed tracking of incoming entries to start when they are returned via get rather than when they are read off of the wire Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c qpid/proton/trunk/proton-c/src/messenger/store.c qpid/proton/trunk/proton-c/src/messenger/store.h qpid/proton/trunk/tests/python/proton_tests/messenger.py Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1480445&r1=1480444&r2=1480445&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original) +++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Wed May 8 20:39:43 2013 @@ -1182,7 +1182,7 @@ int pn_messenger_put(pn_messenger_t *mes if (!entry) return pn_error_format(messenger->error, PN_ERR, "store error"); - messenger->outgoing_tracker = pn_tracker(OUTGOING, pni_entry_tracker(entry)); + messenger->outgoing_tracker = pn_tracker(OUTGOING, pni_entry_track(entry)); pn_buffer_t *buf = pni_entry_bytes(entry); while (true) { @@ -1226,7 +1226,7 @@ pni_store_t *pn_tracker_store(pn_messeng pn_status_t pn_messenger_status(pn_messenger_t *messenger, pn_tracker_t tracker) { pni_store_t *store = pn_tracker_store(messenger, tracker); - pni_entry_t *e = pni_store_track(store, pn_tracker_sequence(tracker)); + pni_entry_t *e = pni_store_entry(store, pn_tracker_sequence(tracker)); if (e) { return pni_entry_get_status(e); } else { @@ -1346,7 +1346,7 @@ int pn_messenger_get(pn_messenger_t *mes // XXX: need to drain credit before returning EOS if (!entry) return PN_EOS; - messenger->incoming_tracker = pn_tracker(INCOMING, pni_entry_tracker(entry)); + messenger->incoming_tracker = pn_tracker(INCOMING, pni_entry_track(entry)); pn_buffer_t *buf = pni_entry_bytes(entry); pn_bytes_t bytes = pn_buffer_bytes(buf); const char *encoded = bytes.start; Modified: qpid/proton/trunk/proton-c/src/messenger/store.c URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/store.c?rev=1480445&r1=1480444&r2=1480445&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/src/messenger/store.c (original) +++ qpid/proton/trunk/proton-c/src/messenger/store.c Wed May 8 20:39:43 2013 @@ -21,6 +21,7 @@ #include #include +#include #include #ifndef __cplusplus #include @@ -32,20 +33,15 @@ typedef struct pni_stream_t pni_stream_t; -typedef struct { - size_t capacity; - int window; - pn_sequence_t lwm; - pn_sequence_t hwm; - pni_entry_t **entries; -} pni_queue_t; - struct pni_store_t { size_t size; - pni_queue_t queue; pni_stream_t *streams; pni_entry_t *store_head; pni_entry_t *store_tail; + int window; + pn_sequence_t lwm; + pn_sequence_t hwm; + pn_hash_t *tracked; }; struct pni_stream_t { @@ -57,7 +53,6 @@ struct pni_stream_t { }; struct pni_entry_t { - int refcount; pn_sequence_t id; pni_stream_t *stream; bool free; @@ -71,169 +66,15 @@ struct pni_entry_t { void *context; }; -static void pni_entry_incref(pni_entry_t *entry) -{ - entry->refcount++; -} - -void pni_entry_reclaim(pni_entry_t *entry) +void pni_entry_finalize(void *object) { + pni_entry_t *entry = (pni_entry_t *) object; assert(entry->free); pn_delivery_t *d = entry->delivery; if (d) { - if (!pn_delivery_local_state(d)) { - pn_delivery_update(d, PN_ACCEPTED); - } pn_delivery_settle(d); pni_entry_set_delivery(entry, NULL); } - free(entry); -} - -static void pni_entry_decref(pni_entry_t *entry) -{ - if (entry) { - assert(entry->refcount > 0); - entry->refcount--; - if (entry->refcount == 0) { - pni_entry_reclaim(entry); - } - } -} - -void pni_queue_init(pni_queue_t *queue) -{ - queue->capacity = 1024; - queue->window = 0; - queue->lwm = 0; - queue->hwm = 0; - queue->entries = (pni_entry_t **) calloc(queue->capacity, sizeof(pni_entry_t *)); -} - -void pni_queue_tini(pni_queue_t *queue) -{ - for (int i = 0; i < queue->hwm - queue->lwm; i++) { - pni_entry_decref(queue->entries[i]); - } - free(queue->entries); -} - -bool pni_queue_contains(pni_queue_t *queue, pn_sequence_t id) -{ - return (id - queue->lwm >= 0) && (queue->hwm - id > 0); -} - -pni_entry_t *pni_queue_get(pni_queue_t *queue, pn_sequence_t id) -{ - if (pni_queue_contains(queue, id)) { - size_t offset = id - queue->lwm; - assert(offset >= 0 && offset < queue->capacity); - return queue->entries[offset]; - } else { - return NULL; - } -} - -void pni_queue_gc(pni_queue_t *queue) -{ - size_t count = queue->hwm - queue->lwm; - size_t delta = 0; - - while (delta < count && !queue->entries[delta]) { - delta++; - } - - memmove(queue->entries, queue->entries + delta, (count - delta)*sizeof(pni_entry_t *)); - queue->lwm += delta; -} - -void pni_queue_del(pni_queue_t *queue, pni_entry_t *entry) -{ - pn_sequence_t id = entry->id; - if (pni_queue_contains(queue, id)) { - size_t offset = id - queue->lwm; - assert(offset >= 0 && offset < queue->capacity); - queue->entries[offset] = NULL; - pni_entry_decref(entry); - } -} - -void pni_queue_slide(pni_queue_t *queue) -{ - if (queue->window >= 0) { - while (queue->hwm - queue->lwm > queue->window) { - pni_entry_t *e = pni_queue_get(queue, queue->lwm); - if (e) { - pni_queue_del(queue, e); - } else { - pni_queue_gc(queue); - } - } - } - pni_queue_gc(queue); -} - -pn_sequence_t pni_queue_add(pni_queue_t *queue, pni_entry_t *entry) -{ - pn_sequence_t id = queue->hwm++; - entry->id = id; - size_t offset = id - queue->lwm; - PN_ENSUREZ(queue->entries, queue->capacity, offset + 1, pni_entry_t *); - assert(offset >= 0 && offset < queue->capacity); - queue->entries[offset] = entry; - pni_entry_incref(entry); - pni_queue_slide(queue); - return id; -} - -int pni_queue_update(pni_queue_t *queue, pn_sequence_t id, pn_status_t status, - int flags, bool settle, bool match) -{ - if (!pni_queue_contains(queue, id)) { - return 0; - } - - size_t start; - if (PN_CUMULATIVE & flags) { - start = queue->lwm; - } else { - start = id; - } - - for (pn_sequence_t i = start; i <= id; i++) { - pni_entry_t *e = pni_queue_get(queue, i); - if (e) { - pn_delivery_t *d = e->delivery; - if (d) { - if (!pn_delivery_local_state(d)) { - if (match) { - pn_delivery_update(d, pn_delivery_remote_state(d)); - } else { - switch (status) { - case PN_STATUS_ACCEPTED: - pn_delivery_update(d, PN_ACCEPTED); - break; - case PN_STATUS_REJECTED: - pn_delivery_update(d, PN_REJECTED); - break; - default: - break; - } - } - } - } - if (settle) { - if (d) { - pn_delivery_settle(d); - } - pni_queue_del(queue, e); - } - } - } - - pni_queue_gc(queue); - - return 0; } pni_store_t *pni_store() @@ -245,7 +86,10 @@ pni_store_t *pni_store() store->streams = NULL; store->store_head = NULL; store->store_tail = NULL; - pni_queue_init(&store->queue); + store->window = 0; + store->lwm = 0; + store->hwm = 0; + store->tracked = pn_hash(0, 0.75, PN_REFCOUNT); return store; } @@ -314,7 +158,7 @@ void pni_entry_free(pni_entry_t *entry) pn_buffer_free(entry->bytes); entry->bytes = NULL; - pni_entry_decref(entry); + pn_decref(entry); store->size--; } @@ -331,13 +175,13 @@ void pni_stream_free(pni_stream_t *strea void pni_store_free(pni_store_t *store) { if (!store) return; + pn_free(store->tracked); pni_stream_t *stream = store->streams; while (stream) { pni_stream_t *next = stream->next; pni_stream_free(stream); stream = next; } - pni_queue_tini(&store->queue); free(store); } @@ -356,12 +200,13 @@ pni_stream_t *pni_stream_get(pni_store_t pni_entry_t *pni_store_put(pni_store_t *store, const char *address) { assert(store); + static pn_class_t clazz = {pni_entry_finalize}; + if (!address) address = ""; pni_stream_t *stream = pni_stream_put(store, address); if (!stream) return NULL; - pni_entry_t *entry = (pni_entry_t *) malloc(sizeof(pni_entry_t)); + pni_entry_t *entry = (pni_entry_t *) pn_new(sizeof(pni_entry_t), &clazz); if (!entry) return NULL; - entry->refcount = 0; entry->stream = stream; entry->free = false; entry->stream_next = NULL; @@ -373,10 +218,6 @@ pni_entry_t *pni_store_put(pni_store_t * LL_ADD(stream, stream, entry); LL_ADD(store, store, entry); store->size++; - - pni_entry_incref(entry); - pni_queue_add(&store->queue, entry); - return entry; } @@ -443,7 +284,7 @@ void *pni_entry_get_context(pni_entry_t static pn_status_t disp2status(pn_disposition_t disp) { - if (!disp) return PN_STATUS_UNKNOWN; + if (!disp) return PN_STATUS_PENDING; switch (disp) { case PN_ACCEPTED: @@ -463,42 +304,119 @@ void pni_entry_updated(pni_entry_t *entr assert(entry); pn_delivery_t *d = entry->delivery; if (d) { - if (pn_delivery_remote_state(d)) + if (pn_delivery_remote_state(d)) { entry->status = disp2status(pn_delivery_remote_state(d)); - else if (pn_delivery_settled(d)) + } else if (pn_delivery_settled(d)) { entry->status = disp2status(pn_delivery_local_state(d)); - else + } else { entry->status = PN_STATUS_PENDING; + } } } -pn_sequence_t pni_entry_tracker(pni_entry_t *entry) +pn_sequence_t pni_entry_id(pni_entry_t *entry) { assert(entry); return entry->id; } -pni_entry_t *pni_store_track(pni_store_t *store, pn_sequence_t id) +pni_entry_t *pni_store_entry(pni_store_t *store, pn_sequence_t id) { assert(store); - return pni_queue_get(&store->queue, id); + return (pni_entry_t *) pn_hash_get(store->tracked, id); +} + +bool pni_store_tracking(pni_store_t *store, pn_sequence_t id) +{ + return (id - store->lwm >= 0) && (store->hwm - id > 0); +} + +pn_sequence_t pni_entry_track(pni_entry_t *entry) +{ + assert(entry); + + pni_store_t *store = entry->stream->store; + entry->id = store->hwm++; + pn_hash_put(store->tracked, entry->id, entry); + + if (store->window >= 0) { + while (store->hwm - store->lwm > store->window) { + pni_entry_t *e = pni_store_entry(store, store->lwm); + if (e) { + pn_hash_del(store->tracked, store->lwm); + } + store->lwm++; + } + } + + return entry->id; } int pni_store_update(pni_store_t *store, pn_sequence_t id, pn_status_t status, int flags, bool settle, bool match) { assert(store); - return pni_queue_update(&store->queue, id, status, flags, settle, match); + + if (!pni_store_tracking(store, id)) { + return 0; + } + + size_t start; + if (PN_CUMULATIVE & flags) { + start = store->lwm; + } else { + start = id; + } + + for (pn_sequence_t i = start; i <= id; i++) { + pni_entry_t *e = pni_store_entry(store, i); + if (e) { + pn_delivery_t *d = e->delivery; + if (d) { + if (!pn_delivery_local_state(d)) { + if (match) { + pn_delivery_update(d, pn_delivery_remote_state(d)); + } else { + switch (status) { + case PN_STATUS_ACCEPTED: + pn_delivery_update(d, PN_ACCEPTED); + break; + case PN_STATUS_REJECTED: + pn_delivery_update(d, PN_REJECTED); + break; + default: + break; + } + } + + pni_entry_updated(e); + } + } + if (settle) { + if (d) { + pn_delivery_settle(d); + } + pn_hash_del(store->tracked, e->id); + } + } + } + + while (store->hwm - store->lwm > 0 && + !pn_hash_get(store->tracked, store->lwm)) { + store->lwm++; + } + + return 0; } int pni_store_get_window(pni_store_t *store) { assert(store); - return store->queue.window; + return store->window; } void pni_store_set_window(pni_store_t *store, int window) { assert(store); - store->queue.window = window; + store->window = window; } Modified: qpid/proton/trunk/proton-c/src/messenger/store.h URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/store.h?rev=1480445&r1=1480444&r2=1480445&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/src/messenger/store.h (original) +++ qpid/proton/trunk/proton-c/src/messenger/store.h Wed May 8 20:39:43 2013 @@ -43,8 +43,8 @@ void *pni_entry_get_context(pni_entry_t void pni_entry_updated(pni_entry_t *entry); void pni_entry_free(pni_entry_t *entry); -pn_sequence_t pni_entry_tracker(pni_entry_t *entry); -pni_entry_t *pni_store_track(pni_store_t *store, pn_sequence_t id); +pn_sequence_t pni_entry_track(pni_entry_t *entry); +pni_entry_t *pni_store_entry(pni_store_t *store, pn_sequence_t id); int pni_store_update(pni_store_t *store, pn_sequence_t id, pn_status_t status, int flags, bool settle, bool match); int pni_store_get_window(pni_store_t *store); Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1480445&r1=1480444&r2=1480445&view=diff ============================================================================== --- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original) +++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Wed May 8 20:39:43 2013 @@ -265,6 +265,39 @@ class MessengerTest(Test): for t in trackers: assert self.client.status(t) is ACCEPTED, (t, self.client.status(t)) + def testIncomingQueueBiggerThanWindow(self): + self.server.outgoing_window = 10 + self.client.incoming_window = 10 + self.start() + + msg = Message() + msg.address = "amqp://0.0.0.0:12345" + msg.subject = "Hello World!" + + for i in range(20): + self.client.put(msg) + + while self.client.incoming < 20: + self.client.recv(20 - self.client.incoming) + + trackers = [] + while self.client.incoming: + t = self.client.get(msg) + assert self.client.status(t) is PENDING, (t, self.client.status(t)) + trackers.append(t) + + for t in trackers[:10]: + assert self.client.status(t) is None, (t, self.client.status(t)) + for t in trackers[10:]: + assert self.client.status(t) is PENDING, (t, self.client.status(t)) + + self.client.accept() + + for t in trackers[:10]: + assert self.client.status(t) is None, (t, self.client.status(t)) + for t in trackers[10:]: + assert self.client.status(t) is ACCEPTED, (t, self.client.status(t)) + def test_proton222(self): self.start() msg = Message() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org