qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject qpid-proton git commit: Support two different settlement modes with transactions; added interactive script for testing/troubleshooting
Date Wed, 26 Nov 2014 13:08:49 GMT
Repository: qpid-proton
Updated Branches:
  refs/heads/examples 4706c2b7c -> a37304f96


Support two different settlement modes with transactions; added interactive script for testing/troubleshooting


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a37304f9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a37304f9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a37304f9

Branch: refs/heads/examples
Commit: a37304f9684d3cbe225d57ebc76542350e9efea8
Parents: 4706c2b
Author: Gordon Sim <gsim@redhat.com>
Authored: Wed Nov 26 11:47:50 2014 +0000
Committer: Gordon Sim <gsim@redhat.com>
Committed: Wed Nov 26 11:47:50 2014 +0000

----------------------------------------------------------------------
 tutorial/proton_reactors.py     | 33 ++++++++++++--
 tutorial/tx_recv_interactive.py | 83 ++++++++++++++++++++++++++++++++++++
 2 files changed, 112 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a37304f9/tutorial/proton_reactors.py
----------------------------------------------------------------------
diff --git a/tutorial/proton_reactors.py b/tutorial/proton_reactors.py
index 28f287d..f431b24 100644
--- a/tutorial/proton_reactors.py
+++ b/tutorial/proton_reactors.py
@@ -417,13 +417,15 @@ def _send_msg(self, msg, tag=None, handler=None, transaction=None):
 
 
 class Transaction(object):
-    def __init__(self, txn_ctrl, handler):
+    def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
         self.txn_ctrl = txn_ctrl
         self.handler = handler
         self.id = None
         self._declare = None
         self._discharge = None
         self.failed = False
+        self._pending = []
+        self.settle_before_discharge = settle_before_discharge
         self.declare()
 
     def commit(self):
@@ -444,6 +446,27 @@ class Transaction(object):
         delivery.transaction = self
         return delivery
 
+    def accept(self, delivery):
+        self.update(delivery, PN_ACCEPTED)
+        if self.settle_before_discharge:
+            delivery.settle()
+        else:
+            self._pending.append(delivery)
+
+    def update(self, delivery, state=None):
+        if state:
+            delivery.local.data = [self.id, Described(ulong(state), [])]
+            delivery.update(0x34)
+
+    def _release_pending(self):
+        for d in self._pending:
+            d.update(Delivery.RELEASED)
+            d.settle()
+        self._clear_pending()
+
+    def _clear_pending(self):
+        self._pending = []
+
     def handle_outcome(self, event):
         if event.delivery == self._declare:
             if event.delivery.remote.data:
@@ -458,12 +481,14 @@ class Transaction(object):
             if event.delivery.remote_state == Delivery.REJECTED:
                 if not self.failed:
                     self.handler.on_transaction_commit_failed(event)
+                    self._release_pending() # make this optional?
             else:
                 if self.failed:
                     self.handler.on_transaction_aborted(event)
+                    self._release_pending()
                 else:
                     self.handler.on_transaction_committed(event)
-
+            self._clear_pending()
 
 class LinkOption(object):
     def apply(self, link): pass
@@ -554,12 +579,12 @@ class MessagingContext(object):
     def create_session(self):
         return MessageContext(conn=None, ssn=self._new_ssn())
 
-    def declare_transaction(self, handler=None):
+    def declare_transaction(self, handler=None, settle_before_discharge=False):
         if not self.txn_ctrl:
             self.txn_ctrl = self.create_sender(None, name="txn-ctrl")
             self.txn_ctrl.target.type = Terminus.COORDINATOR
             self.txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
-        return Transaction(self.txn_ctrl, handler)
+        return Transaction(self.txn_ctrl, handler, settle_before_discharge)
 
     def close(self):
         if self.ssn:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a37304f9/tutorial/tx_recv_interactive.py
----------------------------------------------------------------------
diff --git a/tutorial/tx_recv_interactive.py b/tutorial/tx_recv_interactive.py
new file mode 100755
index 0000000..4e36534
--- /dev/null
+++ b/tutorial/tx_recv_interactive.py
@@ -0,0 +1,83 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+import threading
+from proton_reactors import ApplicationEvent, EventLoop
+from proton_handlers import TransactionalClientHandler
+
+class TxRecv(TransactionalClientHandler):
+    def __init__(self):
+        super(TxRecv, self).__init__(prefetch=0)
+
+    def on_start(self, event):
+        self.context = event.reactor.connect("localhost:5672")
+        self.receiver = self.context.create_receiver("examples")
+        #self.context.declare_transaction(handler=self, settle_before_discharge=False)
+        self.context.declare_transaction(handler=self, settle_before_discharge=True)
+        self.transaction = None
+
+    def on_message(self, event):
+        print event.message.body
+        self.transaction.accept(event.delivery)
+
+    def on_transaction_declared(self, event):
+        self.transaction = event.transaction
+        print "transaction declared"
+
+    def on_transaction_committed(self, event):
+        print "transaction committed"
+        self.context.declare_transaction(handler=self)
+
+    def on_transaction_aborted(self, event):
+        print "transaction aborted"
+        self.context.declare_transaction(handler=self)
+
+    def on_commit(self, event):
+        self.transaction.commit()
+
+    def on_abort(self, event):
+        self.transaction.abort()
+
+    def on_fetch(self, event):
+        self.receiver.flow(1)
+
+    def on_quit(self, event):
+        c = self.receiver.connection
+        self.receiver.close()
+        c.close()
+
+try:
+    reactor = EventLoop(TxRecv())
+    events = reactor.get_event_trigger()
+    thread = threading.Thread(target=reactor.run)
+    thread.daemon=True
+    thread.start()
+
+    print "Enter 'fetch', 'commit' or 'abort'"
+    while True:
+        line = sys.stdin.readline()
+        if line:
+            events.trigger(ApplicationEvent(line.strip()))
+        else:
+            break
+except KeyboardInterrupt: pass
+
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message