Author: shuston
Date: Fri Nov 5 22:04:08 2010
New Revision: 1031842
URL: http://svn.apache.org/viewvc?rev=1031842&view=rev
Log:
Manage CLFS containers using policies both to create the initial containers and to
automatically grow and shrink the log as needed.
Modified:
qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp
qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.h
qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp?rev=1031842&r1=1031841&r2=1031842&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp Fri Nov 5 22:04:08 2010
@@ -21,6 +21,7 @@
#include <windows.h>
#include <clfsw32.h>
+#include <clfsmgmtw32.h>
#include <sstream>
#include <string>
#include <vector>
@@ -47,11 +48,12 @@ Log::open(const std::string& path, const
logPath = path;
std::string logSpec = "log:" + path;
size_t specLength = logSpec.length();
- wchar_t *wLogSpec = new wchar_t[specLength + 1];
+ std::auto_ptr<wchar_t> wLogSpec(new wchar_t[specLength + 1]);
size_t converted;
- mbstowcs_s(&converted, wLogSpec, specLength+1, logSpec.c_str(), specLength);
- wLogSpec[converted] = L'\0';
- handle = ::CreateLogFile(wLogSpec,
+ mbstowcs_s(&converted,
+ wLogSpec.get(), specLength+1,
+ logSpec.c_str(), specLength);
+ handle = ::CreateLogFile(wLogSpec.get(),
GENERIC_WRITE | GENERIC_READ,
0,
0,
@@ -62,31 +64,45 @@ Log::open(const std::string& path, const
ULONG infoSize = sizeof(info);
BOOL ok = ::GetLogFileInformation(handle, &info, &infoSize);
QPID_WINDOWS_CHECK_NOT(ok, 0);
+ ok = ::RegisterManageableLogClient(handle, 0);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+
+ // Set up policies for how many containers to initially create and how
+ // large each container should be. Also, auto-grow the log when container
+ // space runs out.
+ CLFS_MGMT_POLICY logPolicy;
+ logPolicy.Version = CLFS_MGMT_POLICY_VERSION;
+ logPolicy.LengthInBytes = sizeof(logPolicy);
+ logPolicy.PolicyFlags = 0;
+
// If this is the first time this log is opened, give an opportunity to
// initialize its content.
bool needInitialize(false);
if (info.TotalContainers == 0) {
- std::vector<const std::wstring> paths;
- LPWSTR cPaths[1024];
- size_t pathLength = logPath.length();
- wchar_t *wLogPath = new wchar_t[pathLength + 1];
- mbstowcs_s(&converted, wLogPath, pathLength+1,
- logPath.c_str(), pathLength);
- wLogPath[converted] = L'\0';
- for (unsigned short i = 0; i < params.containers && i < 1024; ++i)
{
- std::wostringstream path;
- path << wLogPath << L"-container-" << i << std::ends;
- paths.push_back(path.str ());
- cPaths[i] = const_cast<LPWSTR>(paths[i].c_str());
- }
- ok = ::AddLogContainerSet(handle,
- params.containers,
- &this->containerSize,
- cPaths,
- NULL);
+ // New log; set the configured container size and create the
+ // initial set of containers.
+ logPolicy.PolicyType = ClfsMgmtPolicyNewContainerSize;
+ logPolicy.PolicyParameters.NewContainerSize.SizeInBytes = containerSize;
+ ok = ::InstallLogPolicy(handle, &logPolicy);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+
+ ULONGLONG desired(params.containers), actual(0);
+ ok = ::SetLogFileSizeWithPolicy(handle, &desired, &actual);
QPID_WINDOWS_CHECK_NOT(ok, 0);
+
needInitialize = true;
}
+ // Ensure that the log is extended as needed and will shrink when the
+ // configured percentage (params.shrinkPct) becomes unused.
+ logPolicy.PolicyType = ClfsMgmtPolicyAutoGrow;
+ logPolicy.PolicyParameters.AutoGrow.Enabled = 1;
+ ok = ::InstallLogPolicy(handle, &logPolicy);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+ logPolicy.PolicyType = ClfsMgmtPolicyAutoShrink;
+ logPolicy.PolicyParameters.AutoShrink.Percentage = params.shrinkPct;
+ ok = ::InstallLogPolicy(handle, &logPolicy);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+
// Need a marshaling area
ok = ::CreateLogMarshallingArea(handle,
NULL, NULL, NULL, // Alloc, free, context
@@ -131,7 +147,7 @@ Log::write(void* entry, uint32_t length,
&desc, 1, // Buffer descriptor
0, prev, // Undo-Next, Prev
0, 0, // Reservation
- CLFS_FLAG_FORCE_FLUSH, // CLFS_FLAGS_NO_FLAGS
+ CLFS_FLAG_FORCE_FLUSH,
&lsn,
0);
QPID_WINDOWS_CHECK_NOT(ok, 0);
@@ -155,6 +171,11 @@ Log::moveTail(const CLFS_LSN& oldest)
BOOL ok = ::AdvanceLogBase(marshal,
const_cast<PCLFS_LSN>(&oldest),
0, NULL);
+ // If multiple threads are manipulating things they may get out of
+ // order when moving the tail; if someone already moved it further
+ // than this, it's ok - ignore it.
+ if (ok || ::GetLastError() == ERROR_LOG_START_OF_LOG)
+ return;
QPID_WINDOWS_CHECK_NOT(ok, 0);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.h?rev=1031842&r1=1031841&r2=1031842&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/Log.h Fri Nov 5 22:04:08 2010
@@ -53,6 +53,7 @@ public:
struct TuningParameters {
size_t containerSize;
unsigned short containers;
+ unsigned short shrinkPct;
uint32_t maxWriteBuffers;
};
Modified: qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp?rev=1031842&r1=1031841&r2=1031842&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp Fri Nov 5 22:04:08 2010
@@ -400,6 +400,7 @@ MSSqlClfsProvider::earlyInitialize(Plugi
Log::TuningParameters params;
params.containerSize = options.containerSize;
params.containers = options.initialContainers;
+ params.shrinkPct = 50;
params.maxWriteBuffers = options.maxWriteBuffers;
std::string msgPath = options.storeDir + "\\" + "messages";
messages.openLog(msgPath, params);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org
|