qpid-commits mailing list archives

From astitc...@apache.org
Subject svn commit: r1530301 [7/8] - in /qpid/trunk/qpid: cpp/src/tests/legacystore/ cpp/src/tests/legacystore/federation/ cpp/src/tests/legacystore/jrnl/ cpp/src/tests/legacystore/jrnl/jtt/ cpp/src/tests/legacystore/python_tests/ tools/src/py/ tools/src/py/qp...
Date Tue, 08 Oct 2013 15:09:01 GMT
Added: qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/run-journal-tests
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/run-journal-tests?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/run-journal-tests (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/run-journal-tests Tue Oct  8 15:09:00 2013
@@ -0,0 +1,47 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+if test "x${TMP_DATA_DIR}" = "x"; then
+	export TMP_DATA_DIR=/tmp
+fi
+fail=0
+num_jrnls=3
+
+# Run jtt using default test set
+echo
+echo "===== Mode 1: New journal instance, no recover ====="
+jtt/jtt --analyzer ../../tools/store_chk  --jrnl-dir ${TMP_DATA_DIR} --csv jtt/jtt.csv --format-chk --num-jrnls ${num_jrnls} || fail=1
+rm -rf ${TMP_DATA_DIR}/test_0*
+echo
+echo "===== Mode 2: Re-use journal instance, no recover ====="
+jtt/jtt --analyzer ../../tools/store_chk --jrnl-dir ${TMP_DATA_DIR} --csv jtt/jtt.csv --reuse-instance --format-chk --num-jrnls ${num_jrnls} || fail=1
+rm -rf ${TMP_DATA_DIR}/test_0*
+echo
+echo "===== Mode 3: New journal instance, recover previous test journal ====="
+jtt/jtt --analyzer ../../tools/store_chk --jrnl-dir ${TMP_DATA_DIR} --csv jtt/jtt.csv --recover-mode --format-chk --num-jrnls ${num_jrnls} || fail=1
+rm -rf ${TMP_DATA_DIR}/test_0*
+echo
+echo "===== Mode 4: Re-use journal instance, recover previous test journal ====="
+jtt/jtt --analyzer ../../tools/store_chk --jrnl-dir ${TMP_DATA_DIR} --csv jtt/jtt.csv --reuse-instance --recover-mode --format-chk --num-jrnls ${num_jrnls} || fail=1
+rm -rf ${TMP_DATA_DIR}/test_0*
+echo
+
+exit $fail

Propchange: qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/run-journal-tests
------------------------------------------------------------------------------
    svn:executable = *

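The script above runs the same jtt invocation four times, varying only the flag combination per mode. As a compact sketch of that pattern (the `jtt` function below is a stub standing in for the real `jtt/jtt` binary, which lives in the build tree and is not available here):

```shell
#!/bin/bash
# Sketch of the four-mode pattern from run-journal-tests above.
# "jtt" is a stub; the flag combinations match the four modes.
jtt() { echo "jtt $*"; return 0; }

fail=0
num_jrnls=3
modes=(""
       "--reuse-instance"
       "--recover-mode"
       "--reuse-instance --recover-mode")
for mode in "${modes[@]}"; do
    # Unquoted ${mode} is intentional: it must split into separate flags.
    jtt --num-jrnls "${num_jrnls}" ${mode} || fail=1
done
```

Each mode's exit status feeds the shared `fail` flag, mirroring the `|| fail=1` / `exit $fail` structure of the committed script.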
Added: qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/tests.ods
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/tests.ods?rev=1530301&view=auto
==============================================================================
Files qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/tests.ods (added) and qpid/trunk/qpid/cpp/src/tests/legacystore/jrnl/tests.ods Tue Oct  8 15:09:00 2013 differ

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/python_tests/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/python_tests/__init__.py?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/python_tests/__init__.py (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/python_tests/__init__.py Tue Oct  8 15:09:00 2013
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Do not delete - marks this directory as a Python package.
+
+from client_persistence import *
+from resize import *
+

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/python_tests/client_persistence.py Tue Oct  8 15:09:00 2013
@@ -0,0 +1,234 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from brokertest import EXPECT_EXIT_OK
+from store_test import StoreTest, Qmf, store_args
+from qpid.messaging import *
+
+class ExchangeQueueTests(StoreTest):
+    """
+    Simple tests of the broker exchange and queue types
+    """
+    
+    def test_direct_exchange(self):
+        """Test Direct exchange."""
+        broker = self.broker(store_args(), name="test_direct_exchange", expect=EXPECT_EXIT_OK)
+        msg1 = Message("A_Message1", durable=True, correlation_id="Msg0001")
+        msg2 = Message("B_Message1", durable=True, correlation_id="Msg0002")
+        broker.send_message("a", msg1)
+        broker.send_message("b", msg2)
+        broker.terminate()
+        
+        broker = self.broker(store_args(), name="test_direct_exchange")
+        self.check_message(broker, "a", msg1, True)
+        self.check_message(broker, "b", msg2, True)
+    
+    def test_topic_exchange(self):
+        """Test Topic exchange."""
+        broker = self.broker(store_args(), name="test_topic_exchange", expect=EXPECT_EXIT_OK)
+        ssn = broker.connect().session()
+        snd1 = ssn.sender("abc/key1; {create:always, node:{type:topic, durable:True}}")
+        snd2 = ssn.sender("abc/key2; {create:always, node:{type:topic, durable:True}}")
+        ssn.receiver("a; {create:always, link:{x-bindings:[{exchange:abc, key:key1}]}, node:{durable:True}}")
+        ssn.receiver("b; {create:always, link:{x-bindings:[{exchange:abc, key:key1}]}, node:{durable:True}}")
+        ssn.receiver("c; {create:always, link:{x-bindings:[{exchange:abc, key:key1}, "
+                     "{exchange:abc, key: key2}]}, node:{durable:True}}")
+        ssn.receiver("d; {create:always, link:{x-bindings:[{exchange:abc, key:key2}]}, node:{durable:True}}")
+        ssn.receiver("e; {create:always, link:{x-bindings:[{exchange:abc, key:key2}]}, node:{durable:True}}")
+        msg1 = Message("Message1", durable=True, correlation_id="Msg0003")
+        snd1.send(msg1)
+        msg2 = Message("Message2", durable=True, correlation_id="Msg0004")
+        snd2.send(msg2)
+        broker.terminate()
+        
+        broker = self.broker(store_args(), name="test_topic_exchange")
+        self.check_message(broker, "a", msg1, True)
+        self.check_message(broker, "b", msg1, True)
+        self.check_messages(broker, "c", [msg1, msg2], True)
+        self.check_message(broker, "d", msg2, True)
+        self.check_message(broker, "e", msg2, True)
+        
+    
+    def test_legacy_lvq(self):
+        """Test legacy LVQ."""        
+        broker = self.broker(store_args(), name="test_lvq", expect=EXPECT_EXIT_OK)
+        ma1 = Message("A1", durable=True, correlation_id="Msg0005", properties={"qpid.LVQ_key":"A"})
+        ma2 = Message("A2", durable=True, correlation_id="Msg0006", properties={"qpid.LVQ_key":"A"})
+        mb1 = Message("B1", durable=True, correlation_id="Msg0007", properties={"qpid.LVQ_key":"B"})
+        mb2 = Message("B2", durable=True, correlation_id="Msg0008", properties={"qpid.LVQ_key":"B"})
+        mb3 = Message("B3", durable=True, correlation_id="Msg0009", properties={"qpid.LVQ_key":"B"})
+        mc1 = Message("C1", durable=True, correlation_id="Msg0010", properties={"qpid.LVQ_key":"C"})
+        broker.send_messages("lvq-test", [mb1, ma1, ma2, mb2, mb3, mc1],
+                             xprops="arguments:{\"qpid.last_value_queue\":True}")
+        broker.terminate()
+        
+        broker = self.broker(store_args(), name="test_lvq", expect=EXPECT_EXIT_OK)
+        ssn = self.check_messages(broker, "lvq-test", [ma2, mb3, mc1], empty=True, ack=False)
+        # Add more messages while subscriber is active (no replacement):
+        ma3 = Message("A3", durable=True, correlation_id="Msg0011", properties={"qpid.LVQ_key":"A"})
+        ma4 = Message("A4", durable=True, correlation_id="Msg0012", properties={"qpid.LVQ_key":"A"})
+        mc2 = Message("C2", durable=True, correlation_id="Msg0013", properties={"qpid.LVQ_key":"C"})
+        mc3 = Message("C3", durable=True, correlation_id="Msg0014", properties={"qpid.LVQ_key":"C"})
+        mc4 = Message("C4", durable=True, correlation_id="Msg0015", properties={"qpid.LVQ_key":"C"})
+        broker.send_messages("lvq-test", [mc2, mc3, ma3, ma4, mc4], session=ssn)
+        ssn.acknowledge()
+        broker.terminate()
+        
+        broker = self.broker(store_args(), name="test_lvq")
+        self.check_messages(broker, "lvq-test", [ma4, mc4], True)
+        
+        
+    def test_fanout_exchange(self):
+        """Test Fanout Exchange"""
+        broker = self.broker(store_args(), name="test_fanout_exchange", expect=EXPECT_EXIT_OK)
+        ssn = broker.connect().session()
+        snd = ssn.sender("TestFanoutExchange; {create: always, node: {type: topic, x-declare: {type: fanout}}}")
+        ssn.receiver("TestFanoutExchange; {link: {name: \"q1\", durable: True, reliability:at-least-once}}")
+        ssn.receiver("TestFanoutExchange; {link: {name: \"q2\", durable: True, reliability:at-least-once}}")
+        ssn.receiver("TestFanoutExchange; {link: {name: \"q3\", durable: True, reliability:at-least-once}}")
+        msg1 = Message("Msg1", durable=True, correlation_id="Msg0001")
+        snd.send(msg1)
+        msg2 = Message("Msg2", durable=True, correlation_id="Msg0002")
+        snd.send(msg2)
+        broker.terminate()
+        
+        broker = self.broker(store_args(), name="test_fanout_exchange")
+        self.check_messages(broker, "q1", [msg1, msg2], True)
+        self.check_messages(broker, "q2", [msg1, msg2], True)
+        self.check_messages(broker, "q3", [msg1, msg2], True)
+
+
+    def test_message_reject(self):
+        broker = self.broker(store_args(), name="test_message_reject", expect=EXPECT_EXIT_OK)
+        ssn = broker.connect().session()
+        snd = ssn.sender("tmr; {create:always, node:{type:queue, durable:True}}")
+        rcv = ssn.receiver("tmr; {create:always, node:{type:queue, durable:True}}")
+        m1 = Message("test_message_reject", durable=True, correlation_id="Msg0001")
+        snd.send(m1)
+        m2 = rcv.fetch()
+        ssn.acknowledge(message=m2, disposition=Disposition(REJECTED))
+        broker.terminate()
+         
+        broker = self.broker(store_args(), name="test_message_reject")
+        qmf = Qmf(broker)
+        assert qmf.queue_message_count("tmr") == 0
+
+        
+    def test_route(self):
+        """ Test the recovery of a route (link and bridge objects."""
+        broker = self.broker(store_args(), name="test_route", expect=EXPECT_EXIT_OK)
+        qmf = Qmf(broker)
+        qmf_broker_obj = qmf.get_objects("broker")[0]
+        
+        # create a "link"
+        link_args = {"host":"a.fake.host.com", "port":9999, "durable":True,
+                     "authMechanism":"PLAIN", "username":"guest", "password":"guest",
+                     "transport":"tcp"}
+        result = qmf_broker_obj.create("link", "test-link", link_args, False)
+        self.assertEqual(result.status, 0, result)
+        link = qmf.get_objects("link")[0]
+        
+        # create bridge
+        bridge_args = {"link":"test-link", "src":"amq.direct", "dest":"amq.fanout",
+                       "key":"my-key", "durable":True}
+        result = qmf_broker_obj.create("bridge", "test-bridge", bridge_args, False)
+        self.assertEqual(result.status, 0, result)
+        bridge = qmf.get_objects("bridge")[0]
+        
+        broker.terminate()
+        
+        # recover the link and bridge
+        broker = self.broker(store_args(), name="test_route")
+        qmf = Qmf(broker)
+        qmf_broker_obj = qmf.get_objects("broker")[0]
+        self.assertEqual(len(qmf.get_objects("link")), 1)
+        self.assertEqual(len(qmf.get_objects("bridge")), 1)
+
+
+
+class AlternateExchangePropertyTests(StoreTest):
+    """
+    Test the persistence of the Alternate Exchange property for exchanges and queues.
+    """
+
+    def test_exchange(self):
+        """Exchange alternate exchange property persistence test"""
+        broker = self.broker(store_args(), name="test_exchange", expect=EXPECT_EXIT_OK)
+        qmf = Qmf(broker)
+        qmf.add_exchange("altExch", "direct", durable=True) # Serves as alternate exchange instance
+        qmf.add_exchange("testExch", "direct", durable=True, alt_exchange_name="altExch")
+        qmf.close()
+        broker.terminate()
+
+        broker = self.broker(store_args(), name="test_exchange")
+        qmf = Qmf(broker)
+        try:
+            qmf.add_exchange("altExch", "direct", passive=True)
+        except Exception, error:
+            self.fail("Alternate exchange (\"altExch\") instance not recovered: %s" % error)
+        try:
+            qmf.add_exchange("testExch", "direct", passive=True)
+        except Exception, error:
+            self.fail("Test exchange (\"testExch\") instance not recovered: %s" % error)
+        self.assertTrue(qmf.query_exchange("testExch", alt_exchange_name = "altExch"),
+                        "Alternate exchange property not found or is incorrect on exchange \"testExch\".")
+        qmf.close()
+        
+    def test_queue(self):
+        """Queue alternate exchange property persistexchangeNamece test"""
+        broker = self.broker(store_args(), name="test_queue", expect=EXPECT_EXIT_OK)
+        qmf = Qmf(broker)
+        qmf.add_exchange("altExch", "direct", durable=True) # Serves as alternate exchange instance
+        qmf.add_queue("testQueue", durable=True, alt_exchange_name="altExch")
+        qmf.close()
+        broker.terminate()
+
+        broker = self.broker(store_args(), name="test_queue")
+        qmf = Qmf(broker)
+        try:
+            qmf.add_exchange("altExch", "direct", passive=True)
+        except Exception, error:
+            self.fail("Alternate exchange (\"altExch\") instance not recovered: %s" % error)
+        try:
+            qmf.add_queue("testQueue", passive=True)
+        except Exception, error:
+            self.fail("Test queue (\"testQueue\") instance not recovered: %s" % error)
+        self.assertTrue(qmf.query_queue("testQueue", alt_exchange_name = "altExch"),
+                        "Alternate exchange property not found or is incorrect on queue \"testQueue\".")
+        qmf.close()
+
+
+class RedeliveredTests(StoreTest):
+    """
+    Test the behavior of the redelivered flag in the context of persistence
+    """
+
+    def test_broker_recovery(self):
+        """Test that the redelivered flag is set on messages after recovery of broker"""
+        broker = self.broker(store_args(), name="test_broker_recovery", expect=EXPECT_EXIT_OK)
+        msg_content = "xyz"*100
+        msg = Message(msg_content, durable=True)
+        broker.send_message("testQueue", msg)
+        broker.terminate()
+        
+        broker = self.broker(store_args(), name="test_broker_recovery")
+        rcv_msg = broker.get_message("testQueue")
+        self.assertEqual(msg_content, rcv_msg.content)
+        self.assertTrue(rcv_msg.redelivered)
+

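The first recovery check in `test_legacy_lvq` relies on last-value-queue semantics: while no subscriber is consuming, a newly sent message whose `qpid.LVQ_key` matches a queued message replaces it rather than being appended. A toy model of that replacement rule (an illustration only, not the broker's implementation; in-place replacement of the stale entry is an assumption here):

```python
# Toy model of qpid.last_value_queue replacement semantics.
def lvq_enqueue(queue, key, body):
    for i, (k, _) in enumerate(queue):
        if k == key:
            queue[i] = (key, body)  # replace the stale value for this key
            return
    queue.append((key, body))

queue = []
# Same send order as the first batch in test_legacy_lvq above.
for key, body in [("B", "B1"), ("A", "A1"), ("A", "A2"),
                  ("B", "B2"), ("B", "B3"), ("C", "C1")]:
    lvq_enqueue(queue, key, body)
# Only the last value per key survives: A2, B3, C1 (cf. ma2, mb3, mc1).
```

This is why the test expects exactly three messages after the first restart, and why the second batch (sent while a subscriber holds the queue) is delivered without replacement.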
Added: qpid/trunk/qpid/cpp/src/tests/legacystore/python_tests/resize.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/python_tests/resize.py?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/python_tests/resize.py (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/python_tests/resize.py Tue Oct  8 15:09:00 2013
@@ -0,0 +1,167 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import glob
+import os
+import subprocess
+
+from brokertest import EXPECT_EXIT_OK
+from qpid.datatypes import uuid4
+from store_test import StoreTest, store_args
+from qpid.messaging import Message
+
+class ResizeTest(StoreTest):
+    
+    resize_tool = os.getenv("QPID_STORE_RESIZE_TOOL", "qpid-store-resize")
+
+    def _resize_store(self, store_dir, queue_name, resize_num_files, resize_file_size, exp_fail):
+        for f in glob.glob(os.path.join(store_dir, "*")):
+            final_store_dir = os.path.join(f, queue_name)
+            p = subprocess.Popen([self.resize_tool, final_store_dir, "--num-jfiles", str(resize_num_files),
+                                  "--jfile-size-pgs", str(resize_file_size), "--quiet"], stdout = subprocess.PIPE,
+                                  stderr = subprocess.STDOUT)
+            res = p.wait()
+            err_found = False
+            try:
+                for l in p.stdout:
+                    if exp_fail:
+                        err_found = True
+                        print "[Expected error]:",
+                    print l,
+            finally:
+                p.stdout.close()
+            return res  # note: only the first store directory found is resized
+    
+    def _resize_test(self, queue_name, num_msgs, msg_size, resize_num_files, resize_file_size, init_num_files = 8,
+                    init_file_size = 24, exp_fail = False, wait_time = None):
+        # Using a sender will force the creation of an empty persistent queue which is needed for some tests
+        broker = self.broker(store_args(), name="broker", expect=EXPECT_EXIT_OK, wait=wait_time)
+        ssn = broker.connect().session()
+        snd = ssn.sender("%s; {create:always, node:{durable:True}}" % queue_name)
+        
+        msgs = []
+        for index in range(0, num_msgs):
+            msg = Message(self.make_message(index, msg_size), durable=True, id=uuid4(), correlation_id="msg-%04d"%index)
+            msgs.append(msg)
+            snd.send(msg)
+        broker.terminate()
+        
+        res = self._resize_store(os.path.join(self.dir, "broker", "rhm", "jrnl"), queue_name, resize_num_files,
+                             resize_file_size, exp_fail)
+        if res != 0:
+            if exp_fail:
+                return
+            self.fail("ERROR: Resize operation failed with return code %d" % res)
+        elif exp_fail:
+            self.fail("ERROR: Resize operation succeeded, but a failure was expected")
+        
+        broker = self.broker(store_args(), name="broker")
+        self.check_messages(broker, queue_name, msgs, True)
+        
+        # TODO: Check the physical files to check number and size are as expected. 
+
+
+class SimpleTest(ResizeTest):
+    """
+    Simple tests of the resize utility for resizing a journal to larger and smaller sizes.
+    """
+    
+    def test_empty_store_same(self):
+        self._resize_test(queue_name = "empty_store_same",
+                          num_msgs = 0, msg_size = 0,
+                          init_num_files = 8, init_file_size = 24,
+                          resize_num_files = 8, resize_file_size = 24)
+    
+    def test_empty_store_up(self):
+        self._resize_test(queue_name = "empty_store_up",
+                          num_msgs = 0, msg_size = 0,
+                          init_num_files = 8, init_file_size = 24,
+                          resize_num_files = 16,  resize_file_size = 48)
+    
+    def test_empty_store_down(self):
+        self._resize_test(queue_name = "empty_store_down",
+                          num_msgs = 0, msg_size = 0,
+                          init_num_files = 8, init_file_size = 24,
+                          resize_num_files = 6, resize_file_size = 12)
+ 
+# TODO: Put into long tests, make sure there is > 128GB free disk space       
+#    def test_empty_store_max(self):
+#        self._resize_test(queue_name = "empty_store_max",
+#                          num_msgs = 0, msg_size = 0,
+#                          init_num_files = 8, init_file_size = 24,
+#                          resize_num_files = 64, resize_file_size = 32768,
+#                          wait_time = 120)
+    
+    def test_empty_store_min(self):
+        self._resize_test(queue_name = "empty_store_min",
+                          num_msgs = 0, msg_size = 0,
+                          init_num_files = 8, init_file_size = 24,
+                          resize_num_files = 4, resize_file_size = 1)
+    
+    def test_basic_up(self):
+        self._resize_test(queue_name = "basic_up",
+                          num_msgs = 100, msg_size = 10000,
+                          init_num_files = 8, init_file_size = 24,
+                          resize_num_files = 16, resize_file_size = 48)
+    
+    def test_basic_down(self):
+        self._resize_test(queue_name = "basic_down",
+                          num_msgs = 100, msg_size = 10000,
+                          init_num_files = 8, init_file_size = 24,
+                          resize_num_files = 4, resize_file_size = 15)
+    
+    def test_basic_low(self):
+        self._resize_test(queue_name = "basic_low",
+                          num_msgs = 100, msg_size = 10000,
+                          init_num_files = 8, init_file_size = 24,
+                          resize_num_files = 4, resize_file_size = 4,
+                          exp_fail = True)
+    
+    def test_basic_under(self):
+        self._resize_test(queue_name = "basic_under",
+                          num_msgs = 100, msg_size = 10000,
+                          init_num_files = 8, init_file_size = 24,
+                          resize_num_files = 4, resize_file_size = 3,
+                          exp_fail = True)
+    
+    def test_very_large_msg_up(self):
+        self._resize_test(queue_name = "very_large_msg_up",
+                          num_msgs = 4, msg_size = 2000000,
+                          init_num_files = 8, init_file_size = 24,
+                          resize_num_files = 16, resize_file_size = 48)
+    
+    def test_very_large_msg_down(self):
+        self._resize_test(queue_name = "very_large_msg_down",
+                          num_msgs = 4, msg_size = 2000000,
+                          init_num_files = 16, init_file_size = 64,
+                          resize_num_files = 16, resize_file_size = 48)
+    
+    def test_very_large_msg_low(self):
+        self._resize_test(queue_name = "very_large_msg_low",
+                          num_msgs = 4, msg_size = 2000000,
+                          init_num_files = 8, init_file_size = 24,
+                          resize_num_files = 7, resize_file_size = 20,
+                          exp_fail = True)
+    
+    def test_very_large_msg_under(self):
+        self._resize_test(queue_name = "very_large_msg_under",
+                          num_msgs = 4, msg_size = 2000000,
+                          init_num_files = 8, init_file_size = 24,
+                          resize_num_files = 6, resize_file_size = 8,
+                          exp_fail = True)

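The `exp_fail` cases above shrink the journal below the space the enqueued messages occupy. A back-of-envelope model of that capacity constraint (the 64 KiB page size and zero per-record overhead are assumptions; the real journal adds record headers and reserves headroom, so its actual failure threshold is somewhat higher than this estimate):

```python
# Rough capacity model for the resize tests above (illustrative only).
PAGE_BYTES = 64 * 1024  # assumed journal page size

def would_fit(num_msgs, msg_size, num_files, file_size_pgs):
    """Optimistic check: does the raw message data fit the journal?"""
    capacity = num_files * file_size_pgs * PAGE_BYTES
    return num_msgs * msg_size <= capacity

# test_basic_down:  100 x 10000 B into 4 files of 15 pages -> fits
# test_basic_under: same data into 4 files of 3 pages -> cannot fit
```

Under this model `basic_down` (resize succeeds) and `basic_under` (resize must fail) fall on opposite sides of the capacity line, which is the distinction the `exp_fail` flag encodes.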
Added: qpid/trunk/qpid/cpp/src/tests/legacystore/python_tests/store_test.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/python_tests/store_test.py?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/python_tests/store_test.py (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/python_tests/store_test.py Tue Oct  8 15:09:00 2013
@@ -0,0 +1,416 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import re
+from brokertest import BrokerTest
+from qpid.messaging import Empty
+from qmf.console import Session
+
+    
+def store_args(store_dir = None):
+    """Return the broker args necessary to load the async store"""
+    assert BrokerTest.store_lib 
+    if store_dir == None:
+        return []
+    return ["--store-dir", store_dir]
+
+class Qmf:
+    """
+    QMF functions not yet available in the new QMF API. Remove this and replace with new API when it becomes available.
+    """
+    def __init__(self, broker):
+        self.__session = Session()
+        self.__broker = self.__session.addBroker("amqp://localhost:%d"%broker.port())
+
+    def add_exchange(self, exchange_name, exchange_type, alt_exchange_name=None, passive=False, durable=False,
+                     arguments = None):
+        """Add a new exchange"""
+        amqp_session = self.__broker.getAmqpSession()
+        if arguments == None:
+            arguments = {}
+        if alt_exchange_name:
+            amqp_session.exchange_declare(exchange=exchange_name, type=exchange_type,
+                                          alternate_exchange=alt_exchange_name, passive=passive, durable=durable,
+                                          arguments=arguments)
+        else:
+            amqp_session.exchange_declare(exchange=exchange_name, type=exchange_type, passive=passive, durable=durable,
+                                          arguments=arguments)
+    
+    def add_queue(self, queue_name, alt_exchange_name=None, passive=False, durable=False, arguments = None):
+        """Add a new queue"""
+        amqp_session = self.__broker.getAmqpSession()
+        if arguments == None:
+            arguments = {}
+        if alt_exchange_name:
+            amqp_session.queue_declare(queue_name, alternate_exchange=alt_exchange_name, passive=passive,
+                                       durable=durable, arguments=arguments)
+        else:
+            amqp_session.queue_declare(queue_name, passive=passive, durable=durable, arguments=arguments)
+    
+    def delete_queue(self, queue_name):
+        """Delete an existing queue"""
+        amqp_session = self.__broker.getAmqpSession()
+        amqp_session.queue_delete(queue_name)
+
+    def _query(self, name, _class, package, alt_exchange_name=None):
+        """Qmf query function which can optionally look for the presence of an alternate exchange name"""
+        try:
+            obj_list = self.__session.getObjects(_class=_class, _package=package)
+            found = False
+            for obj in obj_list:
+                if obj.name == name:
+                    found = True
+                    if alt_exchange_name != None:
+                        alt_exch_list = self.__session.getObjects(_objectId=obj.altExchange)
+                        if len(alt_exch_list) == 0 or alt_exch_list[0].name != alt_exchange_name:
+                            return False
+                    break
+            return found
+        except Exception:
+            return False
+                
+
+    def query_exchange(self, exchange_name, alt_exchange_name=None):
+        """Test for the presence of an exchange, and optionally whether it has an alternate exchange set to a known
+        value."""
+        return self._query(exchange_name, "exchange", "org.apache.qpid.broker", alt_exchange_name)
+    
+    def query_queue(self, queue_name, alt_exchange_name=None):
+        """Test for the presence of an exchange, and optionally whether it has an alternate exchange set to a known
+        value."""
+        return self._query(queue_name, "queue", "org.apache.qpid.broker", alt_exchange_name)
+    
+    def queue_message_count(self, queue_name):
+        """Query the number of messages on a queue"""
+        queue_list = self.__session.getObjects(_class="queue", _name=queue_name)
+        if len(queue_list):
+            return queue_list[0].msgDepth
+    
+    def queue_empty(self, queue_name):
+        """Check if a queue is empty (has no messages waiting)"""
+        return self.queue_message_count(queue_name) == 0
+
+    def get_objects(self, target_class, target_package="org.apache.qpid.broker"):
+        return self.__session.getObjects(_class=target_class, _package=target_package)
+
+    
+    def close(self):
+        self.__session.delBroker(self.__broker)
+        self.__session = None
+
+
+class StoreTest(BrokerTest):
+    """
+    This subclass of BrokerTest adds some convenience test/check functions
+    """
+    
+    def _chk_empty(self, queue, receiver):
+        """Check if a queue is empty (has no more messages)"""
+        try:
+            msg = receiver.fetch(timeout=0)
+            self.fail("Queue \"%s\" not empty: found message: %s" % (queue, msg))
+        except Empty:
+            pass
+
+    @staticmethod
+    def make_message(msg_count, msg_size):
+        """Make message content. Format: 'abcdef....' followed by 'msg-NNNN', where NNNN is the message count"""
+        msg = "msg-%04d" % msg_count
+        msg_len = len(msg)
+        buff = ""
+        if msg_size != None and msg_size > msg_len:
+            for index in range(0, msg_size - msg_len):
+                if index == msg_size - msg_len - 1:
+                    buff += "-"
+                else:
+                    buff += chr(ord('a') + (index % 26))
+        return buff + msg
+    
+    # Functions for formatting address strings
+    
+    @staticmethod
+    def _fmt_csv(string_list, list_braces = None):
+        """Format a list using comma-separation. Braces are optionally added."""
+        if len(string_list) == 0:
+            return ""
+        first = True
+        str_ = ""
+        if list_braces != None:
+            str_ += list_braces[0]
+        for string in string_list:
+            if string != None:
+                if first:
+                    first = False
+                else:
+                    str_ += ", "
+                str_ += string
+        if list_braces != None:
+            str_ += list_braces[1]
+        return str_
+    
+    def _fmt_map(self, string_list):
+        """Format a map {l1, l2, l3, ...} from a string list. Each item in the list must be a formatted map
+        element('key:val')."""
+        return self._fmt_csv(string_list, list_braces="{}") 
+    
+    def _fmt_list(self, string_list):
+        """Format a list [l1, l2, l3, ...] from a string list."""
+        return self._fmt_csv(string_list, list_braces="[]") 
+    
+    def addr_fmt(self, node_name, **kwargs):
+        """Generic AMQP to new address formatter. Takes common (but not all) AMQP options and formats an address
+        string."""
+        # Get keyword args
+        node_subject = kwargs.get("node_subject")
+        create_policy = kwargs.get("create_policy")
+        delete_policy = kwargs.get("delete_policy")
+        assert_policy = kwargs.get("assert_policy")
+        mode = kwargs.get("mode")
+        link = kwargs.get("link", False)
+        link_name = kwargs.get("link_name")
+        node_type = kwargs.get("node_type")
+        durable = kwargs.get("durable", False)
+        link_reliability = kwargs.get("link_reliability")
+        x_declare_list = kwargs.get("x_declare_list", [])
+        x_bindings_list = kwargs.get("x_bindings_list", [])
+        x_subscribe_list = kwargs.get("x_subscribe_list", [])
+        
+        node_flag = not link and (node_type != None or durable or len(x_declare_list) > 0 or len(x_bindings_list) > 0)
+        link_flag = link and (link_name != None or durable or link_reliability != None or len(x_declare_list) > 0 or
+                             len(x_bindings_list) > 0 or len(x_subscribe_list) > 0)
+        assert not (node_flag and link_flag)
+        
+        opt_str_list = []
+        if create_policy != None:
+            opt_str_list.append("create: %s" % create_policy)
+        if delete_policy != None:
+            opt_str_list.append("delete: %s" % delete_policy)
+        if assert_policy != None:
+            opt_str_list.append("assert: %s" % assert_policy)
+        if mode != None:
+            opt_str_list.append("mode: %s" % mode)
+        if node_flag or link_flag:
+            node_str_list = []
+            if link_name != None:
+                node_str_list.append("name: \"%s\"" % link_name)
+            if node_type != None:
+                node_str_list.append("type: %s" % node_type)
+            if durable:
+                node_str_list.append("durable: True")
+            if link_reliability != None:
+                node_str_list.append("reliability: %s" % link_reliability)
+            if len(x_declare_list) > 0:
+                node_str_list.append("x-declare: %s" % self._fmt_map(x_declare_list))
+            if len(x_bindings_list) > 0:
+                node_str_list.append("x-bindings: %s" % self._fmt_list(x_bindings_list))
+            if len(x_subscribe_list) > 0:
+                node_str_list.append("x-subscribe: %s" % self._fmt_map(x_subscribe_list))
+            if node_flag:
+                opt_str_list.append("node: %s" % self._fmt_map(node_str_list))
+            else:
+                opt_str_list.append("link: %s" % self._fmt_map(node_str_list))
+        addr_str = node_name
+        if node_subject != None:
+            addr_str += "/%s" % node_subject
+        if len(opt_str_list) > 0:
+            addr_str += "; %s" % self._fmt_map(opt_str_list)
+        return addr_str
+    
+    def snd_addr(self, node_name, **kwargs):
+        """ Create a send (node) address"""
+        # Get keyword args
+        topic = kwargs.get("topic")
+        topic_flag = kwargs.get("topic_flag", False)
+        auto_create = kwargs.get("auto_create", True)
+        auto_delete = kwargs.get("auto_delete", False)
+        durable = kwargs.get("durable", False)
+        exclusive = kwargs.get("exclusive", False)
+        ftd_count = kwargs.get("ftd_count")
+        ftd_size = kwargs.get("ftd_size")
+        policy = kwargs.get("policy", "flow-to-disk")
+        exchange_type = kwargs.get("exchange_type")
+        
+        create_policy = None
+        if auto_create:
+            create_policy = "always"
+        delete_policy = None
+        if auto_delete:
+            delete_policy = "always"
+        node_type = None
+        if topic != None or topic_flag:
+            node_type = "topic"
+        x_declare_list = ["\"exclusive\": %s" % exclusive]
+        if ftd_count != None or ftd_size != None:
+            queue_policy = ["\'qpid.policy_type\': %s" % policy]
+            if ftd_count:
+                queue_policy.append("\'qpid.max_count\': %d" % ftd_count)
+            if ftd_size:
+                queue_policy.append("\'qpid.max_size\': %d" % ftd_size)
+            x_declare_list.append("arguments: %s" % self._fmt_map(queue_policy))
+        if exchange_type != None:
+            x_declare_list.append("type: %s" % exchange_type)
+            
+        return self.addr_fmt(node_name, node_subject=topic, create_policy=create_policy, delete_policy=delete_policy,
+                             node_type=node_type, durable=durable, x_declare_list=x_declare_list)
+    
+    def rcv_addr(self, node_name, **kwargs):
+        """ Create a receive (link) address"""
+        # Get keyword args
+        auto_create = kwargs.get("auto_create", True)
+        auto_delete = kwargs.get("auto_delete", False)
+        link_name = kwargs.get("link_name")
+        durable = kwargs.get("durable", False)
+        browse = kwargs.get("browse", False)
+        exclusive = kwargs.get("exclusive", False)
+        binding_list = kwargs.get("binding_list", [])
+        ftd_count = kwargs.get("ftd_count")
+        ftd_size = kwargs.get("ftd_size")
+        policy = kwargs.get("policy", "flow-to-disk")
+        
+        create_policy = None
+        if auto_create:
+            create_policy = "always"
+        delete_policy = None
+        if auto_delete:
+            delete_policy = "always"
+        mode = None
+        if browse:
+            mode = "browse" 
+        x_declare_list = ["\"exclusive\": %s" % exclusive]
+        if ftd_count != None or ftd_size != None:
+            queue_policy = ["\'qpid.policy_type\': %s" % policy]
+            if ftd_count:
+                queue_policy.append("\'qpid.max_count\': %d" % ftd_count)
+            if ftd_size:
+                queue_policy.append("\'qpid.max_size\': %d" % ftd_size)
+            x_declare_list.append("arguments: %s" % self._fmt_map(queue_policy))
+        x_bindings_list = []
+        for binding in binding_list:
+            x_bindings_list.append("{exchange: %s, key: %s}" % binding)
+        if durable: reliability = 'at-least-once'
+        else: reliability = None
+        return self.addr_fmt(node_name, create_policy=create_policy, delete_policy=delete_policy, mode=mode, link=True,
+                             link_name=link_name, durable=durable, x_declare_list=x_declare_list,
+                             x_bindings_list=x_bindings_list, link_reliability=reliability)
+    
+    def check_message(self, broker, queue, exp_msg, transactional=False, empty=False, ack=True, browse=False):
+        """Check that a message is on a queue by dequeuing it and comparing it to the expected message"""
+        return self.check_messages(broker, queue, [exp_msg], transactional, empty, ack, browse)
+        
+    def check_messages(self, broker, queue, exp_msg_list, transactional=False, empty=False, ack=True, browse=False,
+                       empty_flag=False):
+        """Check that messages are on a queue by dequeuing them and comparing them to the expected messages"""
+        if empty_flag:
+            num_msgs = 0
+        else:
+            num_msgs = len(exp_msg_list)
+        ssn = broker.connect().session(transactional=transactional)
+        rcvr = ssn.receiver(self.rcv_addr(queue, browse=browse), capacity=num_msgs)
+        if num_msgs > 0:
+            try:
+                received_msg_list = [rcvr.fetch(timeout=0) for i in range(num_msgs)]
+            except Empty:
+                self.assert_(False, "Queue \"%s\" is empty, unable to retrieve expected message %d." % (queue, i))
+            for i in range(0, len(received_msg_list)):
+                self.assertEqual(received_msg_list[i].content, exp_msg_list[i].content)
+                self.assertEqual(received_msg_list[i].correlation_id, exp_msg_list[i].correlation_id)
+        if empty:
+            self._chk_empty(queue, rcvr)
+        if ack:
+            ssn.acknowledge()
+            if transactional:
+                ssn.commit()
+            ssn.connection.close()
+        else:
+            if transactional:
+                ssn.commit()
+            return ssn
+            
+    
+    # Functions for finding strings in the broker log file (or other files)
+
+    @staticmethod
+    def _read_file(file_name):
+        """Returns the content of file named file_name as a string"""
+        file_handle = open(file_name)
+        try:
+            return file_handle.read()
+        finally:
+            file_handle.close()
+    
+    def _get_hits(self, broker, search):
+        """Find all occurrences of the search in the broker log (eliminating possible duplicates from msgs on multiple
+        queues)"""
+        # TODO: Use sets when RHEL-4 is no longer supported
+        hits = []
+        for hit in search.findall(self._read_file(broker.log)):
+            if hit not in hits:
+                hits.append(hit) 
+        return hits
+    
+    def _reconcile_hits(self, broker, ftd_msgs, release_hits):
+        """Remove entries from list release_hits if they match the message id in ftd_msgs. Check for remaining
+        release_hits."""
+        for msg in ftd_msgs:
+            found = False
+            for hit in release_hits:
+                if str(msg.id) in hit:
+                    release_hits.remove(hit)
+                    #print "Found %s in %s" % (msg.id, broker.log)
+                    found = True
+                    break
+            if not found:
+                self.assert_(False, "Unable to locate released message %s in log %s" % (msg.id, broker.log))
+        if len(release_hits) > 0:
+            err = "Messages were unexpectedly released in log %s:\n" % broker.log
+            for hit in release_hits:
+                err += "  %s\n" % hit
+            self.assert_(False, err)
+        
+    def check_msg_release(self, broker, ftd_msgs):
+        """Check for 'Content released' messages in broker log for messages in ftd_msgs"""
+        hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+                                                 "Content released$", re.MULTILINE))
+        self._reconcile_hits(broker, ftd_msgs, hits)
+        
+    def check_msg_release_on_commit(self, broker, ftd_msgs):
+        """Check for 'Content released on commit' messages in broker log for messages in ftd_msgs"""
+        hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+                                                 "Content released on commit$", re.MULTILINE))
+        self._reconcile_hits(broker, ftd_msgs, hits)
+        
+    def check_msg_release_on_recover(self, broker, ftd_msgs):
+        """Check for 'Content released after recovery' messages in broker log for messages in ftd_msgs"""
+        hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+                                                 "Content released after recovery$", re.MULTILINE))
+        self._reconcile_hits(broker, ftd_msgs, hits)
+    
+    def check_msg_block(self, broker, ftd_msgs):
+        """Check for 'Content release blocked' messages in broker log for messages in ftd_msgs"""
+        hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+                                                 "Content release blocked$", re.MULTILINE))
+        self._reconcile_hits(broker, ftd_msgs, hits)
+     
+    def check_msg_block_on_commit(self, broker, ftd_msgs):
+        """Check for 'Content release blocked on commit' messages in broker log for messages in ftd_msgs"""
+        hits = self._get_hits(broker, re.compile("debug Message id=\"[0-9a-f-]{36}\"; pid=0x[0-9a-f]+: "
+                                                 "Content release blocked on commit$", re.MULTILINE))
+        self._reconcile_hits(broker, ftd_msgs, hits)
+       
+        

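The address-formatting helpers added above (`_fmt_csv`, `_fmt_map`, `addr_fmt`) build qpid address strings by joining pre-formatted "key: val" items and appending them to the node name. A minimal standalone sketch of the same technique (the function names here are illustrative, not the test-suite API):

```python
def fmt_csv(string_list, braces=None):
    # Join non-None strings with ", "; optionally wrap in braces.
    # Mirrors _fmt_csv above, including the empty-list short-circuit.
    if len(string_list) == 0:
        return ""
    body = ", ".join(s for s in string_list if s is not None)
    if braces is not None:
        return braces[0] + body + braces[1]
    return body

def fmt_map(string_list):
    # Format "{k1: v1, k2: v2}" from pre-formatted "key: val" items.
    return fmt_csv(string_list, braces="{}")

def addr(node_name, subject=None, opts=()):
    # Assemble "name/subject; {opt, ...}" the way addr_fmt does.
    addr_str = node_name
    if subject is not None:
        addr_str += "/%s" % subject
    opts = [o for o in opts if o is not None]
    if opts:
        addr_str += "; %s" % fmt_map(opts)
    return addr_str

print(addr("my-queue", opts=["create: always", "node: %s" % fmt_map(["durable: True"])]))
```

This produces `my-queue; {create: always, node: {durable: True}}`, the nested-map shape that `snd_addr` and `rcv_addr` hand to receivers and senders.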
Modified: qpid/trunk/qpid/cpp/src/tests/legacystore/run_python_tests
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/run_python_tests?rev=1530301&r1=1530300&r2=1530301&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/run_python_tests (original)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/run_python_tests Tue Oct  8 15:09:00 2013
@@ -18,47 +18,25 @@
 # under the License.
 #
 
-if test -z ${QPID_DIR} ; then
-    cat <<EOF
+source ../test_env.sh
 
-	===========  WARNING: PYTHON TESTS DISABLED ==============
-
-	QPID_DIR not set.
-
-	===========================================================
-
-EOF
-	exit
-fi
-
-. `dirname $0`/tests_env.sh
+#Add our directory to the python path
+export PYTHONPATH=$srcdir/legacystore:$PYTHONPATH
 
 MODULENAME=python_tests
 
 echo "Running Python tests in module ${MODULENAME}..."
 
-case x$1 in
-    xSHORT_TEST)
-        DEFAULT_PYTHON_TESTS="*.client_persistence.ExchangeQueueTests.* *.flow_to_disk.SimpleMaxSizeCountTest.test_browse_recover *.flow_to_disk.SimpleMaxSizeCountTest.test_durable_browse_recover *.flow_to_disk.MultiDurableQueueDurableMsgBrowseRecoverTxPTxCTest.test_mixed_limit_2" ;;
-    xLONG_TEST)
-        DEFAULT_PYTHON_TESTS= ;;
-    x)
-        DEFAULT_PYTHON_TESTS="*.client_persistence.* *.flow_to_disk.SimpleMaxSizeCountTest.* *.flow_to_disk.MultiDurableQueue*.test_mixed_limit_1 *.flow_to_disk.MultiQueue*.test_mixed_limit_1 *.resize.SimpleTest.* *.federation.*" ;;
-    *)
-        DEFAULT_PYTHON_TESTS=$1
-esac
-
-PYTHON_TESTS=${PYTHON_TESTS:-${DEFAULT_PYTHON_TESTS}}
+test -d $PYTHON_DIR || { echo "Skipping python tests, no python dir."; exit 0; }
+QPID_PORT=${QPID_PORT:-5672}
+FAILING=${FAILING:-/dev/null}
+PYTHON_TESTS=${PYTHON_TESTS:-$*}
 
 OUTDIR=${MODULENAME}.tmp
 rm -rf $OUTDIR
 
 # To debug a test, add the following options to the end of the following line:
 # -v DEBUG -c qpid.messaging.io.ops [*.testName]
-${PYTHON_DIR}/qpid-python-test -m ${MODULENAME} -I ${FAILING_PYTHON_TESTS} ${PYTHON_TESTS} -DOUTDIR=$OUTDIR #-v DEBUG
-RETCODE=$?
+${QPID_PYTHON_TEST} -m ${MODULENAME} -I $FAILING -DOUTDIR=$OUTDIR \
+  $PYTHON_TESTS || exit 1
 
-if test x${RETCODE} != x0; then
-    exit 1;
-fi
-exit 0

Propchange: qpid/trunk/qpid/cpp/src/tests/legacystore/run_python_tests
------------------------------------------------------------------------------
    svn:executable = *

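The `check_msg_*` helpers in the python_tests module above scan the broker log with a multi-line regex and de-duplicate hits, since the same message can be logged once per queue it sits on. A standalone sketch of that scan, using an invented log fragment shaped like the lines the tests match:

```python
import re

def get_hits(log_text, pattern):
    # Collect unique matches in first-seen order; the original avoids
    # set() to stay compatible with very old Python versions (RHEL-4).
    hits = []
    for hit in pattern.findall(log_text):
        if hit not in hits:
            hits.append(hit)
    return hits

# Invented log lines in the general shape the tests match against.
log = ('debug Message id="aaaa"; pid=0x1: Content released\n'
       'debug Message id="aaaa"; pid=0x1: Content released\n'
       'debug Message id="bbbb"; pid=0x2: Content released\n')
pattern = re.compile(r'debug Message id="(\w+)"; pid=0x[0-9a-f]+: Content released$',
                     re.MULTILINE)
print(get_hits(log, pattern))
```

With `re.MULTILINE`, `$` matches at each line end, so `findall` returns one captured id per matching log line; de-duplication then leaves `["aaaa", "bbbb"]`.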
Added: qpid/trunk/qpid/tools/src/py/qpid-store-chk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-store-chk?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-store-chk (added)
+++ qpid/trunk/qpid/tools/src/py/qpid-store-chk Tue Oct  8 15:09:00 2013
@@ -0,0 +1,332 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpidstore import jerr, jrnl, janal
+import optparse, os, sys
+
+
+#== class StoreChk ============================================================
+
+class StoreChk(object):
+    """
+    This class:
+     1. Reads a journal jinf file, and from its info:
+     2. Analyzes the journal data files to determine which is the last to be written, then
+     3. Reads and analyzes all the records in the journal files.
+    The only public method is run() which kicks off the analysis.
+    """
+    
+    def __init__(self):
+        """Constructor"""
+        # params
+        self.opts = None
+        
+        self._jdir = None
+        
+        # recovery analysis objects
+#        self._jrnl_info = None
+#        self.jrnl_rdr = None
+        
+        self._process_args()
+        self._jrnl_info = jrnl.JrnlInfo(self._jdir, self.opts.bfn)
+        # FIXME: This is a hack... find an elegant way of getting the file size to jrec!
+        jrnl.JRNL_FILE_SIZE = self._jrnl_info.get_jrnl_file_size_bytes()
+        self.jrnl_anal = janal.JrnlAnalyzer(self._jrnl_info)
+        self.jrnl_rdr = janal.JrnlReader(self._jrnl_info, self.jrnl_anal, self.opts.qflag, self.opts.rflag,
+                                         self.opts.vflag)
+
+    def run(self):
+        """Run the store check"""
+        if not self.opts.qflag:
+            print self._jrnl_info
+            print self.jrnl_anal
+        self.jrnl_rdr.run()
+        self._report()
+        
+    def _report(self):
+        """Print the results of the store check"""
+        if not self.opts.qflag:
+            print
+            print " === REPORT ===="
+            print
+            print "Records:      %8d non-transactional" % \
+                  (self.jrnl_rdr.get_msg_cnt() - self.jrnl_rdr.get_txn_msg_cnt())
+            print "              %8d transactional" % self.jrnl_rdr.get_txn_msg_cnt()
+            print "              %8d total" % self.jrnl_rdr.get_msg_cnt()
+            print
+            print "Transactions: %8d aborts" % self.jrnl_rdr.get_abort_cnt()
+            print "              %8d commits" % self.jrnl_rdr.get_commit_cnt()
+            print "              %8d total" % (self.jrnl_rdr.get_abort_cnt() + self.jrnl_rdr.get_commit_cnt())
+            print
+            if self.jrnl_rdr.emap().size() > 0:
+                print "Remaining enqueued records (sorted by rid): "
+                rid_list = self.jrnl_rdr.emap().rids()
+                rid_list.sort()
+                for rid in rid_list:
+                    l = self.jrnl_rdr.emap().get(rid)
+                    locked = ""
+                    if l[2]:
+                        locked += " (locked)"
+                    print "  fid=%d %s%s" % (l[0], l[1], locked)
+                print "WARNING: Enqueue-Dequeue mismatch, %d enqueued records remain." % self.jrnl_rdr.emap().size()
+            else:
+                print "No remaining enqueued records found (emap empty)."
+            print
+            if self.jrnl_rdr.tmap().size() > 0:
+                txn_rec_cnt = 0
+                print "Incomplete transactions: "
+                for xid in self.jrnl_rdr.tmap().xids():
+                    jrnl.Utils.format_xid(xid)
+                    recs = self.jrnl_rdr.tmap().get(xid)
+                    for l in recs:
+                        print "  fid=%d %s" % (l[0], l[1])
+                    print " Total: %d records for %s" % (len(recs), jrnl.Utils.format_xid(xid))
+                    print
+                    txn_rec_cnt += len(recs)
+                print "WARNING: Incomplete transactions found, %d xids remain containing a total of %d records." % \
+                      (self.jrnl_rdr.tmap().size(), txn_rec_cnt)
+            else:
+                print "No incomplete transactions found (tmap empty)."
+            print
+            print "%d enqueues, %d journal records processed." % \
+                  (self.jrnl_rdr.get_msg_cnt(), self.jrnl_rdr.get_rec_cnt())
+    
+        
+    def _process_args(self):
+        """Process the command-line arguments"""
+        opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
+        opt.add_option("-b", "--base-filename",
+                      action="store", dest="bfn", default="JournalData",
+                      help="Base filename for old journal files")
+        opt.add_option("-q", "--quiet",
+                      action="store_true", dest="qflag",
+                      help="Quiet (suppress all non-error output)")
+        opt.add_option("-r", "--records",
+                      action="store_true", dest="rflag",
+                      help="Print all records and transactions (including consumed/closed)")
+        opt.add_option("-v", "--verbose",
+                      action="store_true", dest="vflag",
+                      help="Verbose output")
+        (self.opts, args) = opt.parse_args()
+        if len(args) == 0:
+            opt.error("No journal directory argument")
+        elif len(args) > 1:
+            opt.error("Too many positional arguments: %s" % args)
+        if self.opts.qflag and self.opts.rflag:
+            opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive")
+        if self.opts.qflag and self.opts.vflag:
+            opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive")
+        self._jdir = args[0]
+        if not os.path.exists(self._jdir):
+            opt.error("Journal path \"%s\" does not exist" % self._jdir)
+
+
+#== class CsvStoreChk =========================================================
+
+class CsvStoreChk(StoreChk):
+    """
+    This class, in addition to analyzing a journal, can compare the journal footprint (i.e. enqueued/dequeued/transaction
+    record counts) to expected values from a CSV file. This can be used for additional automated testing, and is
+    currently in use in the long store tests for journal encode testing.
+    """
+    
+    # CSV file cols
+    TEST_NUM_COL = 0
+    NUM_MSGS_COL = 5
+    MIN_MSG_SIZE_COL = 7
+    MAX_MSG_SIZE_COL = 8
+    MIN_XID_SIZE_COL = 9
+    MAX_XID_SIZE_COL = 10
+    AUTO_DEQ_COL = 11
+    TRANSIENT_COL = 12
+    EXTERN_COL = 13
+    COMMENT_COL = 20
+    
+    def __init__(self):
+        """Constructor"""
+        StoreChk.__init__(self)
+
+        # csv params
+        self.num_msgs = None
+        self.msg_len = None
+        self.auto_deq = None
+        self.xid_len = None
+        self.transient = None
+        self.extern = None
+        
+        self._warning = []
+        
+        self.jrnl_rdr.set_callbacks(self, CsvStoreChk._csv_pre_run_chk, CsvStoreChk._csv_enq_chk,
+                                    CsvStoreChk._csv_deq_chk, CsvStoreChk._csv_txn_chk, CsvStoreChk._csv_post_run_chk)
+        self._get_csv_test()
+
+    def _get_csv_test(self):
+        """Get a test from the CSV reader"""
+        if self.opts.csvfn != None and self.opts.tnum != None:
+            tparams = self._read_csv_file(self.opts.csvfn, self.opts.tnum)
+            if tparams == None:
+                print "ERROR: Test %d not found in CSV file \"%s\"" % (self.opts.tnum, self.opts.csvfn)
+                sys.exit(1)
+            self.num_msgs = tparams["num_msgs"]
+            if tparams["min_size"] == tparams["max_size"]:
+                self.msg_len = tparams["max_size"]
+            else:
+                self.msg_len = 0
+            self.auto_deq = tparams["auto_deq"]
+            if tparams["xid_min_size"] == tparams["xid_max_size"]:
+                self.xid_len = tparams["xid_max_size"]
+            else:
+                self.xid_len = 0
+            self.transient = tparams["transient"]
+            self.extern = tparams["extern"]
+
+    def _read_csv_file(self, filename, tnum):
+        """Read the CSV test parameter file"""
+        try:
+            csvf = open(filename, "r")
+        except IOError:
+            print "ERROR: Unable to open CSV file \"%s\"" % filename
+            sys.exit(1)
+        for line in csvf:
+            str_list = line.strip().split(",")
+            if len(str_list[0]) > 0 and str_list[0][0] != "\"":
+                try:
+                    if (int(str_list[self.TEST_NUM_COL]) == tnum):
+                        return { "num_msgs": int(str_list[self.NUM_MSGS_COL]),
+                                 "min_size": int(str_list[self.MIN_MSG_SIZE_COL]),
+                                 "max_size": int(str_list[self.MAX_MSG_SIZE_COL]),
+                                 "auto_deq": not (str_list[self.AUTO_DEQ_COL] == "FALSE" or
+                                                  str_list[self.AUTO_DEQ_COL] == "0"),
+                                 "xid_min_size": int(str_list[self.MIN_XID_SIZE_COL]),
+                                 "xid_max_size": int(str_list[self.MAX_XID_SIZE_COL]),
+                                 "transient": not (str_list[self.TRANSIENT_COL] == "FALSE" or
+                                                   str_list[self.TRANSIENT_COL] == "0"),
+                                 "extern": not (str_list[self.EXTERN_COL] == "FALSE" or
+                                                str_list[self.EXTERN_COL] == "0"),
+                                 "comment": str_list[self.COMMENT_COL] }
+                except Exception:
+                    pass
+        return None
+        
+    def _process_args(self):
+        """Process command-line arguments"""
+        opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
+        opt.add_option("-b", "--base-filename",
+                      action="store", dest="bfn", default="JournalData",
+                      help="Base filename for old journal files")
+        opt.add_option("-c", "--csv-filename",
+                      action="store", dest="csvfn",
+                      help="CSV filename containing test parameters")
+        opt.add_option("-q", "--quiet",
+                      action="store_true", dest="qflag",
+                      help="Quiet (suppress all non-error output)")
+        opt.add_option("-r", "--records",
+                      action="store_true", dest="rflag",
+                      help="Print all records and transactions (including consumed/closed)")
+        opt.add_option("-t", "--test-num",
+                      action="store", type="int", dest="tnum",
+                      help="Test number from CSV file - only valid if CSV file named")
+        opt.add_option("-v", "--verbose",
+                      action="store_true", dest="vflag",
+                      help="Verbose output")
+        (self.opts, args) = opt.parse_args()
+        if len(args) == 0:
+            opt.error("No journal directory argument")
+        elif len(args) > 1:
+            opt.error("Too many positional arguments: %s" % args)
+        if self.opts.qflag and self.opts.rflag:
+            opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive")
+        if self.opts.qflag and self.opts.vflag:
+            opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive")
+        self._jdir = args[0]
+        if not os.path.exists(self._jdir):
+            opt.error("Journal path \"%s\" does not exist" % self._jdir)
+        
+    # Callbacks for checking against CSV test parameters. Return False if ok, True to raise error.
+    
+    #@staticmethod
+    def _csv_pre_run_chk(csv_store_chk):
+        """Check performed before a test runs"""
+        if csv_store_chk.num_msgs == None:
+            return
+        if csv_store_chk.jrnl_anal.is_empty() and csv_store_chk.num_msgs > 0:
+            raise jerr.AllJrnlFilesEmptyCsvError(csv_store_chk.opts.tnum, csv_store_chk.num_msgs)
+        return False
+    _csv_pre_run_chk = staticmethod(_csv_pre_run_chk)
+    
+    #@staticmethod
+    def _csv_enq_chk(csv_store_chk, hdr):
+        """Check performed before each enqueue operation"""
+        #if csv_store_chk.num_msgs == None: return
+        # 
+        if csv_store_chk.extern != None:
+            if csv_store_chk.extern != hdr.extern:
+                raise jerr.ExternFlagCsvError(csv_store_chk.opts.tnum, csv_store_chk.extern)
+            if hdr.extern and hdr.data != None:
+                raise jerr.ExternFlagWithDataCsvError(csv_store_chk.opts.tnum)
+        if csv_store_chk.msg_len != None and csv_store_chk.msg_len > 0 and hdr.data != None and \
+           len(hdr.data) != csv_store_chk.msg_len:
+            raise jerr.MessageLengthCsvError(csv_store_chk.opts.tnum, csv_store_chk.msg_len, len(hdr.data))
+        if csv_store_chk.xid_len != None and csv_store_chk.xid_len > 0 and len(hdr.xid) != csv_store_chk.xid_len:
+            raise jerr.XidLengthCsvError(csv_store_chk.opts.tnum, csv_store_chk.xid_len, len(hdr.xid))
+        if csv_store_chk.transient != None and hdr.transient != csv_store_chk.transient:
+            raise jerr.TransactionCsvError(csv_store_chk.opts.tnum, csv_store_chk.transient)
+        return False
+    _csv_enq_chk = staticmethod(_csv_enq_chk)
+    
+    #@staticmethod
+    def _csv_deq_chk(csv_store_chk, hdr):
+        """Check performed before each dequeue operation"""
+        if csv_store_chk.auto_deq != None and not csv_store_chk.auto_deq:
+            raise jerr.JWarning("[CSV %d] WARNING: Dequeue record rid=%d found in non-dequeue test - ignoring." %
+                                (csv_store_chk.opts.tnum, hdr.rid))
+            #self._warning.append("[CSV %d] WARNING: Dequeue record rid=%d found in non-dequeue test - ignoring." %
+            #                     (csv_store_chk.opts.tnum, hdr.rid))
+        return False
+    _csv_deq_chk = staticmethod(_csv_deq_chk)
+    
+    #@staticmethod
+    def _csv_txn_chk(csv_store_chk, hdr):
+        """Check performed before each transaction commit/abort"""
+        return False
+    _csv_txn_chk = staticmethod(_csv_txn_chk)
+
+    #@staticmethod
+    def _csv_post_run_chk(csv_store_chk):
+        """Cehck performed after the completion of the test"""
+        # Exclude this check if lastFileFlag is set - the count may be less than the number of msgs sent because
+        # of journal overwriting
+        if csv_store_chk.num_msgs != None and not csv_store_chk.jrnl_rdr.is_last_file() and \
+           csv_store_chk.num_msgs != csv_store_chk.jrnl_rdr.get_msg_cnt():
+            raise jerr.NumMsgsCsvError(csv_store_chk.opts.tnum, csv_store_chk.num_msgs,
+                                       csv_store_chk.jrnl_rdr.get_msg_cnt())
+        return False
+    _csv_post_run_chk = staticmethod(_csv_post_run_chk)
+
+#==============================================================================
+# main program
+#==============================================================================
+
+if __name__ == "__main__":
+    M = CsvStoreChk()
+    try:
+        M.run()
+    except Exception, e:
+        sys.exit(e)

Propchange: qpid/trunk/qpid/tools/src/py/qpid-store-chk
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/trunk/qpid/tools/src/py/qpid-store-resize
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-store-resize?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-store-resize (added)
+++ qpid/trunk/qpid/tools/src/py/qpid-store-resize Tue Oct  8 15:09:00 2013
@@ -0,0 +1,350 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpidstore import jerr, jrnl, janal
+import glob, optparse, os, sys, time
+
+
+#== class Resize ==============================================================
+
+class Resize(object):
+    """
+    Creates a new store journal and copies records from the old journal to the new one. The new
+    journal may be a different size from the old one. The records are packed into the new journal
+    (i.e. only remaining enqueued records and associated transactions - if any - are copied over without spaces between them).
+    
+    The default action is to push the old journal down into a 'bak' sub-directory and then create a
+    new journal of the same size and pack it with the records from the old. However, it is possible to
+    suppress the pushdown (using --no-pushdown), in which case either a new journal id (using
+    --new-base-filename) or an old journal id (using --old-base-filename) must be supplied. In the former
+    case, a new journal will be created using the new base file name alongside the old one. In the latter
+    case, the old journal will be renamed to the supplied name, and the new one will take the default.
+    Note that both can be specified together with the --no-pushdown option.
+    
+    To resize the journal, use the optional --num-jfiles and/or --jfile-size parameters. These
+    should be large enough to write all the records or an error will result. If the size is large enough
+    to write all records, but too small to keep below the enqueue threshold, a warning will be printed.
+    Note that as any valid size will be accepted, a journal can also be shrunk, as long as it is sufficiently
+    big to accept the transferred records.
+    """
+    
+    BAK_DIR = "bak"
+    JFILE_SIZE_PGS_MIN = 1
+    JFILE_SIZE_PGS_MAX = 32768
+    NUM_JFILES_MIN = 4
+    NUM_JFILES_MAX = 64
+    
+    def __init__(self):
+        """Constructor"""
+        self._opts = None
+        self._jdir = None
+        self._fname = None
+        self._fnum = None
+        self._file = None
+        self._file_rec_wr_cnt = None
+        self._filler_wr_cnt = None
+        self._last_rec_fid = None
+        self._last_rec_offs = None
+        self._rec_wr_cnt = None
+        
+        self._jrnl_info = None
+        self._jrnl_analysis = None
+        self._jrnl_reader = None
+        
+        self._process_args()
+        self._jrnl_info = jrnl.JrnlInfo(self._jdir, self._opts.bfn)
+        # FIXME: This is a hack... find an elegant way of getting the file size to jrec!
+        jrnl.JRNL_FILE_SIZE = self._jrnl_info.get_jrnl_file_size_bytes()
+        self._jrnl_analysis = janal.JrnlAnalyzer(self._jrnl_info)
+        self._jrnl_reader = janal.JrnlReader(self._jrnl_info, self._jrnl_analysis, self._opts.qflag, self._opts.rflag,
+                                            self._opts.vflag)
+    
+    def run(self):
+        """Perform the action of resizing the journal"""
+        if not self._opts.qflag:
+            print self._jrnl_analysis
+        self._jrnl_reader.run()
+        if self._opts.vflag:
+            print self._jrnl_info
+        if not self._opts.qflag:
+            print self._jrnl_reader.report(self._opts.vflag, self._opts.rflag)
+        self._handle_old_files()
+        self._create_new_files()
+        if not self._opts.qflag:
+            print "Transferred %d records to new journal." % self._rec_wr_cnt
+        self._chk_free()
+    
+    def _chk_free(self):
+        """Check if sufficient space is available in resized journal to be able to enqueue. Raise a warning if not."""
+        if self._last_rec_fid == None or self._last_rec_offs == None:
+            return
+        wr_capacity_bytes = self._last_rec_fid * self._jrnl_info.get_jrnl_data_size_bytes() + self._last_rec_offs
+        tot_capacity_bytes = self._jrnl_info.get_tot_jrnl_data_size_bytes()
+        percent_full = 100.0 * wr_capacity_bytes / tot_capacity_bytes
+        if percent_full > 80.0:
+            raise jerr.JWarning("WARNING: Journal %s is %2.1f%% full and will likely not allow enqueuing of new records"
+                                " until some existing records are dequeued." %
+                                (self._jrnl_info.get_jrnl_id(), percent_full))
+    
+    def _create_new_files(self):
+        """Create new journal files"""
+        # Assemble records to be transferred
+        master_record_list = {}
+        txn_record_list = self._jrnl_reader.txn_obj_list()
+        if self._opts.vflag and self._jrnl_reader.emap().size() > 0:
+            print "* Assembling %d records from emap" % self._jrnl_reader.emap().size()
+        for tup in self._jrnl_reader.emap().get_rec_list():
+            hdr = tup[1]
+            hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi
+            master_record_list[long(hdr.rid)] = hdr
+            if hdr.xidsize > 0 and hdr.xid in txn_record_list:
+                txn_hdr = txn_record_list[hdr.xid]
+                del(txn_record_list[hdr.xid])
+                txn_hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi
+                master_record_list[long(txn_hdr.rid)] = txn_hdr
+        if self._opts.vflag and self._jrnl_reader.tmap().size() > 0:
+            print "* Assembling %d records from tmap" % self._jrnl_reader.tmap().size()
+        for xid in self._jrnl_reader.tmap().xids():
+            for l in self._jrnl_reader.tmap().get(xid):
+                hdr = l[1]
+                hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi
+                master_record_list[hdr.rid] = hdr
+        rid_list = master_record_list.keys()
+        rid_list.sort()
+            
+        # get base filename
+        bfn = self._opts.bfn
+        if self._opts.nbfn != None:
+            bfn = self._opts.nbfn
+        
+        # write jinf file
+        self._jrnl_info.resize(self._opts.njf, self._opts.jfs)
+        self._jrnl_info.write(self._jdir, bfn)
+        
+        # write records
+        if self._opts.vflag:
+            print "* Transferring records to new journal files"
+        fro = self._jrnl_info.get_jrnl_sblk_size_bytes()
+        while len(rid_list) > 0:
+            hdr = master_record_list[rid_list.pop(0)]
+            rec = hdr.encode()
+            pos = 0
+            while pos < len(rec):
+                if self._file == None or self._file.tell() >= self._jrnl_info.get_jrnl_file_size_bytes():
+                    if self._file == None:
+                        rid = hdr.rid
+                    elif len(rid_list) == 0:
+                        rid = 0
+                    else:
+                        rid = rid_list[0]
+                    if not self._rotate_file(rid, fro):
+                        raise jerr.JournalSpaceExceededError()
+                if len(rec) - pos <= self._jrnl_info.get_jrnl_file_size_bytes() - self._file.tell():
+                    self._file.write(rec[pos:])
+                    self._fill_file(jrnl.Utils.size_in_bytes_to_blk(self._file.tell(),
+                                                                    self._jrnl_info.get_jrnl_dblk_size_bytes()))
+                    pos = len(rec)
+                    fro = self._jrnl_info.get_jrnl_sblk_size_bytes()
+                else:
+                    flen = self._jrnl_info.get_jrnl_file_size_bytes() - self._file.tell()
+                    self._file.write(rec[pos:pos + flen])
+                    pos += flen
+                    rem = len(rec) - pos
+                    if rem <= self._jrnl_info.get_jrnl_data_size_bytes():
+                        fro = (jrnl.Utils.size_in_bytes_to_blk(self._jrnl_info.get_jrnl_sblk_size_bytes() + rem,
+                                                               self._jrnl_info.get_jrnl_dblk_size_bytes()))
+                    else:
+                        fro = 0
+            self._rec_wr_cnt += 1
+            self._file_rec_wr_cnt += 1
+        self._fill_file(add_filler_recs = True)
+        while self._rotate_file():
+            pass
+        
+    def _fill_file(self, to_posn = None, add_filler_recs = False):
+        """Fill a file to a known offset"""
+        if self._file == None:
+            return
+        if add_filler_recs:
+            nfr = int(jrnl.Utils.rem_bytes_in_blk(self._file, self._jrnl_info.get_jrnl_sblk_size_bytes()) /
+                      self._jrnl_info.get_jrnl_dblk_size_bytes())
+            if nfr > 0:
+                self._filler_wr_cnt = nfr
+                for i in range(0, nfr):
+                    self._file.write("RHMx")
+                    self._fill_file(jrnl.Utils.size_in_bytes_to_blk(self._file.tell(),
+                                                                    self._jrnl_info.get_jrnl_dblk_size_bytes()))
+            self._last_rec_fid = self._fnum
+            self._last_rec_offs = self._file.tell()
+        if to_posn == None:
+            to_posn = self._jrnl_info.get_jrnl_file_size_bytes()
+        elif to_posn > self._jrnl_info.get_jrnl_file_size_bytes():
+            raise jerr.FillExceedsFileSizeError(to_posn, self._jrnl_info.get_jrnl_file_size_bytes())
+        diff = to_posn - self._file.tell()
+        self._file.write(str("\0" * diff))
+        #DEBUG
+        if self._file.tell() != to_posn:
+            raise jerr.FillSizeError(self._file.tell(), to_posn)
+        
+    def _rotate_file(self, rid = None, fro = None):
+        """Switch to the next logical file"""
+        if self._file != None:
+            self._file.close()
+            if self._opts.vflag:
+                if self._file_rec_wr_cnt == 0:
+                    print "  (empty)"
+                elif self._filler_wr_cnt == None:
+                    print "  (%d records)" % self._file_rec_wr_cnt
+                else:
+                    print "  (%d records + %d filler(s))" % (self._file_rec_wr_cnt, self._filler_wr_cnt)
+        if self._fnum == None:
+            self._fnum = 0
+            self._rec_wr_cnt = 0
+        elif self._fnum == self._jrnl_info.get_num_jrnl_files() - 1:
+            return False
+        else:
+            self._fnum += 1
+        self._file_rec_wr_cnt = 0
+        self._fname = os.path.join(self._jrnl_info.get_jrnl_dir(), "%s.%04x.jdat" %
+                                   (self._jrnl_info.get_jrnl_base_name(), self._fnum))
+        if self._opts.vflag:
+            print "* Opening file %s" % self._fname,
+        self._file = open(self._fname, "w")
+        if rid == None or fro == None:
+            self._fill_file()
+        else:
+            now = time.time()
+            fhdr = jrnl.FileHdr(0, "RHMf", jrnl.Hdr.HDR_VER, int(jrnl.Hdr.BIG_ENDIAN), 0, rid)
+            fhdr.init(self._file, 0, self._fnum, self._fnum, fro, int(now), 1000000000*(now - int(now)))
+            self._file.write(fhdr.encode())
+            self._fill_file(self._jrnl_info.get_jrnl_sblk_size_bytes())
+        return True
+    
+    def _handle_old_files(self):
+        """Push old journal down into a backup directory"""
+        target_dir = self._jdir
+        if not self._opts.npd:
+            target_dir = os.path.join(self._jdir, self.BAK_DIR)
+            if os.path.exists(target_dir):
+                if self._opts.vflag:
+                    print "* Pushdown directory %s exists, deleting content" % target_dir
+                for fname in glob.glob(os.path.join(target_dir, "*")):
+                    os.unlink(fname)
+            else:
+                if self._opts.vflag:
+                    print "* Creating new pushdown directory %s" % target_dir
+                os.mkdir(target_dir)
+        
+        if not self._opts.npd or self._opts.obfn != None:
+            if self._opts.obfn != None and self._opts.vflag:
+                print "* Renaming old journal files using base name %s" % self._opts.obfn
+            # .jdat files
+            for fname in glob.glob(os.path.join(self._jdir, "%s.*.jdat" % self._opts.bfn)):
+                tbfn = os.path.basename(fname)
+                if self._opts.obfn != None:
+                    per1 = tbfn.rfind(".")
+                    if per1 >= 0:
+                        per2 = tbfn.rfind(".", 0, per1)
+                        if per2 >= 0:
+                            tbfn = "%s%s" % (self._opts.obfn, tbfn[per2:])
+                os.rename(fname, os.path.join(target_dir, tbfn))
+            # .jinf file
+            self._jrnl_info.write(target_dir, self._opts.obfn)
+            os.unlink(os.path.join(self._jdir, "%s.jinf" % self._opts.bfn))
+
+    def _print_options(self):
+        """Print program options"""
+        if self._opts.vflag:
+            print "Journal dir: %s" % self._jdir
+            print "Options: Base filename: %s" % self._opts.bfn
+            print "         New base filename: %s" % self._opts.nbfn
+            print "         Old base filename: %s" % self._opts.obfn
+            print "         Pushdown: %s" % self._opts.npd
+            print "         No. journal files: %d" % self._opts.njf
+            print "         Journal file size: %d 64kiB blocks" % self._opts.jfs
+            print "         Show records flag: %s" % self._opts.rflag
+            print "         Verbose flag: %s" % self._opts.vflag
+            print
+    
+    def _process_args(self):
+        """Process the command-line arguments"""
+        opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
+        opt.add_option("-b", "--base-filename",
+                      action="store", dest="bfn", default="JournalData",
+                      help="Base filename for old journal files")
+        opt.add_option("-B", "--new-base-filename",
+                      action="store", dest="nbfn",
+                      help="Base filename for new journal files")
+        opt.add_option("-n", "--no-pushdown",
+                      action="store_true", dest="npd",
+                      help="Suppress pushdown of old files into \"bak\" dir; old files will remain in existing dir")
+        opt.add_option("-N", "--num-jfiles",
+                      action="store", type="int", dest="njf", default=8,
+                      help="Number of files for new journal (%d-%d)" % (self.NUM_JFILES_MIN, self.NUM_JFILES_MAX))
+        opt.add_option("-o", "--old-base-filename",
+                      action="store", dest="obfn",
+                      help="Base filename to give the old journal files when they are renamed")
+        opt.add_option("-q", "--quiet",
+                      action="store_true", dest="qflag",
+                      help="Quiet (suppress all non-error output)")
+        opt.add_option("-r", "--records",
+                      action="store_true", dest="rflag",
+                      help="Print remaining records and transactions")
+        opt.add_option("-s", "--jfile-size-pgs",
+                      action="store", type="int", dest="jfs", default=24,
+                      help="Size of each new journal file in 64kiB blocks (%d-%d)" %
+                           (self.JFILE_SIZE_PGS_MIN, self.JFILE_SIZE_PGS_MAX))
+        opt.add_option("-v", "--verbose",
+                      action="store_true", dest="vflag",
+                      help="Verbose output")
+        (self._opts, args) = opt.parse_args()
+        if len(args) == 0:
+            opt.error("No journal directory argument")
+        elif len(args) > 1:
+            opt.error("Too many positional arguments: %s" % args)
+        if self._opts.qflag and self._opts.rflag:
+            opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive")
+        if self._opts.qflag and self._opts.vflag:
+            opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive")
+        if self._opts.njf != None and (self._opts.njf < self.NUM_JFILES_MIN or self._opts.njf > self.NUM_JFILES_MAX):
+            opt.error("Number of files (%d) is out of range (%d-%d)" %
+                     (self._opts.njf, self.NUM_JFILES_MIN, self.NUM_JFILES_MAX))
+        if self._opts.jfs != None and (self._opts.jfs < self.JFILE_SIZE_PGS_MIN or
+                                       self._opts.jfs > self.JFILE_SIZE_PGS_MAX):
+            opt.error("File size (%d) is out of range (%d-%d)" %
+                     (self._opts.jfs, self.JFILE_SIZE_PGS_MIN, self.JFILE_SIZE_PGS_MAX))
+        if self._opts.npd != None and (self._opts.nbfn == None and self._opts.obfn == None):
+            opt.error("If (-n/--no-pushdown) is used, then at least one of (-B/--new-base-filename) and"
+                     " (-o/--old-base-filename) must be used.")
+        self._jdir = args[0]
+        if not os.path.exists(self._jdir):
+            opt.error("Journal path \"%s\" does not exist" % self._jdir)
+        self._print_options()
+
+#==============================================================================
+# main program
+#==============================================================================
+
+if __name__ == "__main__":
+    R = Resize()
+    try:
+        R.run()
+    except Exception, e:
+        sys.exit(e)
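
The 80% threshold test in `_chk_free` above reduces to simple arithmetic: the capacity consumed is the data size of every fully written file plus the offset into the file holding the last record. A standalone sketch of that computation, with hypothetical sizes (the real values come from the journal's `.jinf` metadata):

```python
def journal_percent_full(last_rec_fid, last_rec_offs,
                         file_data_size_bytes, num_files):
    """Return how full the resized journal is, as a percentage.

    last_rec_fid   -- index of the file holding the last record written
    last_rec_offs  -- byte offset just past that record within its file
    Mirrors the computation in Resize._chk_free; all sizes here are
    illustrative, not the tool's defaults.
    """
    written = last_rec_fid * file_data_size_bytes + last_rec_offs
    total = num_files * file_data_size_bytes
    return 100.0 * written / total

# e.g. 8 files of 1.5 MiB usable data each, last record ends at the
# start of file index 7:
pct = journal_percent_full(7, 0, 1536 * 1024, 8)
print("%.1f%% full" % pct)   # -> 87.5% full
```

A result above 80.0 is what causes the tool to raise a `JWarning` that new enqueues will likely be refused until some records are dequeued; the resize itself still succeeds.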

Propchange: qpid/trunk/qpid/tools/src/py/qpid-store-resize
------------------------------------------------------------------------------
    svn:executable = *

Added: qpid/trunk/qpid/tools/src/py/qpidstore/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpidstore/__init__.py?rev=1530301&view=auto
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpidstore/__init__.py (added)
+++ qpid/trunk/qpid/tools/src/py/qpidstore/__init__.py Tue Oct  8 15:09:00 2013
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+


