qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [1/2] qpid-proton git commit: NO-JIRA: add a simple broker against which intermediated examples can be run
Date Wed, 11 Mar 2015 21:42:58 GMT
Repository: qpid-proton
Updated Branches:
  refs/heads/master a3b8bb180 -> 4653cdc6f


NO-JIRA: add a simple broker against which intermediated examples can be run


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/8235ba1f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/8235ba1f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/8235ba1f

Branch: refs/heads/master
Commit: 8235ba1f1da41e67c284b866777b28118e4691d8
Parents: a3b8bb1
Author: Gordon Sim <gsim@redhat.com>
Authored: Mon Mar 9 14:24:52 2015 +0000
Committer: Gordon Sim <gsim@redhat.com>
Committed: Wed Mar 11 21:13:35 2015 +0000

----------------------------------------------------------------------
 examples/python/README                      |  10 +-
 examples/python/broker.py                   | 126 +++++++++++++++++++++++
 proton-c/bindings/python/proton/handlers.py |   2 +-
 3 files changed, 130 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8235ba1f/examples/python/README
----------------------------------------------------------------------
diff --git a/examples/python/README b/examples/python/README
index a361fc4..8c60fd8 100644
--- a/examples/python/README
+++ b/examples/python/README
@@ -1,7 +1,9 @@
 Most (though not all) of the current examples require a broker or
 similar intermediary that supports the AMQP 1.0 protocol, allows
 anonymous connections and accepts links to and from a node named
-'examples'.
+'examples'. A very simple broker emulating script - broker.py - is
+provided against which the examples can also be run (transactions are
+not yet supported in this script).
 
 ------------------------------------------------------------------
 
@@ -85,12 +87,6 @@ tx_send.py
 A sender that sends messages in atomic batches using local
 transactions (this example does not persist the messages in anyway).
 
-tx_send_sync.py
-
-A variant of the former example that waits for all messages in a batch
-to be acknowledged before committing. Used only to work around an
-ordering issue in preoton that affected qpidd.
-
 tx_recv.py
 
 A receiver example that accepts batches of messages using local

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8235ba1f/examples/python/broker.py
----------------------------------------------------------------------
diff --git a/examples/python/broker.py b/examples/python/broker.py
new file mode 100755
index 0000000..13eb97c
--- /dev/null
+++ b/examples/python/broker.py
@@ -0,0 +1,126 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import collections, optparse
+from proton import Endpoint, generate_uuid
+from proton.handlers import MessagingHandler
+from proton.reactor import Container
+
+class Queue(object):
+    def __init__(self, dynamic=False):
+        self.dynamic = dynamic
+        self.queue = collections.deque()
+        self.consumers = []
+
+    def subscribe(self, consumer):
+        self.consumers.append(consumer)
+
+    def unsubscribe(self, consumer):
+        if consumer in self.consumers:
+            self.consumers.remove(consumer)
+        return len(self.consumers) == 0 and (self.dynamic or self.queue.count == 0)
+
+    def publish(self, message):
+        self.queue.append(message)
+        self.dispatch()
+
+    def dispatch(self, consumer=None):
+        if consumer:
+            c = [consumer]
+        else:
+            c = self.consumers
+        while self._deliver_to(c): pass
+
+    def _deliver_to(self, consumers):
+        try:
+            result = False
+            for c in consumers:
+                if c.credit:
+                    c.send(self.queue.popleft())
+                    result = True
+            return result
+        except IndexError: # no more messages
+            return False
+
+class Broker(MessagingHandler):
+    def __init__(self, url):
+        super(Broker, self).__init__()
+        self.url = url
+        self.queues = {}
+
+    def on_start(self, event):
+        self.acceptor = event.container.listen(self.url)
+
+    def _queue(self, address):
+        if address not in self.queues:
+            self.queues[address] = Queue()
+        return self.queues[address]
+
+    def on_link_opening(self, event):
+        if event.link.is_sender:
+            if event.link.remote_source.dynamic:
+                address = str(generate_uuid())
+                event.link.source.address = address
+                q = Queue(True)
+                self.queues[address] = q
+                q.subscribe(event.link)
+            elif event.link.remote_source.address:
+                event.link.source.address = event.link.remote_source.address
+                self._queue(event.link.source.address).subscribe(event.link)
+        elif event.link.remote_target.address:
+            event.link.target.address = event.link.remote_target.address
+
+    def _unsubscribe(self, link):
+        if link.source.address in self.queues and self.queues[link.source.address].unsubscribe(link):
+            del self.queues[link.source.address]
+
+    def on_link_closing(self, event):
+        if event.link.is_sender:
+            self._unsubscribe(event.link)
+
+    def on_connection_closing(self, event):
+        print "on_connection_closing"
+        self.remove_stale_consumers(event.connection)
+
+    def on_disconnected(self, event):
+        print "on_disconnected"
+        self.remove_stale_consumers(event.connection)
+
+    def remove_stale_consumers(self, connection):
+        l = connection.link_head(Endpoint.REMOTE_ACTIVE)
+        while l:
+            if l.is_sender:
+                self._unsubscribe(l)
+            l = l.next(Endpoint.REMOTE_ACTIVE)
+
+    def on_sendable(self, event):
+        self._queue(event.link.source.address).dispatch(event.link)
+
+    def on_message(self, event):
+        self._queue(event.link.target.address).publish(event.message)
+
+parser = optparse.OptionParser(usage="usage: %prog [options]")
+parser.add_option("-a", "--address", default="localhost:5672",
+                  help="address router listens on (default %default)")
+opts, args = parser.parse_args()
+
+try:
+    Container(Broker(opts.address)).run()
+except KeyboardInterrupt: pass

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8235ba1f/proton-c/bindings/python/proton/handlers.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py
index 53dda92..5411a1d 100644
--- a/proton-c/bindings/python/proton/handlers.py
+++ b/proton-c/bindings/python/proton/handlers.py
@@ -370,7 +370,7 @@ class EndpointStateHandler(Handler):
         self.on_transport_closed(event)
 
     def on_transport_closed(self, event):
-        if self.delegate:
+        if self.delegate and self.is_local_open(event.connection):
             dispatch(self.delegate, 'on_disconnected', event)
 
 class MessagingHandler(Handler, Acking):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message