Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 589C973E6 for ; Sat, 24 Sep 2011 20:16:46 +0000 (UTC) Received: (qmail 58255 invoked by uid 500); 24 Sep 2011 20:16:46 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 58221 invoked by uid 500); 24 Sep 2011 20:16:46 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 58211 invoked by uid 99); 24 Sep 2011 20:16:46 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 24 Sep 2011 20:16:46 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 24 Sep 2011 20:16:28 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 0FF262388ABA for ; Sat, 24 Sep 2011 20:16:06 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1175235 [4/5] - in /qpid/trunk: ./ qpid/java/ qpid/java/bdbstore/ qpid/java/bdbstore/bin/ qpid/java/bdbstore/etc/ qpid/java/bdbstore/etc/scripts/ qpid/java/bdbstore/src/ qpid/java/bdbstore/src/main/ qpid/java/bdbstore/src/main/java/ qpid/j... Date: Sat, 24 Sep 2011 20:16:02 -0000 To: commits@qpid.apache.org From: robbie@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20110924201606.0FF262388ABA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java?rev=1175235&view=auto ============================================================================== --- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java (added) +++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java Sat Sep 24 20:16:00 2011 @@ -0,0 +1,162 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.*; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; + +import java.io.*; + +/** + * Handles the mapping to and from 0-8/0-9 message meta data + */ +public class MessageMetaDataTB_4 extends TupleBinding +{ + private static final Logger _log = Logger.getLogger(MessageMetaDataTB_4.class); + + public MessageMetaDataTB_4() + { + } + + public Object entryToObject(TupleInput tupleInput) + { + try + { + final MessagePublishInfo publishBody = readMessagePublishInfo(tupleInput); + final ContentHeaderBody contentHeaderBody = readContentHeaderBody(tupleInput); + final int contentChunkCount = tupleInput.readInt(); + + return new MessageMetaData(publishBody, contentHeaderBody, contentChunkCount); + } + catch (Exception e) + { + _log.error("Error converting entry to object: " + e, e); + // annoyingly just have to return null since we cannot throw + return null; + } + } + + public void objectToEntry(Object object, TupleOutput tupleOutput) + { + MessageMetaData message = (MessageMetaData) object; + try + { + writeMessagePublishInfo(message.getMessagePublishInfo(), tupleOutput); + } + catch (AMQException e) + { + // can't do anything else since the BDB interface precludes throwing any exceptions + // in practice we should never get an exception + throw new RuntimeException("Error converting object to entry: " + e, e); + } + writeContentHeader(message.getContentHeaderBody(), tupleOutput); + tupleOutput.writeInt(message.getContentChunkCount()); + } + + private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput) + { + + final AMQShortString exchange = AMQShortStringEncoding.readShortString(tupleInput); + final AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput); + final boolean mandatory = tupleInput.readBoolean(); + final boolean immediate = tupleInput.readBoolean(); + + return new MessagePublishInfo() + { + + public AMQShortString getExchange() + { + return exchange; + } + + public void setExchange(AMQShortString exchange) + { + + } + + public boolean isImmediate() + { + return immediate; + } + + public boolean isMandatory() + { + return mandatory; + } + + public AMQShortString getRoutingKey() + { + return routingKey; + } + } ; + + } + + private ContentHeaderBody readContentHeaderBody(TupleInput tupleInput) throws AMQFrameDecodingException, AMQProtocolVersionException + { + int bodySize = tupleInput.readInt(); + byte[] underlying = new byte[bodySize]; + tupleInput.readFast(underlying); + + try + { + return ContentHeaderBody.createFromBuffer(new DataInputStream(new ByteArrayInputStream(underlying)), bodySize); + } + catch (IOException e) + { + throw new AMQFrameDecodingException(null, e.getMessage(), e); + } + } + + private void writeMessagePublishInfo(MessagePublishInfo publishBody, TupleOutput tupleOutput) throws AMQException + { + + AMQShortStringEncoding.writeShortString(publishBody.getExchange(), tupleOutput); + AMQShortStringEncoding.writeShortString(publishBody.getRoutingKey(), tupleOutput); + tupleOutput.writeBoolean(publishBody.isMandatory()); + tupleOutput.writeBoolean(publishBody.isImmediate()); + } + + private void writeContentHeader(ContentHeaderBody headerBody, TupleOutput tupleOutput) + { + // write out the content header body + final int bodySize = headerBody.getSize(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(bodySize); + try + { + headerBody.writePayload(new DataOutputStream(baos)); + tupleOutput.writeInt(bodySize); + tupleOutput.writeFast(baos.toByteArray()); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + + } +} Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java?rev=1175235&view=auto ============================================================================== --- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java (added) +++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java Sat Sep 24 20:16:00 2011 @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; +import org.apache.log4j.Logger; +import org.apache.qpid.server.store.MessageMetaDataType; +import org.apache.qpid.server.store.StorableMessageMetaData; + +/** + * Handles the mapping to and from message meta data + */ +public class MessageMetaDataTB_5 extends MessageMetaDataTB_4 +{ + private static final Logger _log = Logger.getLogger(MessageMetaDataTB_5.class); + + @Override + public Object entryToObject(TupleInput tupleInput) + { + try + { + final int bodySize = tupleInput.readInt(); + byte[] dataAsBytes = new byte[bodySize]; + tupleInput.readFast(dataAsBytes); + + java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes); + buf.position(1); + buf = buf.slice(); + MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]]; + StorableMessageMetaData metaData = type.getFactory().createMetaData(buf); + + return metaData; + } + catch (Exception e) + { + _log.error("Error converting entry to object: " + e, e); + // annoyingly just have to return null since we cannot throw + return null; + } + } + + @Override + public void objectToEntry(Object object, TupleOutput tupleOutput) + { + StorableMessageMetaData metaData = (StorableMessageMetaData) object; + + final int bodySize = 1 + metaData.getStorableSize(); + byte[] underlying = new byte[bodySize]; + underlying[0] = (byte) metaData.getType().ordinal(); + java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(underlying); + buf.position(1); + buf = buf.slice(); + + metaData.writeToBuffer(0, buf); + tupleOutput.writeInt(bodySize); + tupleOutput.writeFast(underlying); + } +} Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java?rev=1175235&view=auto ============================================================================== --- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java (added) +++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java Sat Sep 24 20:16:00 2011 @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +import com.sleepycat.bind.tuple.TupleBinding; + +public class MessageMetaDataTupleBindingFactory extends TupleBindingFactory +{ + public MessageMetaDataTupleBindingFactory(int version) + { + super(version); + } + + public TupleBinding getInstance() + { + switch (_version) + { + default: + case 5: + return new MessageMetaDataTB_5(); + case 4: + return new MessageMetaDataTB_4(); + } + } +} Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java?rev=1175235&view=auto ============================================================================== --- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java (added) +++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java Sat Sep 24 20:16:00 2011 @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; +import org.apache.qpid.server.store.berkeleydb.QueueEntryKey; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; + +public class QueueEntryTB extends TupleBinding +{ + public QueueEntryKey entryToObject(TupleInput tupleInput) + { + AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput); + Long messageId = tupleInput.readLong(); + + return new QueueEntryKey(queueName, messageId); + } + + public void objectToEntry(QueueEntryKey mk, TupleOutput tupleOutput) + { + AMQShortStringEncoding.writeShortString(mk.getQueueName(),tupleOutput); + tupleOutput.writeLong(mk.getMessageId()); + } +} \ No newline at end of file Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java?rev=1175235&view=auto ============================================================================== --- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java (added) +++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java Sat Sep 24 20:16:00 2011 @@ -0,0 +1,25 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +public interface QueueTuple +{ +} Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java?rev=1175235&view=auto ============================================================================== --- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java (added) +++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java Sat Sep 24 20:16:00 2011 @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +import org.apache.qpid.server.store.berkeleydb.records.QueueRecord; + +import com.sleepycat.bind.tuple.TupleBinding; + +public class QueueTupleBindingFactory extends TupleBindingFactory +{ + + public QueueTupleBindingFactory(int version) + { + super(version); + } + + public TupleBinding getInstance() + { + switch (_version) + { + default: + case 5: + return new QueueTuple_5(); + case 4: + return new QueueTuple_4(); + } + } +} Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java?rev=1175235&view=auto ============================================================================== --- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java (added) +++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java Sat Sep 24 20:16:00 2011 @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; +import com.sleepycat.je.DatabaseException; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; +import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding; +import org.apache.qpid.server.store.berkeleydb.records.QueueRecord; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; + +public class QueueTuple_4 extends TupleBinding implements QueueTuple +{ + protected static final Logger _logger = Logger.getLogger(QueueTuple_4.class); + + protected FieldTable _arguments; + + public QueueTuple_4() + { + super(); + } + + public QueueRecord entryToObject(TupleInput tupleInput) + { + try + { + AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput); + AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput); + // Addition for Version 2 of this table, read the queue arguments + FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput); + + return new QueueRecord(name, owner, false, arguments); + } + catch (DatabaseException e) + { + _logger.error("Unable to create binding: " + e, e); + return null; + } + + } + + public void objectToEntry(QueueRecord queue, TupleOutput tupleOutput) + { + AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput); + AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput); + // Addition for Version 2 of this table, store the queue arguments + FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput); + } +} Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java?rev=1175235&view=auto ============================================================================== --- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java (added) +++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java Sat Sep 24 20:16:00 2011 @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; +import com.sleepycat.je.DatabaseException; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding; +import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding; +import org.apache.qpid.server.store.berkeleydb.records.QueueRecord; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; + +public class QueueTuple_5 extends QueueTuple_4 +{ + protected static final Logger _logger = Logger.getLogger(QueueTuple_5.class); + + protected FieldTable _arguments; + + public QueueTuple_5() + { + super(); + } + + public QueueRecord entryToObject(TupleInput tupleInput) + { + try + { + AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput); + AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput); + // Addition for Version 2 of this table, read the queue arguments + FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput); + // Addition for Version 3 of this table, read the queue exclusivity + boolean exclusive = tupleInput.readBoolean(); + + return new QueueRecord(name, owner, exclusive, arguments); + } + catch (DatabaseException e) + { + _logger.error("Unable to create binding: " + e, e); + return null; + } + + } + + public void objectToEntry(QueueRecord queue, TupleOutput tupleOutput) + { + AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput); + AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput); + // Addition for Version 2 of this table, store the queue arguments + FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput); + // Addition for Version 3 of this table, store the queue exclusivity + tupleOutput.writeBoolean(queue.isExclusive()); + } +} Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java?rev=1175235&view=auto ============================================================================== --- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java (added) +++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java Sat Sep 24 20:16:00 2011 @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb.tuples; + +import com.sleepycat.bind.tuple.TupleBinding; + +public abstract class TupleBindingFactory +{ + protected int _version; + + public TupleBindingFactory(int version) + { + _version = version; + } + + public abstract TupleBinding getInstance(); +} Added: qpid/trunk/qpid/java/bdbstore/src/resources/backup-log4j.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/resources/backup-log4j.xml?rev=1175235&view=auto ============================================================================== --- qpid/trunk/qpid/java/bdbstore/src/resources/backup-log4j.xml (added) +++ qpid/trunk/qpid/java/bdbstore/src/resources/backup-log4j.xml Sat Sep 24 20:16:00 2011 @@ -0,0 +1,65 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Added: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java?rev=1175235&view=auto ============================================================================== --- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java (added) +++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java Sat Sep 24 20:16:00 2011 @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.store.berkeleydb; + +import org.apache.qpid.framing.AMQShortString; + +import com.sleepycat.bind.tuple.TupleInput; +import com.sleepycat.bind.tuple.TupleOutput; + +import junit.framework.TestCase; + +/** + * Tests for {@code AMQShortStringEncoding} including corner cases when string + * is null or over 127 characters in length + */ +public class AMQShortStringEncodingTest extends TestCase +{ + + public void testWriteReadNullValues() + { + // write into tuple output + TupleOutput tupleOutput = new TupleOutput(); + AMQShortStringEncoding.writeShortString(null, tupleOutput); + byte[] data = tupleOutput.getBufferBytes(); + + // read from tuple input + TupleInput tupleInput = new TupleInput(data); + AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput); + assertNull("Expected null but got " + result, result); + } + + public void testWriteReadShortStringWithLengthOver127() + { + AMQShortString value = createString('a', 128); + + // write into tuple output + TupleOutput tupleOutput = new TupleOutput(); + AMQShortStringEncoding.writeShortString(value, tupleOutput); + byte[] data = tupleOutput.getBufferBytes(); + + // read from tuple input + TupleInput tupleInput = new TupleInput(data); + AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput); + assertEquals("Expected " + value + " but got " + result, value, result); + } + + public void testWriteReadShortStringWithLengthLess127() + { + AMQShortString value = new AMQShortString("test"); + + // write into tuple output + TupleOutput tupleOutput = new TupleOutput(); + AMQShortStringEncoding.writeShortString(value, tupleOutput); + byte[] data = tupleOutput.getBufferBytes(); + + // read from tuple input + TupleInput tupleInput = new TupleInput(data); + AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput); + assertEquals("Expected " + value + " but got " + result, value, result); + } + + private AMQShortString createString(char ch, int length) + { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < length; i++) + { + sb.append(ch); + } + return new AMQShortString(sb.toString()); + } + +} Added: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1175235&view=auto ============================================================================== --- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (added) +++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java Sat Sep 24 20:16:00 2011 @@ -0,0 +1,470 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; + +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.server.message.MessageMetaData_0_10; +import org.apache.qpid.server.store.MessageMetaDataType; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.TransactionLog; +import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageDeliveryMode; +import org.apache.qpid.transport.MessageDeliveryPriority; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.MessageTransfer; + +/** + * Subclass of MessageStoreTest which runs the standard tests from the superclass against + * the BDB Store as well as additional tests specific to the DBB store-implementation. + */ +public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageStoreTest +{ + /** + * Tests that message metadata and content are successfully read back from a + * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to + * verify their ability to co-exist within the store and be successful retrieved. + */ + public void testBDBMessagePersistence() throws Exception + { + MessageStore store = getVirtualHost().getMessageStore(); + + BDBMessageStore bdbStore = assertBDBStore(store); + + // Create content ByteBuffers. + // Split the content into 2 chunks for the 0-8 message, as per broker behaviour. + // Use a single chunk for the 0-10 message as per broker behaviour. + String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf"; + + ByteBuffer firstContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(0, 10).getBytes()); + ByteBuffer secondContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(10).getBytes()); + + ByteBuffer completeContentBody_0_10 = ByteBuffer.wrap(bodyText.getBytes()); + int bodySize = completeContentBody_0_10.limit(); + + /* + * Create and insert a 0-8 message (metadata and multi-chunk content) + */ + MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8(); + BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8(); + + ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); + + MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0); + StoredMessage storedMessage_0_8 = bdbStore.addMessage(messageMetaData_0_8); + + long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime(); + long messageid_0_8 = storedMessage_0_8.getMessageNumber(); + + storedMessage_0_8.addContent(0, firstContentBytes_0_8); + storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8); + storedMessage_0_8.flushToStore(); + + /* + * Create and insert a 0-10 message (metadata and content) + */ + MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize); + DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10(); + Header header_0_10 = new Header(msgProps_0_10, delProps_0_10); + + MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT, + MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10); + + MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10); + StoredMessage storedMessage_0_10 = bdbStore.addMessage(messageMetaData_0_10); + + long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime(); + long messageid_0_10 = storedMessage_0_10.getMessageNumber(); + + storedMessage_0_10.addContent(0, completeContentBody_0_10); + storedMessage_0_10.flushToStore(); + + /* + * reload the store only (read-only) + */ + bdbStore = reloadStoreReadOnly(bdbStore); + + /* + * Read back and validate the 0-8 message metadata and content + */ + StorableMessageMetaData storeableMMD_0_8 = bdbStore.getMessageMetaData(messageid_0_8); + + assertEquals("Unexpected message type",MessageMetaDataType.META_DATA_0_8, storeableMMD_0_8.getType()); + assertTrue("Unexpected instance type", storeableMMD_0_8 instanceof MessageMetaData); + MessageMetaData returnedMMD_0_8 = (MessageMetaData) storeableMMD_0_8; + + assertEquals("Message arrival time has changed", origArrivalTime_0_8, returnedMMD_0_8.getArrivalTime()); + + MessagePublishInfo returnedPubBody_0_8 = returnedMMD_0_8.getMessagePublishInfo(); + assertEquals("Message exchange has changed", pubInfoBody_0_8.getExchange(), returnedPubBody_0_8.getExchange()); + assertEquals("Immediate flag has changed", pubInfoBody_0_8.isImmediate(), returnedPubBody_0_8.isImmediate()); + assertEquals("Mandatory flag has changed", pubInfoBody_0_8.isMandatory(), returnedPubBody_0_8.isMandatory()); + assertEquals("Routing key has changed", pubInfoBody_0_8.getRoutingKey(), returnedPubBody_0_8.getRoutingKey()); + + ContentHeaderBody returnedHeaderBody_0_8 = returnedMMD_0_8.getContentHeaderBody(); + assertEquals("ContentHeader ClassID has changed", chb_0_8.classId, returnedHeaderBody_0_8.classId); + assertEquals("ContentHeader weight has changed", chb_0_8.weight, returnedHeaderBody_0_8.weight); + assertEquals("ContentHeader bodySize has changed", chb_0_8.bodySize, returnedHeaderBody_0_8.bodySize); + + BasicContentHeaderProperties returnedProperties_0_8 = (BasicContentHeaderProperties) returnedHeaderBody_0_8.getProperties(); + assertEquals("Property ContentType has changed", props_0_8.getContentTypeAsString(), returnedProperties_0_8.getContentTypeAsString()); + assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString()); + + ByteBuffer recoveredContent_0_8 = ByteBuffer.allocate((int) chb_0_8.bodySize) ; + long recoveredCount_0_8 = bdbStore.getContent(messageid_0_8, 0, recoveredContent_0_8); + assertEquals("Incorrect amount of payload data recovered", chb_0_8.bodySize, recoveredCount_0_8); + String returnedPayloadString_0_8 = new String(recoveredContent_0_8.array()); + assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_8); + + /* + * Read back and validate the 0-10 message metadata and content + */ + StorableMessageMetaData storeableMMD_0_10 = bdbStore.getMessageMetaData(messageid_0_10); + + assertEquals("Unexpected message type",MessageMetaDataType.META_DATA_0_10, storeableMMD_0_10.getType()); + assertTrue("Unexpected instance type", storeableMMD_0_10 instanceof MessageMetaData_0_10); + MessageMetaData_0_10 returnedMMD_0_10 = (MessageMetaData_0_10) storeableMMD_0_10; + + assertEquals("Message arrival time has changed", origArrivalTime_0_10, returnedMMD_0_10.getArrivalTime()); + + DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().get(DeliveryProperties.class); + assertNotNull("DeliveryProperties were not returned", returnedDelProps_0_10); + assertEquals("Immediate flag has changed", delProps_0_10.getImmediate(), returnedDelProps_0_10.getImmediate()); + assertEquals("Routing key has changed", delProps_0_10.getRoutingKey(), returnedDelProps_0_10.getRoutingKey()); + assertEquals("Message exchange has changed", delProps_0_10.getExchange(), returnedDelProps_0_10.getExchange()); + assertEquals("Message expiration has changed", delProps_0_10.getExpiration(), returnedDelProps_0_10.getExpiration()); + assertEquals("Message delivery priority has changed", delProps_0_10.getPriority(), returnedDelProps_0_10.getPriority()); + + MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().get(MessageProperties.class); + assertNotNull("MessageProperties were not returned", returnedMsgProps); + assertTrue("Message correlationID has changed", Arrays.equals(msgProps_0_10.getCorrelationId(), returnedMsgProps.getCorrelationId())); + assertEquals("Message content length has changed", msgProps_0_10.getContentLength(), returnedMsgProps.getContentLength()); + assertEquals("Message content type has changed", msgProps_0_10.getContentType(), returnedMsgProps.getContentType()); + + ByteBuffer recoveredContent = ByteBuffer.allocate((int) msgProps_0_10.getContentLength()) ; + long recoveredCount = bdbStore.getContent(messageid_0_10, 0, recoveredContent); + assertEquals("Incorrect amount of payload data recovered", msgProps_0_10.getContentLength(), recoveredCount); + + String returnedPayloadString_0_10 = new String(recoveredContent.array()); + assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10); + } + + private DeliveryProperties createDeliveryProperties_0_10() + { + DeliveryProperties delProps_0_10 = new DeliveryProperties(); + + delProps_0_10.setDeliveryMode(MessageDeliveryMode.PERSISTENT); + delProps_0_10.setImmediate(true); + delProps_0_10.setExchange("exchange12345"); + delProps_0_10.setRoutingKey("routingKey12345"); + delProps_0_10.setExpiration(5); + delProps_0_10.setPriority(MessageDeliveryPriority.ABOVE_AVERAGE); + + return delProps_0_10; + } + + private MessageProperties createMessageProperties_0_10(int bodySize) + { + MessageProperties msgProps_0_10 = new MessageProperties(); + msgProps_0_10.setContentLength(bodySize); + msgProps_0_10.setCorrelationId("qwerty".getBytes()); + msgProps_0_10.setContentType("text/html"); + + return msgProps_0_10; + } + + /** + * Close the provided store and create a new (read-only) store to read back the data. + * + * Use this method instead of reloading the virtual host like other tests in order + * to avoid the recovery handler deleting the message for not being on a queue. + */ + private BDBMessageStore reloadStoreReadOnly(BDBMessageStore messageStore) throws Exception + { + messageStore.close(); + File storePath = new File(String.valueOf(_config.getProperty("store.environment-path"))); + + BDBMessageStore newStore = new BDBMessageStore(); + newStore.configure(storePath, false); + newStore.start(); + + return newStore; + } + + private MessagePublishInfo createPublishInfoBody_0_8() + { + return new MessagePublishInfo() + { + public AMQShortString getExchange() + { + return new AMQShortString("exchange12345"); + } + + public void setExchange(AMQShortString exchange) + { + } + + public boolean isImmediate() + { + return false; + } + + public boolean isMandatory() + { + return true; + } + + public AMQShortString getRoutingKey() + { + return new AMQShortString("routingKey12345"); + } + }; + + } + + private ContentHeaderBody createContentHeaderBody_0_8(BasicContentHeaderProperties props, int length) + { + MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); + int classForBasic = methodRegistry.createBasicQosOkBody().getClazz(); + return new ContentHeaderBody(classForBasic, 1, props, length); + } + + private BasicContentHeaderProperties createContentHeaderProperties_0_8() + { + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue()); + props.setContentType("text/html"); + props.getHeaders().setString("Test", "MST"); + return props; + } + + /** + * Tests that messages which are added to the store and then removed using the + * public MessageStore interfaces are actually removed from the store by then + * interrogating the store with its own implementation methods and verifying + * expected exceptions are thrown to indicate the message is not present. + */ + public void testMessageCreationAndRemoval() throws Exception + { + MessageStore store = getVirtualHost().getMessageStore(); + BDBMessageStore bdbStore = assertBDBStore(store); + + StoredMessage storedMessage_0_8 = createAndStoreMultiChunkMessage_0_8(store); + long messageid_0_8 = storedMessage_0_8.getMessageNumber(); + + //remove the message in the fashion the broker normally would + storedMessage_0_8.remove(); + + //verify the removal using the BDB store implementation methods directly + try + { + // the next line should throw since the message id should not be found + bdbStore.getMessageMetaData(messageid_0_8); + fail("No exception thrown when message id not found getting metadata"); + } + catch (AMQStoreException e) + { + // pass since exception expected + } + + //expecting no content, allocate a 1 byte + ByteBuffer dst = ByteBuffer.allocate(1); + + assertEquals("Retrieved content when none was expected", + 0, bdbStore.getContent(messageid_0_8, 0, dst)); + } + + private BDBMessageStore assertBDBStore(Object store) + { + if(!(store instanceof BDBMessageStore)) + { + fail("Test requires an instance of BDBMessageStore to proceed"); + } + + return (BDBMessageStore) store; + } + + private StoredMessage createAndStoreMultiChunkMessage_0_8(MessageStore store) + { + byte[] body10Bytes = "0123456789".getBytes(); + byte[] body5Bytes = "01234".getBytes(); + + ByteBuffer chunk1 = ByteBuffer.wrap(body10Bytes); + ByteBuffer chunk2 = ByteBuffer.wrap(body5Bytes); + + int bodySize = body10Bytes.length + body5Bytes.length; + + //create and store the message using the MessageStore interface + MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8(); + BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8(); + + ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); + + MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0); + StoredMessage storedMessage_0_8 = store.addMessage(messageMetaData_0_8); + + storedMessage_0_8.addContent(0, chunk1); + storedMessage_0_8.addContent(chunk1.limit(), chunk2); + storedMessage_0_8.flushToStore(); + + return storedMessage_0_8; + } + + /** + * Tests transaction commit by utilising the enqueue and dequeue methods available + * in the TransactionLog interface implemented by the store, and verifying the + * behaviour using BDB implementation methods. + */ + public void testTranCommit() throws Exception + { + TransactionLog log = getVirtualHost().getTransactionLog(); + + BDBMessageStore bdbStore = assertBDBStore(log); + + final AMQShortString mockQueueName = new AMQShortString("queueName"); + + TransactionLogResource mockQueue = new TransactionLogResource() + { + public String getResourceName() + { + return mockQueueName.asString(); + } + }; + + TransactionLog.Transaction txn = log.newTransaction(); + + txn.enqueueMessage(mockQueue, 1L); + txn.enqueueMessage(mockQueue, 5L); + txn.commitTran(); + + List enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName); + + assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); + Long val = enqueuedIds.get(0); + assertEquals("First Message is incorrect", 1L, val.longValue()); + val = enqueuedIds.get(1); + assertEquals("Second Message is incorrect", 5L, val.longValue()); + } + + + /** + * Tests transaction rollback before a commit has occurred by utilising the + * enqueue and dequeue methods available in the TransactionLog interface + * implemented by the store, and verifying the behaviour using BDB + * implementation methods. + */ + public void testTranRollbackBeforeCommit() throws Exception + { + TransactionLog log = getVirtualHost().getTransactionLog(); + + BDBMessageStore bdbStore = assertBDBStore(log); + + final AMQShortString mockQueueName = new AMQShortString("queueName"); + + TransactionLogResource mockQueue = new TransactionLogResource() + { + public String getResourceName() + { + return mockQueueName.asString(); + } + }; + + TransactionLog.Transaction txn = log.newTransaction(); + + txn.enqueueMessage(mockQueue, 21L); + txn.abortTran(); + + txn = log.newTransaction(); + txn.enqueueMessage(mockQueue, 22L); + txn.enqueueMessage(mockQueue, 23L); + txn.commitTran(); + + List enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName); + + assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); + Long val = enqueuedIds.get(0); + assertEquals("First Message is incorrect", 22L, val.longValue()); + val = enqueuedIds.get(1); + assertEquals("Second Message is incorrect", 23L, val.longValue()); + } + + /** + * Tests transaction rollback after a commit has occurred by utilising the + * enqueue and dequeue methods available in the TransactionLog interface + * implemented by the store, and verifying the behaviour using BDB + * implementation methods. + */ + public void testTranRollbackAfterCommit() throws Exception + { + TransactionLog log = getVirtualHost().getTransactionLog(); + + BDBMessageStore bdbStore = assertBDBStore(log); + + final AMQShortString mockQueueName = new AMQShortString("queueName"); + + TransactionLogResource mockQueue = new TransactionLogResource() + { + public String getResourceName() + { + return mockQueueName.asString(); + } + }; + + TransactionLog.Transaction txn = log.newTransaction(); + + txn.enqueueMessage(mockQueue, 30L); + txn.commitTran(); + + txn = log.newTransaction(); + txn.enqueueMessage(mockQueue, 31L); + txn.abortTran(); + + txn = log.newTransaction(); + txn.enqueueMessage(mockQueue, 32L); + txn.commitTran(); + + List enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName); + + assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size()); + Long val = enqueuedIds.get(0); + assertEquals("First Message is incorrect", 30L, val.longValue()); + val = enqueuedIds.get(1); + assertEquals("Second Message is incorrect", 32L, val.longValue()); + } + +} Added: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java?rev=1175235&view=auto ============================================================================== --- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java (added) +++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java Sat Sep 24 20:16:00 2011 @@ -0,0 +1,232 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; + +import junit.framework.TestCase; + +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.url.URLSyntaxException; + +/** + * Prepares an older version brokers BDB store with the required + * contents for use in the BDBStoreUpgradeTest. + * + * The store will then be used to verify that the upgraded is + * completed properly and that once upgraded it functions as + * expected with the new broker. + */ +public class BDBStoreUpgradeTestPreparer extends TestCase +{ + public static final String TOPIC_NAME="myUpgradeTopic"; + public static final String SUB_NAME="myDurSubName"; + public static final String QUEUE_NAME="myUpgradeQueue"; + + private static AMQConnectionFactory _connFac; + private static final String CONN_URL = + "amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'"; + + /** + * Create a BDBStoreUpgradeTestPreparer instance + */ + public BDBStoreUpgradeTestPreparer () throws URLSyntaxException + { + _connFac = new AMQConnectionFactory(CONN_URL); + } + + /** + * Utility test method to allow running the preparation tool + * using the test framework + */ + public void testPrepareBroker() throws Exception + { + prepareBroker(); + } + + private void prepareBroker() throws Exception + { + prepareQueues(); + prepareDurableSubscription(); + } + + /** + * Prepare a queue for use in testing message and binding recovery + * after the upgrade is performed. + * + * - Create a transacted session on the connection. + * - Use a consumer to create the (durable by default) queue. + * - Send 5 large messages to test (multi-frame) content recovery. + * - Send 1 small message to test (single-frame) content recovery. + * - Commit the session. + * - Send 5 small messages to test that uncommitted messages are not recovered. + * following the upgrade. + * - Close the session. + */ + private void prepareQueues() throws Exception + { + // Create a connection + Connection connection = _connFac.createConnection(); + connection.start(); + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + e.printStackTrace(); + } + }); + // Create a session on the connection, transacted to confirm delivery + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(QUEUE_NAME); + // Create a consumer to ensure the queue gets created + // (and enter it into the store, as queues are made durable by default) + MessageConsumer messageConsumer = session.createConsumer(queue); + messageConsumer.close(); + + // Create a Message producer + MessageProducer messageProducer = session.createProducer(queue); + + // Publish 5 persistent messages, 256k chars to ensure they are multi-frame + sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 5); + // Publish 5 persistent messages, 1k chars to ensure they are single-frame + sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5); + + session.commit(); + + // Publish 5 persistent messages which will NOT be committed and so should be 'lost' + sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5); + + session.close(); + connection.close(); + } + + /** + * Prepare a DurableSubscription backing queue for use in testing selector + * recovery and queue exclusivity marking during the upgrade process. + * + * - Create a transacted session on the connection. + * - Open and close a DurableSubscription with selector to create the backing queue. + * - Send a message which matches the selector. + * - Send a message which does not match the selector. + * - Send a message which matches the selector but will remain uncommitted. + * - Close the session. + */ + private void prepareDurableSubscription() throws Exception + { + + // Create a connection + TopicConnection connection = _connFac.createTopicConnection(); + connection.start(); + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException e) + { + e.printStackTrace(); + } + }); + // Create a session on the connection, transacted to confirm delivery + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Topic topic = session.createTopic(TOPIC_NAME); + + // Create and register a durable subscriber with selector and then close it + TopicSubscriber durSub1 = session.createDurableSubscriber(topic, SUB_NAME,"testprop='true'", false); + durSub1.close(); + + // Create a publisher and send a persistent message which matches the selector + // followed by one that does not match, and another which matches but is not + // committed and so should be 'lost' + TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED); + TopicPublisher publisher = pubSession.createPublisher(topic); + + publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true"); + publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false"); + pubSession.commit(); + publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true"); + + publisher.close(); + pubSession.close(); + + } + + public static void sendMessages(Session session, MessageProducer messageProducer, + Destination dest, int deliveryMode, int length, int numMesages) throws JMSException + { + for (int i = 1; i <= numMesages; i++) + { + Message message = session.createTextMessage(generateString(length)); + message.setIntProperty("ID", i); + messageProducer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + } + } + + public static void publishMessages(Session session, TopicPublisher publisher, + Destination dest, int deliveryMode, int length, int numMesages, String selectorProperty) throws JMSException + { + for (int i = 1; i <= numMesages; i++) + { + Message message = session.createTextMessage(generateString(length)); + message.setIntProperty("ID", i); + message.setStringProperty("testprop", selectorProperty); + publisher.publish(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + } + } + + /** + * Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2. + * + * @param length number of characters in the string + * @return string sequence of the given length + */ + public static String generateString(int length) + { + char[] base_chars = new char[]{'0','1','2','3','4','5','6','7','8','9'}; + char[] chars = new char[length]; + for (int i = 0; i < (length); i++) + { + chars[i] = base_chars[i % 10]; + } + return new String(chars); + } + + /** + * Run the preparation tool. + * @param args Command line arguments. + */ + public static void main(String[] args) throws Exception + { + BDBStoreUpgradeTestPreparer producer = new BDBStoreUpgradeTestPreparer(); + producer.prepareBroker(); + } +} \ No newline at end of file Added: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java?rev=1175235&view=auto ============================================================================== --- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java (added) +++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java Sat Sep 24 20:16:00 2011 @@ -0,0 +1,540 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME; +import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SUB_NAME; +import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.management.common.mbeans.ManagedQueue; +import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.server.store.TransactionLog; +import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4; +import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTupleBindingFactory; +import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory; +import org.apache.qpid.test.utils.JMXTestUtils; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.sleepycat.bind.tuple.TupleBinding; +import com.sleepycat.je.DatabaseEntry; + +/** + * Tests upgrading a BDB store and using it with the new broker + * after the required contents are entered into the store using + * an old broker with the BDBStoreUpgradeTestPreparer. The store + * will then be used to verify that the upgraded is completed + * properly and that once upgraded it functions as expected with + * the new broker. + */ +public class BDBUpgradeTest extends QpidBrokerTestCase +{ + protected static final Logger _logger = LoggerFactory.getLogger(BDBUpgradeTest.class); + + private static final String STRING_1024 = BDBStoreUpgradeTestPreparer.generateString(1024); + private static final String STRING_1024_256 = BDBStoreUpgradeTestPreparer.generateString(1024*256); + private static final String QPID_WORK_ORIG = System.getProperty("QPID_WORK"); + private static final String QPID_HOME = System.getProperty("QPID_HOME"); + private static final int VERSION_4 = 4; + + private String _fromDir; + private String _toDir; + private String _toDirTwice; + + @Override + public void setUp() throws Exception + { + assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG); + assertNotNull("QPID_HOME must be set", QPID_HOME); + + if(! isExternalBroker()) + { + //override QPID_WORK to add the InVM port used so the store + //output from the upgrade tool can be found by the broker + setSystemProperty("QPID_WORK", QPID_WORK_ORIG + "/" + getPort()); + } + + _fromDir = QPID_HOME + "/bdbstore-to-upgrade/test-store"; + _toDir = getWorkDirBaseDir() + "/bdbstore/test-store"; + _toDirTwice = getWorkDirBaseDir() + "/bdbstore-upgraded-twice"; + + //Clear the two target directories if they exist. + File directory = new File(_toDir); + if (directory.exists() && directory.isDirectory()) + { + FileUtils.delete(directory, true); + } + directory = new File(_toDirTwice); + if (directory.exists() && directory.isDirectory()) + { + FileUtils.delete(directory, true); + } + + //Upgrade the test store. + upgradeBrokerStore(_fromDir, _toDir); + + //override the broker config used and then start the broker with the updated store + _configFile = new File(QPID_HOME, "etc/config-systests-bdb.xml"); + setConfigurationProperty("management.enabled", "true"); + + super.setUp(); + } + + private String getWorkDirBaseDir() + { + return QPID_WORK_ORIG + (isInternalBroker() ? "" : "/" + getPort()); + } + + /** + * Tests that the core upgrade method of the store upgrade tool passes through the exception + * from the BDBMessageStore indicating that the data on disk can't be loaded as the previous + * version because it has already been upgraded. + * @throws Exception + */ + public void testMultipleUpgrades() throws Exception + { + //stop the broker started by setUp() in order to allow the second upgrade attempt to proceed + stopBroker(); + + try + { + new BDBStoreUpgrade(_toDir, _toDirTwice, null, false, true).upgradeFromVersion(VERSION_4); + fail("Second Upgrade Succeeded"); + } + catch (Exception e) + { + System.err.println("Showing stack trace, we are expecting an 'Unable to load BDBStore' error"); + e.printStackTrace(); + assertTrue("Incorrect Exception Thrown:" + e.getMessage(), + e.getMessage().contains("Unable to load BDBStore as version 4. Store on disk contains version 5 data")); + } + } + + /** + * Test that the selector applied to the DurableSubscription was successfully + * transfered to the new store, and functions as expected with continued use + * by monitoring message count while sending new messages to the topic. + */ + public void testSelectorDurability() throws Exception + { + JMXTestUtils jmxUtils = null; + try + { + jmxUtils = new JMXTestUtils(this, "guest", "guest"); + jmxUtils.open(); + } + catch (Exception e) + { + fail("Unable to establish JMX connection, test cannot proceed"); + } + + try + { + ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME); + assertEquals("DurableSubscription backing queue should have 1 message on it initially", + new Integer(1), dursubQueue.getMessageCount()); + + // Create a connection and start it + TopicConnection connection = (TopicConnection) getConnection(); + connection.start(); + + // Send messages which don't match and do match the selector, checking message count + TopicSession pubSession = connection.createTopicSession(true, org.apache.qpid.jms.Session.SESSION_TRANSACTED); + Topic topic = pubSession.createTopic(TOPIC_NAME); + TopicPublisher publisher = pubSession.createPublisher(topic); + + BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false"); + pubSession.commit(); + assertEquals("DurableSubscription backing queue should still have 1 message on it", + new Integer(1), dursubQueue.getMessageCount()); + + BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true"); + pubSession.commit(); + assertEquals("DurableSubscription backing queue should now have 2 messages on it", + new Integer(2), dursubQueue.getMessageCount()); + + dursubQueue.clearQueue(); + pubSession.close(); + } + finally + { + jmxUtils.close(); + } + } + + /** + * Test that the backing queue for the durable subscription created was successfully + * detected and set as being exclusive during the upgrade process, and that the + * regular queue was not. + */ + public void testQueueExclusivity() throws Exception + { + JMXTestUtils jmxUtils = null; + try + { + jmxUtils = new JMXTestUtils(this, "guest", "guest"); + jmxUtils.open(); + } + catch (Exception e) + { + fail("Unable to establish JMX connection, test cannot proceed"); + } + + try + { + ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_NAME); + assertFalse("Queue should not have been marked as Exclusive during upgrade", queue.isExclusive()); + + ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME); + assertTrue("DurableSubscription backing queue should have been marked as Exclusive during upgrade", dursubQueue.isExclusive()); + } + finally + { + jmxUtils.close(); + } + } + + /** + * Test that the upgraded queue continues to function properly when used + * for persistent messaging and restarting the broker. + * + * Sends the new messages to the queue BEFORE consuming those which were + * sent before the upgrade. In doing so, this also serves to test that + * the queue bindings were successfully transitioned during the upgrade. + */ + public void testBindingAndMessageDurabability() throws Exception + { + // Create a connection and start it + TopicConnection connection = (TopicConnection) getConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(QUEUE_NAME); + MessageProducer messageProducer = session.createProducer(queue); + + // Send a new message + BDBStoreUpgradeTestPreparer.sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 1); + + session.close(); + + // Restart the broker + restartBroker(); + + // Drain the queue of all messages + connection = (TopicConnection) getConnection(); + connection.start(); + consumeQueueMessages(connection, true); + } + + /** + * Test that all of the committed persistent messages previously sent to + * the broker are properly received following update of the MetaData and + * Content entries during the store upgrade process. + */ + public void testConsumptionOfUpgradedMessages() throws Exception + { + // Create a connection and start it + Connection connection = getConnection(); + connection.start(); + + consumeDurableSubscriptionMessages(connection); + consumeQueueMessages(connection, false); + } + + /** + * Tests store migration containing messages for non-existing queue. + * + * @throws Exception + */ + public void testMigrationOfMessagesForNonExistingQueues() throws Exception + { + stopBroker(); + + // copy store data into a new location for adding of phantom message + File storeLocation = new File(_fromDir); + File target = new File(_toDirTwice); + if (!target.exists()) + { + target.mkdirs(); + } + FileUtils.copyRecursive(storeLocation, target); + + // delete migrated data + File directory = new File(_toDir); + if (directory.exists() && directory.isDirectory()) + { + FileUtils.delete(directory, true); + } + + // test data + String nonExistingQueueName = getTestQueueName(); + String messageText = "Test Phantom Message"; + + // add message + addMessageForNonExistingQueue(target, VERSION_4, nonExistingQueueName, messageText); + + String[] inputs = { "Yes", "Yes", "Yes" }; + upgradeBrokerStoreInInterractiveMode(_toDirTwice, _toDir, inputs); + + // start broker + startBroker(); + + // Create a connection and start it + Connection connection = getConnection(); + connection.start(); + + // consume a message for non-existing store + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(nonExistingQueueName); + MessageConsumer messageConsumer = session.createConsumer(queue); + Message message = messageConsumer.receive(1000); + + // assert consumed message + assertNotNull("Message was not migrated!", message); + assertTrue("Unexpected message received!", message instanceof TextMessage); + String text = ((TextMessage) message).getText(); + assertEquals("Message migration failed!", messageText, text); + } + + /** + * An utility method to upgrade broker with simulation user interactions + * + * @param fromDir + * location of the store to migrate + * @param toDir + * location of where migrated data will be stored + * @param inputs + * user answers on upgrade tool questions + * @throws Exception + */ + private void upgradeBrokerStoreInInterractiveMode(String fromDir, String toDir, final String[] inputs) + throws Exception + { + // save to restore system.in after data migration + InputStream stdin = System.in; + + // set fake system in to simulate user interactions + // FIXME: it is a quite dirty simulator of system input but it does the job + System.setIn(new InputStream() + { + + int counter = 0; + + public synchronized int read(byte b[], int off, int len) + { + byte[] src = (inputs[counter] + "\n").getBytes(); + System.arraycopy(src, 0, b, off, src.length); + counter++; + return src.length; + } + + @Override + public int read() throws IOException + { + return -1; + } + }); + + try + { + // Upgrade the test store. + new BDBStoreUpgrade(fromDir, toDir, null, true, true).upgradeFromVersion(VERSION_4); + } + finally + { + // restore system in + System.setIn(stdin); + } + } + + @SuppressWarnings("unchecked") + private void addMessageForNonExistingQueue(File storeLocation, int storeVersion, String nonExistingQueueName, + String messageText) throws Exception + { + final AMQShortString queueName = new AMQShortString(nonExistingQueueName); + BDBMessageStore store = new BDBMessageStore(storeVersion); + store.configure(storeLocation, false); + try + { + store.start(); + + // store message objects + ByteBuffer completeContentBody = ByteBuffer.wrap(messageText.getBytes("UTF-8")); + long bodySize = completeContentBody.limit(); + MessagePublishInfo pubInfoBody = new MessagePublishInfoImpl(new AMQShortString("amq.direct"), false, + false, queueName); + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue()); + props.setContentType("text/plain"); + props.setType("text/plain"); + props.setMessageId("whatever"); + props.setEncoding("UTF-8"); + props.getHeaders().setString("Test", "MST"); + MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); + int classForBasic = methodRegistry.createBasicQosOkBody().getClazz(); + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(classForBasic, 1, props, bodySize); + + // add content entry to database + long messageId = store.getNewMessageId(); + TupleBinding contentKeyTB = new MessageContentKeyTupleBindingFactory(storeVersion).getInstance(); + MessageContentKey contentKey = null; + if (storeVersion == VERSION_4) + { + contentKey = new MessageContentKey_4(messageId, 0); + } + else + { + throw new Exception(storeVersion + " is not supported"); + } + DatabaseEntry key = new DatabaseEntry(); + contentKeyTB.objectToEntry(contentKey, key); + DatabaseEntry data = new DatabaseEntry(); + ContentTB contentTB = new ContentTB(); + contentTB.objectToEntry(completeContentBody, data); + store.getContentDb().put(null, key, data); + + // add meta data entry to database + TupleBinding longTB = TupleBinding.getPrimitiveBinding(Long.class); + TupleBinding metaDataTB = new MessageMetaDataTupleBindingFactory(storeVersion).getInstance(); + key = new DatabaseEntry(); + data = new DatabaseEntry(); + longTB.objectToEntry(new Long(messageId), key); + MessageMetaData metaData = new MessageMetaData(pubInfoBody, contentHeaderBody, 1); + metaDataTB.objectToEntry(metaData, data); + store.getMetaDataDb().put(null, key, data); + + // add delivery entry to database + TransactionLogResource mockQueue = new TransactionLogResource() + { + public String getResourceName() + { + return queueName.asString(); + } + }; + TransactionLog log = (TransactionLog) store; + TransactionLog.Transaction txn = log.newTransaction(); + txn.enqueueMessage(mockQueue, messageId); + txn.commitTran(); + } + finally + { + // close store + store.close(); + } + } + + private void consumeDurableSubscriptionMessages(Connection connection) throws Exception + { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(TOPIC_NAME); + + TopicSubscriber durSub = session.createDurableSubscriber(topic, SUB_NAME,"testprop='true'", false); + + // Retrieve the matching message + Message m = durSub.receive(2000); + assertNotNull("Failed to receive an expected message", m); + assertEquals("Selector property did not match", "true", m.getStringProperty("testprop")); + assertEquals("ID property did not match", 1, m.getIntProperty("ID")); + assertEquals("Message content was not as expected",BDBStoreUpgradeTestPreparer.generateString(1024) , ((TextMessage)m).getText()); + + // Verify that neither the non-matching or uncommitted message are received + m = durSub.receive(1000); + assertNull("No more messages should have been recieved", m); + + durSub.close(); + session.close(); + } + + private void consumeQueueMessages(Connection connection, boolean extraMessage) throws Exception + { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(QUEUE_NAME); + + MessageConsumer consumer = session.createConsumer(queue); + Message m; + + // Retrieve the initial pre-upgrade messages + for (int i=1; i <= 5 ; i++) + { + m = consumer.receive(2000); + assertNotNull("Failed to receive an expected message", m); + assertEquals("ID property did not match", i, m.getIntProperty("ID")); + assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText()); + } + for (int i=1; i <= 5 ; i++) + { + m = consumer.receive(2000); + assertNotNull("Failed to receive an expected message", m); + assertEquals("ID property did not match", i, m.getIntProperty("ID")); + assertEquals("Message content was not as expected", STRING_1024, ((TextMessage)m).getText()); + } + + if(extraMessage) + { + //verify that the extra message is received + m = consumer.receive(2000); + assertNotNull("Failed to receive an expected message", m); + assertEquals("ID property did not match", 1, m.getIntProperty("ID")); + assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText()); + } + + // Verify that no more messages are received + m = consumer.receive(1000); + assertNull("No more messages should have been recieved", m); + + consumer.close(); + session.close(); + } + + private void upgradeBrokerStore(String fromDir, String toDir) throws Exception + { + new BDBStoreUpgrade(_fromDir, _toDir, null, false, true).upgradeFromVersion(VERSION_4); + } +} Added: qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb?rev=1175235&view=auto ============================================================================== Files qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb (added) and qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb Sat Sep 24 20:16:00 2011 differ --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscribe@qpid.apache.org