Added: qpid/site/docs/releases/qpid-proton-0.9.1/proton/python/examples/db_common.py.html URL: http://svn.apache.org/viewvc/qpid/site/docs/releases/qpid-proton-0.9.1/proton/python/examples/db_common.py.html?rev=1681800&view=auto ============================================================================== --- qpid/site/docs/releases/qpid-proton-0.9.1/proton/python/examples/db_common.py.html (added) +++ qpid/site/docs/releases/qpid-proton-0.9.1/proton/python/examples/db_common.py.html Tue May 26 17:54:36 2015 @@ -0,0 +1,248 @@ + + + + + db_common.py.html - Apache Qpid™ + + + + + + + + + + + +
+
+ Menu + + Search + + +
+ + + + + +
+ +
#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import Queue
+import sqlite3
+import threading
+
+class Db(object):
+    def __init__(self, db, injector):
+        self.db = db
+        self.injector = injector
+        self.tasks = Queue.Queue()
+        self.position = None
+        self.pending_events = []
+        self.running = True
+        self.thread = threading.Thread(target=self._process)
+        self.thread.daemon=True
+        self.thread.start()
+
+    def close(self):
+        self.tasks.put(lambda conn: self._close())
+
+    def reset(self):
+        self.tasks.put(lambda conn: self._reset())
+
+    def load(self, records, event=None):
+        self.tasks.put(lambda conn: self._load(conn, records, event))
+
+    def get_id(self, event):
+        self.tasks.put(lambda conn: self._get_id(conn, event))
+
+    def insert(self, id, data, event=None):
+        self.tasks.put(lambda conn: self._insert(conn, id, data, event))
+
+    def delete(self, id, event=None):
+        self.tasks.put(lambda conn: self._delete(conn, id, event))
+
+    def _reset(self, ignored=None):
+        self.position = None
+
+    def _close(self, ignored=None):
+        self.running = False
+
+    def _get_id(self, conn, event):
+        cursor = conn.execute("SELECT * FROM records ORDER BY id DESC")
+        row = cursor.fetchone()
+        if event:
+            if row:
+                event.id = row['id']
+            else:
+                event.id = 0
+            self.injector.trigger(event)
+
+    def _load(self, conn, records, event):
+        if self.position:
+            cursor = conn.execute("SELECT * FROM records WHERE id > ? ORDER BY id", (self.position,))
+        else:
+            cursor = conn.execute("SELECT * FROM records ORDER BY id")
+        while not records.full():
+            row = cursor.fetchone()
+            if row:
+                self.position = row['id']
+                records.put(dict(row))
+            else:
+                break
+        if event:
+            self.injector.trigger(event)
+
+    def _insert(self, conn, id, data, event):
+        if id:
+            conn.execute("INSERT INTO records(id, description) VALUES (?, ?)", (id, data))
+        else:
+            conn.execute("INSERT INTO records(description) VALUES (?)", (data,))
+        if event:
+            self.pending_events.append(event)
+
+    def _delete(self, conn, id, event):
+        conn.execute("DELETE FROM records WHERE id=?", (id,))
+        if event:
+            self.pending_events.append(event)
+
+    def _process(self):
+        conn = sqlite3.connect(self.db)
+        conn.row_factory = sqlite3.Row
+        with conn:
+            while self.running:
+                f = self.tasks.get(True)
+                try:
+                    while True:
+                        f(conn)
+                        f = self.tasks.get(False)
+                except Queue.Empty: pass
+                conn.commit()
+                for event in self.pending_events:
+                    self.injector.trigger(event)
+                self.pending_events = []
+        self.injector.close()
+
+ + +
+ + + + +
+
+ + Added: qpid/site/docs/releases/qpid-proton-0.9.1/proton/python/examples/db_ctrl.py.html URL: http://svn.apache.org/viewvc/qpid/site/docs/releases/qpid-proton-0.9.1/proton/python/examples/db_ctrl.py.html?rev=1681800&view=auto ============================================================================== --- qpid/site/docs/releases/qpid-proton-0.9.1/proton/python/examples/db_ctrl.py.html (added) +++ qpid/site/docs/releases/qpid-proton-0.9.1/proton/python/examples/db_ctrl.py.html Tue May 26 17:54:36 2015 @@ -0,0 +1,180 @@ + + + + + db_ctrl.py.html - Apache Qpid™ + + + + + + + + + + + +
+
+ Menu + + Search + + +
+ + + + + +
+ +
#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sqlite3
+import sys
+
+if len(sys.argv) < 3:
+    print "Usage: %s [init|insert|list] db" % sys.argv[0]
+else:
+    conn = sqlite3.connect(sys.argv[2])
+    with conn:
+        if sys.argv[1] == "init":
+            conn.execute("DROP TABLE IF EXISTS records")
+            conn.execute("CREATE TABLE records(id INTEGER PRIMARY KEY AUTOINCREMENT, description TEXT)")
+            conn.commit()
+        elif sys.argv[1] == "list":
+            cursor = conn.cursor()
+            cursor.execute("SELECT * FROM records")
+            rows = cursor.fetchall()
+            for r in rows:
+                print r
+        elif sys.argv[1] == "insert":
+            while True:
+                l = sys.stdin.readline()
+                if not l: break
+                conn.execute("INSERT INTO records(description) VALUES (?)", (l.rstrip(),))
+            conn.commit()
+        else:
+            print "Unrecognised command: %s" %  sys.argv[1]
+
+ + +
+ + + + +
+
+ + Added: qpid/site/docs/releases/qpid-proton-0.9.1/proton/python/examples/db_recv.py.html URL: http://svn.apache.org/viewvc/qpid/site/docs/releases/qpid-proton-0.9.1/proton/python/examples/db_recv.py.html?rev=1681800&view=auto ============================================================================== --- qpid/site/docs/releases/qpid-proton-0.9.1/proton/python/examples/db_recv.py.html (added) +++ qpid/site/docs/releases/qpid-proton-0.9.1/proton/python/examples/db_recv.py.html Tue May 26 17:54:36 2015 @@ -0,0 +1,210 @@ + + + + + db_recv.py.html - Apache Qpid™ + + + + + + + + + + + +
+
+ Menu + + Search + + +
+ + + + + +
+ +
#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse
+from proton.handlers import MessagingHandler
+from proton.reactor import ApplicationEvent, Container, EventInjector
+from db_common import Db
+
+class Recv(MessagingHandler):
+    def __init__(self, url, count):
+        super(Recv, self).__init__(auto_accept=False)
+        self.url = url
+        self.delay = 0
+        self.last_id = None
+        self.expected = count
+        self.received = 0
+        self.accepted = 0
+        self.db = Db("dst_db", EventInjector())
+
+    def on_start(self, event):
+        event.container.selectable(self.db.injector)
+        e = ApplicationEvent("id_loaded")
+        e.container = event.container
+        self.db.get_id(e)
+
+    def on_id_loaded(self, event):
+        self.last_id = event.id
+        event.container.create_receiver(self.url)
+
+    def on_record_inserted(self, event):
+        self.accept(event.delivery)
+        self.accepted += 1
+        if self.accepted == self.expected:
+            event.connection.close()
+            self.db.close()
+
+    def on_message(self, event):
+        id = int(event.message.id)
+        if (not self.last_id) or id > self.last_id:
+            if self.received < self.expected:
+                self.received += 1
+                self.last_id = id
+                self.db.insert(id, event.message.body, ApplicationEvent("record_inserted", delivery=event.delivery))
+                print "inserted message %s" % id
+            else:
+                self.release(event.delivery)
+        else:
+            self.accept(event.delivery)
+
+parser = optparse.OptionParser(usage="usage: %prog [options]")
+parser.add_option("-a", "--address", default="localhost:5672/examples",
+                  help="address from which messages are received (default %default)")
+parser.add_option("-m", "--messages", type="int", default=0,
+                  help="number of messages to receive; 0 receives indefinitely (default %default)")
+opts, args = parser.parse_args()
+
+try:
+    Container(Recv(opts.address, opts.messages)).run()
+except KeyboardInterrupt: pass
+
+ + +
+ + + + +
+
+ + Added: qpid/site/docs/releases/qpid-proton-0.9.1/proton/python/examples/db_send.py.html URL: http://svn.apache.org/viewvc/qpid/site/docs/releases/qpid-proton-0.9.1/proton/python/examples/db_send.py.html?rev=1681800&view=auto ============================================================================== --- qpid/site/docs/releases/qpid-proton-0.9.1/proton/python/examples/db_send.py.html (added) +++ qpid/site/docs/releases/qpid-proton-0.9.1/proton/python/examples/db_send.py.html Tue May 26 17:54:36 2015 @@ -0,0 +1,238 @@ + + + + + db_send.py.html - Apache Qpid™ + + + + + + + + + + + +
+
+ Menu + + Search + + +
+ + + + + +
+ +
#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse
+import Queue
+import time
+from proton import Message
+from proton.handlers import MessagingHandler
+from proton.reactor import ApplicationEvent, Container, EventInjector
+from db_common import Db
+
+class Send(MessagingHandler):
+    def __init__(self, url, count):
+        super(Send, self).__init__()
+        self.url = url
+        self.delay = 0
+        self.sent = 0
+        self.confirmed = 0
+        self.load_count = 0
+        self.records = Queue.Queue(maxsize=50)
+        self.target = count
+        self.db = Db("src_db", EventInjector())
+
+    def keep_sending(self):
+        return self.target == 0 or self.sent < self.target
+
+    def on_start(self, event):
+        self.container = event.container
+        self.container.selectable(self.db.injector)
+        self.sender = self.container.create_sender(self.url)
+
+    def on_records_loaded(self, event):
+        if self.records.empty():
+            if event.subject == self.load_count:
+                print "Exhausted available data, waiting to recheck..."
+                # check for new data after 5 seconds
+                self.container.schedule(5, self)
+        else:
+            self.send()
+
+    def request_records(self):
+        if not self.records.full():
+            print "loading records..."
+            self.load_count += 1
+            self.db.load(self.records, event=ApplicationEvent("records_loaded", link=self.sender, subject=self.load_count))
+
+    def on_sendable(self, event):
+        self.send()
+
+    def send(self):
+        while self.sender.credit and not self.records.empty():
+            if not self.keep_sending(): return
+            record = self.records.get(False)
+            id = record['id']
+            self.sender.send(Message(id=id, durable=True, body=record['description']), tag=str(id))
+            self.sent += 1
+            print "sent message %s" % id
+        self.request_records()
+
+    def on_settled(self, event):
+        id = int(event.delivery.tag)
+        self.db.delete(id)
+        print "settled message %s" % id
+        self.confirmed += 1
+        if self.confirmed == self.target:
+            event.connection.close()
+            self.db.close()
+
+    def on_disconnected(self, event):
+        self.db.reset()
+        self.sent = self.confirmed
+
+    def on_timer_task(self, event):
+        print "Rechecking for data..."
+        self.request_records()
+
+parser = optparse.OptionParser(usage="usage: %prog [options]",
+                               description="Send messages to the supplied address.")
+parser.add_option("-a", "--address", default="localhost:5672/examples",
+                  help="address to which messages are sent (default %default)")
+parser.add_option("-m", "--messages", type="int", default=0,
+                  help="number of messages to send; 0 sends indefinitely (default %default)")
+opts, args = parser.parse_args()
+
+try:
+    Container(Send(opts.address, opts.messages)).run()
+except KeyboardInterrupt: pass
+
+ + +
+ + + + +
+
+ + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org