Repository: qpid-proton
Updated Branches:
refs/heads/examples 4706c2b7c -> a37304f96
Support two different settlement modes with transactions; added interactive script for testing/troubleshooting
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a37304f9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a37304f9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a37304f9
Branch: refs/heads/examples
Commit: a37304f9684d3cbe225d57ebc76542350e9efea8
Parents: 4706c2b
Author: Gordon Sim <gsim@redhat.com>
Authored: Wed Nov 26 11:47:50 2014 +0000
Committer: Gordon Sim <gsim@redhat.com>
Committed: Wed Nov 26 11:47:50 2014 +0000
----------------------------------------------------------------------
tutorial/proton_reactors.py | 33 ++++++++++++--
tutorial/tx_recv_interactive.py | 83 ++++++++++++++++++++++++++++++++++++
2 files changed, 112 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a37304f9/tutorial/proton_reactors.py
----------------------------------------------------------------------
diff --git a/tutorial/proton_reactors.py b/tutorial/proton_reactors.py
index 28f287d..f431b24 100644
--- a/tutorial/proton_reactors.py
+++ b/tutorial/proton_reactors.py
@@ -417,13 +417,15 @@ def _send_msg(self, msg, tag=None, handler=None, transaction=None):
class Transaction(object):
- def __init__(self, txn_ctrl, handler):
+ def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
self.txn_ctrl = txn_ctrl
self.handler = handler
self.id = None
self._declare = None
self._discharge = None
self.failed = False
+ self._pending = []
+ self.settle_before_discharge = settle_before_discharge
self.declare()
def commit(self):
@@ -444,6 +446,27 @@ class Transaction(object):
delivery.transaction = self
return delivery
+ def accept(self, delivery):
+ self.update(delivery, PN_ACCEPTED)
+ if self.settle_before_discharge:
+ delivery.settle()
+ else:
+ self._pending.append(delivery)
+
+ def update(self, delivery, state=None):
+ if state:
+ delivery.local.data = [self.id, Described(ulong(state), [])]
+ delivery.update(0x34)
+
+ def _release_pending(self):
+ for d in self._pending:
+ d.update(Delivery.RELEASED)
+ d.settle()
+ self._clear_pending()
+
+ def _clear_pending(self):
+ self._pending = []
+
def handle_outcome(self, event):
if event.delivery == self._declare:
if event.delivery.remote.data:
@@ -458,12 +481,14 @@ class Transaction(object):
if event.delivery.remote_state == Delivery.REJECTED:
if not self.failed:
self.handler.on_transaction_commit_failed(event)
+ self._release_pending() # make this optional?
else:
if self.failed:
self.handler.on_transaction_aborted(event)
+ self._release_pending()
else:
self.handler.on_transaction_committed(event)
-
+ self._clear_pending()
class LinkOption(object):
def apply(self, link): pass
@@ -554,12 +579,12 @@ class MessagingContext(object):
def create_session(self):
return MessageContext(conn=None, ssn=self._new_ssn())
- def declare_transaction(self, handler=None):
+ def declare_transaction(self, handler=None, settle_before_discharge=False):
if not self.txn_ctrl:
self.txn_ctrl = self.create_sender(None, name="txn-ctrl")
self.txn_ctrl.target.type = Terminus.COORDINATOR
self.txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
- return Transaction(self.txn_ctrl, handler)
+ return Transaction(self.txn_ctrl, handler, settle_before_discharge)
def close(self):
if self.ssn:
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a37304f9/tutorial/tx_recv_interactive.py
----------------------------------------------------------------------
diff --git a/tutorial/tx_recv_interactive.py b/tutorial/tx_recv_interactive.py
new file mode 100755
index 0000000..4e36534
--- /dev/null
+++ b/tutorial/tx_recv_interactive.py
@@ -0,0 +1,83 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+import threading
+from proton_reactors import ApplicationEvent, EventLoop
+from proton_handlers import TransactionalClientHandler
+
+class TxRecv(TransactionalClientHandler):
+ def __init__(self):
+ super(TxRecv, self).__init__(prefetch=0)
+
+ def on_start(self, event):
+ self.context = event.reactor.connect("localhost:5672")
+ self.receiver = self.context.create_receiver("examples")
+ #self.context.declare_transaction(handler=self, settle_before_discharge=False)
+ self.context.declare_transaction(handler=self, settle_before_discharge=True)
+ self.transaction = None
+
+ def on_message(self, event):
+ print event.message.body
+ self.transaction.accept(event.delivery)
+
+ def on_transaction_declared(self, event):
+ self.transaction = event.transaction
+ print "transaction declared"
+
+ def on_transaction_committed(self, event):
+ print "transaction committed"
+ self.context.declare_transaction(handler=self)
+
+ def on_transaction_aborted(self, event):
+ print "transaction aborted"
+ self.context.declare_transaction(handler=self)
+
+ def on_commit(self, event):
+ self.transaction.commit()
+
+ def on_abort(self, event):
+ self.transaction.abort()
+
+ def on_fetch(self, event):
+ self.receiver.flow(1)
+
+ def on_quit(self, event):
+ c = self.receiver.connection
+ self.receiver.close()
+ c.close()
+
+try:
+ reactor = EventLoop(TxRecv())
+ events = reactor.get_event_trigger()
+ thread = threading.Thread(target=reactor.run)
+ thread.daemon=True
+ thread.start()
+
+ print "Enter 'fetch', 'commit' or 'abort'"
+ while True:
+ line = sys.stdin.readline()
+ if line:
+ events.trigger(ApplicationEvent(line.strip()))
+ else:
+ break
+except KeyboardInterrupt: pass
+
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
|