qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject qpid-proton git commit: Remove MessagingContext, rename EventLoop as Container, move link creation to container, allow single step link creation if desired.
Date Thu, 04 Dec 2014 12:03:13 GMT
Repository: qpid-proton
Updated Branches:
  refs/heads/examples 1c4d9ed89 -> 9c87b3db5


Remove MessagingContext, rename EventLoop as Container, move link creation to container, allow single step link creation if desired.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/9c87b3db
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/9c87b3db
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/9c87b3db

Branch: refs/heads/examples
Commit: 9c87b3db5f27f9aeaa631e9e856fb6908d66f229
Parents: 1c4d9ed
Author: Gordon Sim <gsim@redhat.com>
Authored: Wed Dec 3 12:06:07 2014 +0000
Committer: Gordon Sim <gsim@redhat.com>
Committed: Thu Dec 4 11:55:10 2014 +0000

----------------------------------------------------------------------
 examples/engine/py/client.py                    |  10 +-
 examples/engine/py/client_http.py               |   6 +-
 examples/engine/py/db_recv.py                   |  14 +-
 examples/engine/py/db_send.py                   |  18 +-
 examples/engine/py/helloworld.py                |  10 +-
 examples/engine/py/helloworld_direct.py         |  14 +-
 examples/engine/py/helloworld_direct_tornado.py |   8 +-
 examples/engine/py/helloworld_tornado.py        |   8 +-
 examples/engine/py/proton_server.py             |  18 +-
 examples/engine/py/proton_tornado.py            |   4 +-
 examples/engine/py/recurring_timer.py           |  22 +-
 examples/engine/py/recurring_timer_tornado.py   |  22 +-
 examples/engine/py/selected_recv.py             |   8 +-
 examples/engine/py/server.py                    |  13 +-
 examples/engine/py/server_tx.py                 |  18 +-
 examples/engine/py/simple_recv.py               |  12 +-
 examples/engine/py/simple_send.py               |  12 +-
 examples/engine/py/tx_recv.py                   |  19 +-
 examples/engine/py/tx_recv_interactive.py       |  16 +-
 examples/engine/py/tx_send.py                   |  19 +-
 examples/engine/py/tx_send_sync.py              |  21 +-
 proton-c/bindings/python/proton/reactors.py     | 216 +++++++++----------
 proton-c/bindings/python/proton/utils.py        |  30 ++-
 23 files changed, 254 insertions(+), 284 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/client.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/client.py b/examples/engine/py/client.py
index a649dec..d1e2706 100755
--- a/examples/engine/py/client.py
+++ b/examples/engine/py/client.py
@@ -20,7 +20,7 @@
 
 from proton import Message
 from proton.handlers import MessagingHandler
-from proton.reactors import EventLoop
+from proton.reactors import Container
 
 class Client(MessagingHandler):
     def __init__(self, host, address, requests):
@@ -30,9 +30,9 @@ class Client(MessagingHandler):
         self.requests = requests
 
     def on_start(self, event):
-        self.conn = event.reactor.connect(self.host)
-        self.sender = self.conn.create_sender(self.address)
-        self.receiver = self.conn.create_receiver(None, dynamic=True)
+        self.conn = event.container.connect(self.host)
+        self.sender = event.container.create_sender(self.conn, self.address)
+        self.receiver = event.container.create_receiver(self.conn, None, dynamic=True)
 
     def next_request(self):
         if self.receiver.remote_source.address:
@@ -55,5 +55,5 @@ REQUESTS= ["Twas brillig, and the slithy toves",
            "All mimsy were the borogroves,",
            "And the mome raths outgrabe."]
 
-EventLoop(Client("localhost:5672", "examples", REQUESTS)).run()
+Container(Client("localhost:5672", "examples", REQUESTS)).run()
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/client_http.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/client_http.py b/examples/engine/py/client_http.py
index ab7b1cd..5202f8d 100755
--- a/examples/engine/py/client_http.py
+++ b/examples/engine/py/client_http.py
@@ -36,9 +36,9 @@ class Client(MessagingHandler):
         self.receiver = None
 
     def on_start(self, event):
-        context = event.reactor.connect(self.host)
-        self.sender = context.create_sender(self.address)
-        self.receiver = context.create_receiver(None, dynamic=True)
+        conn = event.container.connect(self.host)
+        self.sender = event.container.create_sender(conn, self.address)
+        self.receiver = event.container.create_receiver(conn, None, dynamic=True)
 
     def on_link_opened(self, event):
         if event.receiver == self.receiver:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/db_recv.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/db_recv.py b/examples/engine/py/db_recv.py
index 5779403..8b4490d 100755
--- a/examples/engine/py/db_recv.py
+++ b/examples/engine/py/db_recv.py
@@ -19,22 +19,20 @@
 #
 
 from proton.handlers import MessagingHandler
-from proton.reactors import ApplicationEvent, EventLoop
+from proton.reactors import ApplicationEvent, Container
 from db_common import Db
 
 class Recv(MessagingHandler):
-    def __init__(self, host, address):
+    def __init__(self, url):
         super(Recv, self).__init__(auto_accept=False)
-        self.host = host
-        self.address = address
+        self.url = url
         self.delay = 0
         # TODO: load last tag from db
         self.last_id = None
 
     def on_start(self, event):
-        self.db = Db("dst_db", event.reactor.get_event_trigger())
-        context = event.reactor.connect(self.host)
-        context.create_receiver(self.address)
+        self.db = Db("dst_db", event.container.get_event_trigger())
+        event.container.create_receiver(self.url)
 
     def on_record_inserted(self, event):
         self.accept(event.delivery)
@@ -49,7 +47,7 @@ class Recv(MessagingHandler):
             self.accept(event.delivery)
 
 try:
-    EventLoop(Recv("localhost:5672", "examples")).run()
+    Container(Recv("localhost:5672/examples")).run()
 except KeyboardInterrupt: pass
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/db_send.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/db_send.py b/examples/engine/py/db_send.py
index b3a26fd..bc8d6da 100755
--- a/examples/engine/py/db_send.py
+++ b/examples/engine/py/db_send.py
@@ -22,29 +22,27 @@ import Queue
 import time
 from proton import Message
 from proton.handlers import MessagingHandler
-from proton.reactors import ApplicationEvent, EventLoop
+from proton.reactors import ApplicationEvent, Container
 from db_common import Db
 
 class Send(MessagingHandler):
-    def __init__(self, host, address):
+    def __init__(self, url):
         super(Send, self).__init__()
-        self.host = host
-        self.address = address
+        self.url = url
         self.delay = 0
         self.sent = 0
         self.records = Queue.Queue(maxsize=50)
 
     def on_start(self, event):
-        self.eventloop = event.reactor
-        self.db = Db("src_db", event.reactor.get_event_trigger())
-        context = event.reactor.connect(self.host)
-        self.sender = context.create_sender(self.address)
+        self.container = event.container
+        self.db = Db("src_db", self.container.get_event_trigger())
+        self.sender = self.container.create_sender(self.url)
 
     def on_records_loaded(self, event):
         if self.records.empty() and event.subject == self.sent:
             print "Exhausted available data, waiting to recheck..."
             # check for new data after 5 seconds
-            self.eventloop.schedule(time.time() + 5, link=self.sender, subject="data")
+            self.container.schedule(time.time() + 5, link=self.sender, subject="data")
         else:
             self.send()
 
@@ -78,6 +76,6 @@ class Send(MessagingHandler):
             self.request_records()
 
 try:
-    EventLoop(Send("localhost:5672", "examples")).run()
+    Container(Send("localhost:5672/examples")).run()
 except KeyboardInterrupt: pass
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/helloworld.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld.py b/examples/engine/py/helloworld.py
index 5aa1482..92d6083 100755
--- a/examples/engine/py/helloworld.py
+++ b/examples/engine/py/helloworld.py
@@ -20,7 +20,7 @@
 
 from proton import Message
 from proton.handlers import MessagingHandler
-from proton.reactors import EventLoop
+from proton.reactors import Container
 
 class HelloWorld(MessagingHandler):
     def __init__(self, server, address):
@@ -29,9 +29,9 @@ class HelloWorld(MessagingHandler):
         self.address = address
 
     def on_start(self, event):
-        ctxt = event.reactor.connect(self.server)
-        ctxt.create_receiver(self.address)
-        ctxt.create_sender(self.address)
+        conn = event.container.connect(self.server)
+        event.container.create_receiver(conn, self.address)
+        event.container.create_sender(conn, self.address)
 
     def on_credit(self, event):
         event.sender.send_msg(Message(body=u"Hello World!"))
@@ -41,5 +41,5 @@ class HelloWorld(MessagingHandler):
         print event.message.body
         event.connection.close()
 
-EventLoop(HelloWorld("localhost:5672", "examples")).run()
+Container(HelloWorld("localhost:5672", "examples")).run()
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/helloworld_direct.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld_direct.py b/examples/engine/py/helloworld_direct.py
index 35ac597..c961fe5 100755
--- a/examples/engine/py/helloworld_direct.py
+++ b/examples/engine/py/helloworld_direct.py
@@ -20,18 +20,16 @@
 
 from proton import Message
 from proton.handlers import MessagingHandler
-from proton.reactors import EventLoop
+from proton.reactors import Container
 
 class HelloWorld(MessagingHandler):
-    def __init__(self, server, address):
+    def __init__(self, url):
         super(HelloWorld, self).__init__()
-        self.server = server
-        self.address = address
+        self.url = url
 
     def on_start(self, event):
-        self.acceptor = event.reactor.listen(self.server)
-        ctxt = event.reactor.connect(self.server)
-        ctxt.create_sender(self.address)
+        self.acceptor = event.container.listen(self.url)
+        event.container.create_sender(self.url)
 
     def on_credit(self, event):
         event.sender.send_msg(Message(body=u"Hello World!"))
@@ -46,4 +44,4 @@ class HelloWorld(MessagingHandler):
     def on_connection_closed(self, event):
         self.acceptor.close()
 
-EventLoop(HelloWorld("localhost:8888", "examples")).run()
+Container(HelloWorld("localhost:8888/examples")).run()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/helloworld_direct_tornado.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld_direct_tornado.py b/examples/engine/py/helloworld_direct_tornado.py
index 45926c6..8873357 100755
--- a/examples/engine/py/helloworld_direct_tornado.py
+++ b/examples/engine/py/helloworld_direct_tornado.py
@@ -29,10 +29,10 @@ class HelloWorld(MessagingHandler):
         self.address = address
 
     def on_start(self, event):
-        self.eventloop = event.reactor
-        self.acceptor = event.reactor.listen(self.server)
-        ctxt = event.reactor.connect(self.server)
-        ctxt.create_sender(self.address)
+        self.eventloop = event.container
+        self.acceptor = event.container.listen(self.server)
+        conn = event.container.connect(self.server)
+        event.container.create_sender(conn, self.address)
 
     def on_credit(self, event):
         event.sender.send_msg(Message(body=u"Hello World!"))

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/helloworld_tornado.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/helloworld_tornado.py b/examples/engine/py/helloworld_tornado.py
index 6a82b69..f7d4c26 100755
--- a/examples/engine/py/helloworld_tornado.py
+++ b/examples/engine/py/helloworld_tornado.py
@@ -29,10 +29,10 @@ class HelloWorld(MessagingHandler):
         self.address = address
 
     def on_start(self, event):
-        self.eventloop = event.reactor
-        ctxt = event.reactor.connect(self.server)
-        ctxt.create_receiver(self.address)
-        ctxt.create_sender(self.address)
+        self.eventloop = event.container
+        conn = event.container.connect(self.server)
+        event.container.create_receiver(conn, self.address)
+        event.container.create_sender(conn, self.address)
 
     def on_credit(self, event):
         event.sender.send_msg(Message(body=u"Hello World!"))

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/proton_server.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/proton_server.py b/examples/engine/py/proton_server.py
index b2e2027..8a5077b 100644
--- a/examples/engine/py/proton_server.py
+++ b/examples/engine/py/proton_server.py
@@ -18,15 +18,15 @@
 #
 
 from proton import Message
-from proton.reactors import EventLoop
-from proton.handlers import FlowController, IncomingMessageHandler
+from proton.reactors import Container
+from proton.handlers import MessagingHandler
 
-class Server(IncomingMessageHandler):
+class Server(MessagingHandler):
     def __init__(self, host, address):
         super(Server, self).__init__()
-        self.eventloop = EventLoop(self, FlowController(10))
-        self.conn = self.eventloop.connect(host)
-        self.receiver = self.conn.create_receiver(address)
+        self.container = Container(self)
+        self.conn = self.container.connect(host)
+        self.receiver = self.container.create_receiver(self.conn, address)
         self.senders = {}
         self.relay = None
 
@@ -35,21 +35,21 @@ class Server(IncomingMessageHandler):
 
     def on_connection_open(self, event):
         if event.connection.remote_offered_capabilities and "ANONYMOUS-RELAY" in event.connection.remote_offered_capabilities:
-            self.relay = self.conn.create_sender(None)
+            self.relay = self.container.create_sender(self.conn, None)
 
     def on_connection_close(self, endpoint, error):
         if error: print "Closed due to %s" % error
         self.conn.close()
 
     def run(self):
-        self.eventloop.run()
+        self.container.run()
 
     def send(self, response, reply_to):
         sender = self.relay
         if not sender:
             sender = self.senders.get(reply_to)
         if not sender:
-            sender = self.conn.create_sender(reply_to)
+            sender = self.container.create_sender(self.conn, reply_to)
             self.senders[reply_to] = sender
         msg = Message(body=response)
         if self.relay:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/proton_tornado.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/proton_tornado.py b/examples/engine/py/proton_tornado.py
index e49b28e..cfe7d6f 100644
--- a/examples/engine/py/proton_tornado.py
+++ b/examples/engine/py/proton_tornado.py
@@ -18,10 +18,10 @@
 # under the License.
 #
 
-from proton.reactors import ApplicationEvent, EventLoop, StartEvent
+from proton.reactors import ApplicationEvent, Container, StartEvent
 import tornado.ioloop
 
-class TornadoLoop(EventLoop):
+class TornadoLoop(Container):
     def __init__(self, *handlers):
         super(TornadoLoop, self).__init__(*handlers)
         self.loop = tornado.ioloop.IOLoop.current()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/recurring_timer.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/recurring_timer.py b/examples/engine/py/recurring_timer.py
index c641ec6..de530d3 100755
--- a/examples/engine/py/recurring_timer.py
+++ b/examples/engine/py/recurring_timer.py
@@ -19,29 +19,25 @@
 #
 
 import time
-from proton.reactors import EventLoop, Handler
+from proton.reactors import Container, Handler
 
 class Recurring(Handler):
     def __init__(self, period):
-        self.eventloop = EventLoop(self)
         self.period = period
-        self.eventloop.schedule(time.time() + self.period, subject=self)
+
+    def on_start(self, event):
+        self.container = event.container
+        self.container.schedule(time.time() + self.period, subject=self)
 
     def on_timer(self, event):
         print "Tick..."
-        self.eventloop.schedule(time.time() + self.period, subject=self)
-
-    def run(self):
-        self.eventloop.run()
-
-    def stop(self):
-        self.eventloop.stop()
+        self.container.schedule(time.time() + self.period, subject=self)
 
 try:
-    app = Recurring(1.0)
-    app.run()
+    container = Container(Recurring(1.0))
+    container.run()
 except KeyboardInterrupt:
-    app.stop()
+    container.stop()
     print
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/recurring_timer_tornado.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/recurring_timer_tornado.py b/examples/engine/py/recurring_timer_tornado.py
index f4ca260..aeeb20c 100755
--- a/examples/engine/py/recurring_timer_tornado.py
+++ b/examples/engine/py/recurring_timer_tornado.py
@@ -19,30 +19,26 @@
 #
 
 import time
-from proton import Handler
+from proton.reactors import Handler
 from proton_tornado import TornadoLoop
 
 class Recurring(Handler):
     def __init__(self, period):
-        self.eventloop = TornadoLoop(self)
         self.period = period
-        self.eventloop.schedule(time.time() + self.period, subject=self)
+
+    def on_start(self, event):
+        self.container = event.container
+        self.container.schedule(time.time() + self.period, subject=self)
 
     def on_timer(self, event):
         print "Tick..."
-        self.eventloop.schedule(time.time() + self.period, subject=self)
-
-    def run(self):
-        self.eventloop.run()
-
-    def stop(self):
-        self.eventloop.stop()
+        self.container.schedule(time.time() + self.period, subject=self)
 
 try:
-    app = Recurring(1.0)
-    app.run()
+    container = TornadoLoop(Recurring(1.0))
+    container.run()
 except KeyboardInterrupt:
-    app.stop()
+    container.stop()
     print
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/selected_recv.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/selected_recv.py b/examples/engine/py/selected_recv.py
index 8425f3d..d0df3b5 100755
--- a/examples/engine/py/selected_recv.py
+++ b/examples/engine/py/selected_recv.py
@@ -18,7 +18,7 @@
 # under the License.
 #
 
-from proton.reactors import EventLoop, Selector
+from proton.reactors import Container, Selector
 from proton.handlers import MessagingHandler
 
 class Recv(MessagingHandler):
@@ -26,14 +26,14 @@ class Recv(MessagingHandler):
         super(Recv, self).__init__()
 
     def on_start(self, event):
-        conn = event.reactor.connect("localhost:5672")
-        conn.create_receiver("examples", options=Selector(u"colour = 'green'"))
+        conn = event.container.connect("localhost:5672")
+        event.container.create_receiver(conn, "examples", options=Selector(u"colour = 'green'"))
 
     def on_message(self, event):
         print event.message.body
 
 try:
-    EventLoop(Recv()).run()
+    Container(Recv()).run()
 except KeyboardInterrupt: pass
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/server.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/server.py b/examples/engine/py/server.py
index 6ab5671..3e6aad4 100755
--- a/examples/engine/py/server.py
+++ b/examples/engine/py/server.py
@@ -20,7 +20,7 @@
 
 from proton import Message
 from proton.handlers import MessagingHandler
-from proton.reactors import EventLoop
+from proton.reactors import Container
 
 class Server(MessagingHandler):
     def __init__(self, host, address):
@@ -29,26 +29,27 @@ class Server(MessagingHandler):
         self.address = address
 
     def on_start(self, event):
-        self.conn = event.reactor.connect(self.host)
-        self.receiver = self.conn.create_receiver(self.address)
+        self.container = event.container
+        self.conn = event.container.connect(self.host)
+        self.receiver = event.container.create_receiver(self.conn, self.address)
         self.senders = {}
         self.relay = None
 
     def on_connection_opened(self, event):
         if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities:
-            self.relay = self.conn.create_sender(None)
+            self.relay = self.container.create_sender(self.conn, None)
 
     def on_message(self, event):
         sender = self.relay
         if not sender:
             sender = self.senders.get(event.message.reply_to)
         if not sender:
-            sender = self.conn.create_sender(event.message.reply_to)
+            sender = self.container.create_sender(self.conn, event.message.reply_to)
             self.senders[event.message.reply_to] = sender
         sender.send_msg(Message(address=event.message.reply_to, body=event.message.body.upper()))
 
 try:
-    EventLoop(Server("localhost:5672", "examples")).run()
+    Container(Server("localhost:5672", "examples")).run()
 except KeyboardInterrupt: pass
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/server_tx.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/server_tx.py b/examples/engine/py/server_tx.py
index cda2d0b..0305a3f 100755
--- a/examples/engine/py/server_tx.py
+++ b/examples/engine/py/server_tx.py
@@ -19,16 +19,15 @@
 #
 
 from proton import Message
-from proton.reactors import EventLoop
+from proton.reactors import Container
 from proton.handlers import MessagingHandler, TransactionHandler
 
 class TxRequest(TransactionHandler):
-    def __init__(self, response, sender, request_delivery, context):
+    def __init__(self, response, sender, request_delivery):
         super(TxRequest, self).__init__()
         self.response = response
         self.sender = sender
         self.request_delivery = request_delivery
-        self.context = context
 
     def on_transaction_declared(self, event):
         self.sender.send_msg(self.response, transaction=event.transaction)
@@ -49,8 +48,9 @@ class TxServer(MessagingHandler):
         self.address = address
 
     def on_start(self, event):
-        self.context = event.reactor.connect(self.host, reconnect=False)
-        self.receiver = self.context.create_receiver(self.address)
+        self.container = event.container
+        self.conn = event.container.connect(self.host, reconnect=False)
+        self.receiver = event.container.create_receiver(self.conn, self.address)
         self.senders = {}
         self.relay = None
 
@@ -59,18 +59,18 @@ class TxServer(MessagingHandler):
         if not sender:
             sender = self.senders.get(event.message.reply_to)
         if not sender:
-            sender = self.context.create_sender(event.message.reply_to)
+            sender = self.container.create_sender(self.conn, event.message.reply_to)
             self.senders[event.message.reply_to] = sender
 
         response = Message(address=event.message.reply_to, body=event.message.body.upper())
-        self.context.declare_transaction(handler=TxRequest(response, sender, event.delivery, self.context))
+        self.container.declare_transaction(self.conn, handler=TxRequest(response, sender, event.delivery))
 
     def on_connection_open(self, event):
         if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities:
-            self.relay = self.context.create_sender(None)
+            self.relay = self.container.create_sender(self.conn, None)
 
 try:
-    EventLoop(TxServer("localhost:5672", "examples")).run()
+    Container(TxServer("localhost:5672", "examples")).run()
 except KeyboardInterrupt: pass
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/simple_recv.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/simple_recv.py b/examples/engine/py/simple_recv.py
index ea80aa6..6825c86 100755
--- a/examples/engine/py/simple_recv.py
+++ b/examples/engine/py/simple_recv.py
@@ -19,23 +19,21 @@
 #
 
 from proton.handlers import MessagingHandler
-from proton.reactors import EventLoop
+from proton.reactors import Container
 
 class Recv(MessagingHandler):
-    def __init__(self, host, address):
+    def __init__(self, url):
         super(Recv, self).__init__()
-        self.host = host
-        self.address = address
+        self.url = url
 
     def on_start(self, event):
-        conn = event.reactor.connect(self.host)
-        conn.create_receiver(self.address)
+        event.container.create_receiver(self.url)
 
     def on_message(self, event):
         print event.message.body
 
 try:
-    EventLoop(Recv("localhost:5672", "examples")).run()
+    Container(Recv("localhost:5672/examples")).run()
 except KeyboardInterrupt: pass
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/simple_send.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/simple_send.py b/examples/engine/py/simple_send.py
index bbd30ac..21530ef 100755
--- a/examples/engine/py/simple_send.py
+++ b/examples/engine/py/simple_send.py
@@ -20,20 +20,18 @@
 
 from proton import Message
 from proton.handlers import MessagingHandler
-from proton.reactors import EventLoop
+from proton.reactors import Container
 
 class Send(MessagingHandler):
-    def __init__(self, host, address, messages):
+    def __init__(self, url, messages):
         super(Send, self).__init__()
-        self.host = host
-        self.address = address
+        self.url = url
         self.sent = 0
         self.confirmed = 0
         self.total = messages
 
     def on_start(self, event):
-        conn = event.reactor.connect(self.host)
-        conn.create_sender(self.address)
+        event.container.create_sender(self.url)
 
     def on_credit(self, event):
         while event.sender.credit and self.sent < self.total:
@@ -51,5 +49,5 @@ class Send(MessagingHandler):
         self.sent = self.confirmed
 
 try:
-    EventLoop(Send("localhost:5672", "examples", 10000)).run()
+    Container(Send("localhost:5672/examples", 10000)).run()
 except KeyboardInterrupt: pass

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/tx_recv.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/tx_recv.py b/examples/engine/py/tx_recv.py
index a28a3df..fc4bb8a 100755
--- a/examples/engine/py/tx_recv.py
+++ b/examples/engine/py/tx_recv.py
@@ -18,7 +18,7 @@
 # under the License.
 #
 
-from proton.reactors import EventLoop
+from proton.reactors import Container
 from proton.handlers import TransactionalClientHandler
 
 class TxRecv(TransactionalClientHandler):
@@ -26,10 +26,12 @@ class TxRecv(TransactionalClientHandler):
         super(TxRecv, self).__init__(prefetch=0)
         self.current_batch = 0
         self.batch_size = batch_size
-        self.event_loop = EventLoop(self)
-        self.conn = self.event_loop.connect("localhost:5672")
-        self.receiver = self.conn.create_receiver("examples")
-        self.conn.declare_transaction(handler=self)
+
+    def on_start(self, event):
+        self.container = event.container
+        self.conn = self.container.connect("localhost:5672")
+        self.receiver = self.container.create_receiver(self.conn, "examples")
+        self.container.declare_transaction(self.conn, handler=self)
         self.transaction = None
 
     def on_message(self, event):
@@ -46,16 +48,13 @@ class TxRecv(TransactionalClientHandler):
 
     def on_transaction_committed(self, event):
         self.current_batch = 0
-        self.conn.declare_transaction(handler=self)
+        self.container.declare_transaction(self.conn, handler=self)
 
     def on_disconnected(self, event):
         self.current_batch = 0
 
-    def run(self):
-        self.event_loop.run()
-
 try:
-    TxRecv(10).run()
+    Container(TxRecv(10)).run()
 except KeyboardInterrupt: pass
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/tx_recv_interactive.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/tx_recv_interactive.py b/examples/engine/py/tx_recv_interactive.py
index a822992..6eb320e 100755
--- a/examples/engine/py/tx_recv_interactive.py
+++ b/examples/engine/py/tx_recv_interactive.py
@@ -20,7 +20,7 @@
 
 import sys
 import threading
-from proton.reactors import ApplicationEvent, EventLoop
+from proton.reactors import ApplicationEvent, Container
 from proton.handlers import TransactionalClientHandler
 
 class TxRecv(TransactionalClientHandler):
@@ -28,10 +28,10 @@ class TxRecv(TransactionalClientHandler):
         super(TxRecv, self).__init__(prefetch=0)
 
     def on_start(self, event):
-        self.context = event.reactor.connect("localhost:5672")
-        self.receiver = self.context.create_receiver("examples")
-        #self.context.declare_transaction(handler=self, settle_before_discharge=False)
-        self.context.declare_transaction(handler=self, settle_before_discharge=True)
+        self.container = event.container
+        self.conn = self.container.connect("localhost:5672")
+        self.receiver = self.container.create_receiver(self.conn, "examples")
+        self.container.declare_transaction(self.conn, handler=self, settle_before_discharge=True)
         self.transaction = None
 
     def on_message(self, event):
@@ -44,11 +44,11 @@ class TxRecv(TransactionalClientHandler):
 
     def on_transaction_committed(self, event):
         print "transaction committed"
-        self.context.declare_transaction(handler=self)
+        self.container.declare_transaction(self.conn, handler=self)
 
     def on_transaction_aborted(self, event):
         print "transaction aborted"
-        self.context.declare_transaction(handler=self)
+        self.container.declare_transaction(self.conn, handler=self)
 
     def on_commit(self, event):
         self.transaction.commit()
@@ -65,7 +65,7 @@ class TxRecv(TransactionalClientHandler):
         c.close()
 
 try:
-    reactor = EventLoop(TxRecv())
+    reactor = Container(TxRecv())
     events = reactor.get_event_trigger()
     thread = threading.Thread(target=reactor.run)
     thread.daemon=True

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/tx_send.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/tx_send.py b/examples/engine/py/tx_send.py
index b2f12b2..5b11280 100755
--- a/examples/engine/py/tx_send.py
+++ b/examples/engine/py/tx_send.py
@@ -19,7 +19,7 @@
 #
 
 from proton import Message
-from proton.reactors import EventLoop
+from proton.reactors import Container
 from proton.handlers import TransactionalClientHandler
 
 class TxSend(TransactionalClientHandler):
@@ -30,10 +30,12 @@ class TxSend(TransactionalClientHandler):
         self.confirmed = 0
         self.total = messages
         self.batch_size = batch_size
-        self.eventloop = EventLoop()
-        self.conn = self.eventloop.connect("localhost:5672", handler=self)
-        self.sender = self.conn.create_sender("examples")
-        self.conn.declare_transaction(handler=self)
+
+    def on_start(self, event):
+        self.container = event.container
+        self.conn = self.container.connect("localhost:5672", handler=self)
+        self.sender = self.container.create_sender(self.conn, "examples")
+        self.container.declare_transaction(self.conn, handler=self)
         self.transaction = None
 
     def on_transaction_declared(self, event):
@@ -63,14 +65,11 @@ class TxSend(TransactionalClientHandler):
             event.connection.close()
         else:
             self.current_batch = 0
-            self.conn.declare_transaction(handler=self)
+            self.container.declare_transaction(self.conn, handler=self)
 
     def on_disconnected(self, event):
         self.current_batch = 0
 
-    def run(self):
-        self.eventloop.run()
-
 try:
-    TxSend(10000, 10).run()
+    Container(TxSend(10000, 10)).run()
 except KeyboardInterrupt: pass

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/tx_send_sync.py
----------------------------------------------------------------------
diff --git a/examples/engine/py/tx_send_sync.py b/examples/engine/py/tx_send_sync.py
index 0c50838..c051408 100755
--- a/examples/engine/py/tx_send_sync.py
+++ b/examples/engine/py/tx_send_sync.py
@@ -19,21 +19,23 @@
 #
 
 from proton import Message
-from proton.reactors import EventLoop
+from proton.reactors import Container
 from proton.handlers import TransactionalClientHandler
 
 class TxSend(TransactionalClientHandler):
     def __init__(self, messages, batch_size):
         super(TxSend, self).__init__()
         self.current_batch = 0
-        self.confirmed = 0
         self.committed = 0
+        self.confirmed = 0
         self.total = messages
         self.batch_size = batch_size
-        self.eventloop = EventLoop()
-        self.conn = self.eventloop.connect("localhost:5672", handler=self)
-        self.sender = self.conn.create_sender("examples")
-        self.conn.declare_transaction(handler=self)
+
+    def on_start(self, event):
+        self.container = event.container
+        self.conn = self.container.connect("localhost:5672", handler=self)
+        self.sender = self.container.create_sender(self.conn, "examples")
+        self.container.declare_transaction(self.conn, handler=self)
         self.transaction = None
 
     def on_transaction_declared(self, event):
@@ -64,14 +66,11 @@ class TxSend(TransactionalClientHandler):
             event.connection.close()
         else:
             self.current_batch = 0
-            self.conn.declare_transaction(handler=self)
+            self.container.declare_transaction(self.conn, handler=self)
 
     def on_disconnected(self, event):
         self.current_batch = 0
 
-    def run(self):
-        self.eventloop.run()
-
 try:
-    TxSend(10000, 10).run()
+    Container(TxSend(10000, 10)).run()
 except KeyboardInterrupt: pass

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/proton-c/bindings/python/proton/reactors.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py
index fc10860..16a87e4 100644
--- a/proton-c/bindings/python/proton/reactors.py
+++ b/proton-c/bindings/python/proton/reactors.py
@@ -18,7 +18,7 @@
 #
 import heapq, os, Queue, socket, time, types
 from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url
-from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout
+from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Session, Terminus, Timeout
 from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException
 from select import select
 from proton.handlers import nested_handlers, ScopedHandler
@@ -310,9 +310,9 @@ class ApplicationEvent(Event):
                            ", ".join([str(o) for o in objects if o is not None]))
 
 class StartEvent(ApplicationEvent):
-    def __init__(self, reactor):
+    def __init__(self, container):
         super(StartEvent, self).__init__("start")
-        self.reactor = reactor
+        self.container = container
 
 class ScheduledEvents(Events):
     """
@@ -576,100 +576,38 @@ def _apply_link_options(options, link):
         else:
             if options.test(link): options.apply(link)
 
+def _create_session(connection, handler=None):
+    session = connection.session()
+    session.open()
+    return session
 
-class MessagingContext(object):
-    """
-    A context for creating links. This allows the user to ignore
-    sessions unless they explicitly want to control them. Additionally
-    provides support for transactional messaging.
-    """
-    def __init__(self, conn, handler=None, ssn=None):
-        self.conn = conn
-        if handler:
-            self.conn.context = handler
-        self.conn._mc = self
-        self.ssn = ssn
-        self.txn_ctrl = None
-
-    def _get_handler(self):
-        return self.conn.context
 
-    def _set_handler(self, value):
-        self.conn.context = value
-
-    handler = property(_get_handler, _set_handler)
-
-    def create_sender(self, target, source=None, name=None, handler=None, tags=None, options=None):
-        snd = self._get_ssn().sender(name or self._get_id(target, source))
-        if source:
-            snd.source.address = source
-        if target:
-            snd.target.address = target
-        if handler:
-            snd.context = handler
-        snd.tags = tags or delivery_tags()
-        snd.send_msg = types.MethodType(_send_msg, snd)
-        _apply_link_options(options, snd)
-        snd.open()
-        return snd
-
-    def create_receiver(self, source, target=None, name=None, dynamic=False, handler=None, options=None):
-        rcv = self._get_ssn().receiver(name or self._get_id(source, target))
-        if source:
-            rcv.source.address = source
-        if dynamic:
-            rcv.source.dynamic = True
-        if target:
-            rcv.target.address = target
-        if handler:
-            rcv.context = handler
-        _apply_link_options(options, rcv)
-        rcv.open()
-        return rcv
-
-    def create_session(self):
-        return MessageContext(conn=None, ssn=self._new_ssn())
+def _get_attr(target, name):
+    if hasattr(target, name):
+        return getattr(target, name)
+    else:
+        return None
 
-    def declare_transaction(self, handler=None, settle_before_discharge=False):
-        if not self.txn_ctrl:
-            self.txn_ctrl = self.create_sender(None, name="txn-ctrl")
-            self.txn_ctrl.target.type = Terminus.COORDINATOR
-            self.txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
-        return Transaction(self.txn_ctrl, handler, settle_before_discharge)
+class SessionPerConnection(object):
+    def __init__(self):
+        self._default_session = None
 
-    def close(self):
-        if self.ssn:
-            self.ssn.close()
-        if self.conn:
-            self.conn.close()
-
-    def _get_id(self, remote, local):
-        if local and remote: "%s-%s-%s" % (self.conn.container, remote, local)
-        elif local: return "%s-%s" % (self.conn.container, local)
-        elif remote: return "%s-%s" % (self.conn.container, remote)
-        else: return "%s-%s" % (self.conn.container, str(generate_uuid()))
-
-    def _get_ssn(self):
-        if not self.ssn:
-            self.ssn = self._new_ssn()
-            self.ssn.context = self
-        return self.ssn
-
-    def _new_ssn(self):
-        ssn = self.conn.session()
-        ssn.open()
-        return ssn
+    def session(self, connection):
+        if not self._default_session:
+            self._default_session = _create_session(connection)
+            self._default_session.context = self
+        return self._default_session
 
     def on_session_remote_close(self, event):
-        if self.conn:
-            self.conn.close()
+        event.connection.close()
+        self._default_session = None
 
 class Connector(Handler):
     """
     Internal handler that triggers the necessary socket connect for an
     opened connection.
     """
-    def attach_to(self, loop):
+    def __init__(self, loop):
         self.loop = loop
 
     def _connect(self, connection):
@@ -677,6 +615,7 @@ class Connector(Handler):
         #print "connecting to %s:%i" % (host, port)
         heartbeat = connection.heartbeat if hasattr(connection, 'heartbeat') else None
         self.loop.add(AmqpSocket(connection, socket.socket(), self.loop.events, heartbeat=heartbeat).connect(host, port))
+        connection._pin = None #connection is now referenced by AmqpSocket, so no need for circular reference
 
     def on_connection_local_open(self, event):
         if hasattr(event.connection, "address"):
@@ -688,6 +627,7 @@ class Connector(Handler):
 
     def on_disconnected(self, event):
         if hasattr(event.connection, "reconnect"):
+            event.connection._pin = event.connection #no longer referenced by AmqpSocket, so pin in memory with circular reference
             delay = event.connection.reconnect.next()
             if delay == 0:
                 print "Disconnected, reconnecting..."
@@ -739,31 +679,97 @@ class Urls(object):
             self.i = iter(self.values)
             return self._as_pair(self.i.next())
 
-class EventLoop(object):
+class Container(object):
     def __init__(self, *handlers):
-        self.connector = Connector()
-        h = [self.connector, ScopedHandler()]
+        h = [Connector(self), ScopedHandler()]
         h.extend(nested_handlers(handlers))
         self.events = ScheduledEvents(*h)
         self.loop = SelectLoop(self.events)
-        self.connector.attach_to(self)
         self.trigger = None
         self.container_id = str(generate_uuid())
 
     def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None):
-        context = MessagingContext(self.events.connection(), handler=handler)
-        context.conn.container = self.container_id or str(generate_uuid())
-        context.conn.heartbeat = heartbeat
-        if url: context.conn.address = Urls([url])
-        elif urls: context.conn.address = Urls(urls)
-        elif address: context.conn.address = address
+        conn = self.events.connection()
+        conn._pin = conn #circular reference until the open event gets handled
+        if handler:
+            conn.context = handler
+        conn.container = self.container_id or str(generate_uuid())
+        conn.heartbeat = heartbeat
+        if url: conn.address = Urls([url])
+        elif urls: conn.address = Urls(urls)
+        elif address: conn.address = address
         else: raise ValueError("One of url, urls or address required")
         if reconnect:
-            context.conn.reconnect = reconnect
+            conn.reconnect = reconnect
         elif reconnect is None:
-            context.conn.reconnect = Backoff()
-        context.conn.open()
-        return context
+            conn.reconnect = Backoff()
+        conn._session_policy = SessionPerConnection() #todo: make configurable
+        conn.open()
+        return conn
+
+    def _get_id(self, container, remote, local):
+        if local and remote: "%s-%s-%s" % (container, remote, local)
+        elif local: return "%s-%s" % (container, local)
+        elif remote: return "%s-%s" % (container, remote)
+        else: return "%s-%s" % (container, str(generate_uuid()))
+
+    def _get_session(self, context):
+        if isinstance(context, Url):
+            return self._get_session(self.connect(url=context))
+        elif isinstance(context, Session):
+            return context
+        elif isinstance(context, Connection):
+            if hasattr(context, '_session_policy'):
+                return context._session_policy.session(context)
+            else:
+                return _create_session(context)
+        else:
+            return context.session()
+
+    def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
+        if isinstance(context, basestring):
+            context = Url(context)
+        if isinstance(context, Url) and not target:
+            target = context.path
+        session = self._get_session(context)
+        snd = session.sender(name or self._get_id(session.connection.container, target, source))
+        if source:
+            snd.source.address = source
+        if target:
+            snd.target.address = target
+        if handler:
+            snd.context = handler
+        snd.tags = tags or delivery_tags()
+        snd.send_msg = types.MethodType(_send_msg, snd)
+        _apply_link_options(options, snd)
+        snd.open()
+        return snd
+
+    def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
+        if isinstance(context, basestring):
+            context = Url(context)
+        if isinstance(context, Url) and not source:
+            source = context.path
+        session = self._get_session(context)
+        rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
+        if source:
+            rcv.source.address = source
+        if dynamic:
+            rcv.source.dynamic = True
+        if target:
+            rcv.target.address = target
+        if handler:
+            rcv.context = handler
+        _apply_link_options(options, rcv)
+        rcv.open()
+        return rcv
+
+    def declare_transaction(self, context, handler=None, settle_before_discharge=False):
+        if not _get_attr(context, '_txn_ctrl'):
+            context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl')
+            context._txn_ctrl.target.type = Terminus.COORDINATOR
+            context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
+        return Transaction(context._txn_ctrl, handler, settle_before_discharge)
 
     def listen(self, url):
         host, port = Urls([url]).next()
@@ -794,15 +800,3 @@ class EventLoop(object):
     def do_work(self, timeout=None):
         return self.loop.do_work(timeout)
 
-EventLoop.DEFAULT = EventLoop()
-
-def connect(url=None, urls=None, address=None, handler=None, reconnect=None, eventloop=None, heartbeat=None):
-    if not eventloop:
-        eventloop = EventLoop.DEFAULT
-    return eventloop.connect(url=url, urls=urls, address=address, handler=handler, reconnect=reconnect, heartbeat=heartbeat)
-
-def run(eventloop=None):
-    if not eventloop:
-        eventloop = EventLoop.DEFAULT
-    eventloop.run()
-

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/proton-c/bindings/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py
index 845c3ab..03c9417 100644
--- a/proton-c/bindings/python/proton/utils.py
+++ b/proton-c/bindings/python/proton/utils.py
@@ -17,8 +17,8 @@
 # under the License.
 #
 import Queue, socket, time
-from proton import ConnectionException, Endpoint, Handler, Message, Url
-from proton.reactors import AmqpSocket, Events, MessagingContext, SelectLoop, send_msg
+from proton import ConnectionException, Endpoint, Handler, Message, Timeout, Url
+from proton.reactors import AmqpSocket, Container, Events, SelectLoop, send_msg
 from proton.handlers import ScopedHandler
 
 class BlockingLink(object):
@@ -52,36 +52,32 @@ class BlockingConnection(Handler):
     """
     A synchronous style connection wrapper.
     """
-    def __init__(self, url, timeout=None):
+    def __init__(self, url, timeout=None, container=None):
         self.timeout = timeout
-        self.events = Events(ScopedHandler())
-        self.loop = SelectLoop(self.events)
-        self.context = MessagingContext(self.loop.events.connection(), handler=self)
+        self.container = container or Container()
         if isinstance(url, basestring):
             self.url = Url(url)
         else:
             self.url = url
-        self.loop.add(
-            AmqpSocket(self.context.conn, socket.socket(), self.events).connect(self.url.host, self.url.port))
-        self.context.conn.open()
-        self.wait(lambda: not (self.context.conn.state & Endpoint.REMOTE_UNINIT),
+        self.conn = self.container.connect(url=self.url, handler=self)
+        self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
                   msg="Opening connection")
 
     def create_sender(self, address, handler=None):
-        return BlockingSender(self, self.context.create_sender(address, handler=handler))
+        return BlockingSender(self, self.container.create_sender(self.conn, address, handler=handler))
 
     def create_receiver(self, address, credit=1, dynamic=False, handler=None):
         return BlockingReceiver(
-            self, self.context.create_receiver(address, dynamic=dynamic, handler=handler), credit=credit)
+            self, self.container.create_receiver(self.conn, address, dynamic=dynamic, handler=handler), credit=credit)
 
     def close(self):
-        self.context.conn.close()
-        self.wait(lambda: not (self.context.conn.state & Endpoint.REMOTE_ACTIVE),
+        self.conn.close()
+        self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
                   msg="Closing connection")
 
     def run(self):
         """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """
-        self.loop.run()
+        self.container.run()
 
     def wait(self, condition, timeout=False, msg=None):
         """Call do_work until condition() is true"""
@@ -89,11 +85,11 @@ class BlockingConnection(Handler):
             timeout = self.timeout
         if timeout is None:
             while not condition():
-                self.loop.do_work()
+                self.container.do_work()
         else:
             deadline = time.time() + timeout
             while not condition():
-                if not self.loop.do_work(deadline - time.time()):
+                if not self.container.do_work(deadline - time.time()):
                     txt = "Connection %s timed out" % self.url
                     if msg: txt += ": " + msg
                     raise Timeout(txt)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message