Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1548D200C57 for ; Tue, 7 Mar 2017 20:38:50 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 13B16160B74; Tue, 7 Mar 2017 19:38:50 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4338C160B8A for ; Tue, 7 Mar 2017 20:38:47 +0100 (CET) Received: (qmail 71954 invoked by uid 500); 7 Mar 2017 19:38:43 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 70631 invoked by uid 99); 7 Mar 2017 19:38:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 Mar 2017 19:38:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5F64ADFC4A; Tue, 7 Mar 2017 19:38:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: tabish@apache.org To: commits@activemq.apache.org Date: Tue, 07 Mar 2017 19:39:24 -0000 Message-Id: In-Reply-To: <9671c43771644fdcbe5a90712858608b@git.apache.org> References: <9671c43771644fdcbe5a90712858608b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [44/50] [abbrv] activemq-nms-msmq git commit: Apply patch for AMQNET-556 from Stephane Ramet. Thanks Stephane! 
archived-at: Tue, 07 Mar 2017 19:38:50 -0000 http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/src/main/csharp/Selector/Token.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/Selector/Token.cs b/src/main/csharp/Selector/Token.cs index bc5b705..49bc4ea 100644 --- a/src/main/csharp/Selector/Token.cs +++ b/src/main/csharp/Selector/Token.cs @@ -1,78 +1,78 @@ -/* Generated By:CSharpCC: Do not edit this line. Token.cs Version 3.0 */ -/// -/// Describes the input token stream. -/// - -public class Token { - - /// - /// Gets an integer that describes the kind of this token. - /// - /// - /// This numbering system is determined by CSharpCCParser, and - /// a table of these numbers is stored in the class . - /// - public int kind; - - /** - * beginLine and beginColumn describe the position of the first character - * of this token; endLine and endColumn describe the position of the - * last character of this token. - */ - public int beginLine, beginColumn, endLine, endColumn; - - /** - * The string image of the token. - */ - public string image; - - /** - * A reference to the next regular (non-special) token from the input - * stream. If this is the last token from the input stream, or if the - * token manager has not read tokens beyond this one, this field is - * set to null. This is true only if this token is also a regular - * token. Otherwise, see below for a description of the contents of - * this field. - */ - public Token next; - - /** - * This field is used to access special tokens that occur prior to this - * token, but after the immediately preceding regular (non-special) token. - * If there are no such special tokens, this field is set to null. 
- * When there are more than one such special token, this field refers - * to the last of these special tokens, which in turn refers to the next - * previous special token through its specialToken field, and so on - * until the first special token (whose specialToken field is null). - * The next fields of special tokens refer to other special tokens that - * immediately follow it (without an intervening regular token). If there - * is no such token, this field is null. - */ - public Token specialToken; - - /** - * Returns the image. - */ - public override string ToString() { - return image; - } - - /** - * Returns a new Token object, by default. However, if you want, you - * can create and return subclass objects based on the value of ofKind. - * Simply add the cases to the switch for all those special cases. - * For example, if you have a subclass of Token called IDToken that - * you want to create if ofKind is ID, simlpy add something like : - * - * case MyParserConstants.ID : return new IDToken(); - * - * to the following switch statement. Then you can cast matchedToken - * variable to the appropriate type and use it in your lexical actions. - */ - public static Token NewToken(int ofKind) { - switch(ofKind) { - default : return new Token(); - } - } - -} +/* Generated By:CSharpCC: Do not edit this line. Token.cs Version 3.0 */ +/// +/// Describes the input token stream. +/// + +public class Token { + + /// + /// Gets an integer that describes the kind of this token. + /// + /// + /// This numbering system is determined by CSharpCCParser, and + /// a table of these numbers is stored in the class . + /// + public int kind; + + /** + * beginLine and beginColumn describe the position of the first character + * of this token; endLine and endColumn describe the position of the + * last character of this token. + */ + public int beginLine, beginColumn, endLine, endColumn; + + /** + * The string image of the token. 
+ */ + public string image; + + /** + * A reference to the next regular (non-special) token from the input + * stream. If this is the last token from the input stream, or if the + * token manager has not read tokens beyond this one, this field is + * set to null. This is true only if this token is also a regular + * token. Otherwise, see below for a description of the contents of + * this field. + */ + public Token next; + + /** + * This field is used to access special tokens that occur prior to this + * token, but after the immediately preceding regular (non-special) token. + * If there are no such special tokens, this field is set to null. + * When there are more than one such special token, this field refers + * to the last of these special tokens, which in turn refers to the next + * previous special token through its specialToken field, and so on + * until the first special token (whose specialToken field is null). + * The next fields of special tokens refer to other special tokens that + * immediately follow it (without an intervening regular token). If there + * is no such token, this field is null. + */ + public Token specialToken; + + /** + * Returns the image. + */ + public override string ToString() { + return image; + } + + /** + * Returns a new Token object, by default. However, if you want, you + * can create and return subclass objects based on the value of ofKind. + * Simply add the cases to the switch for all those special cases. + * For example, if you have a subclass of Token called IDToken that + * you want to create if ofKind is ID, simlpy add something like : + * + * case MyParserConstants.ID : return new IDToken(); + * + * to the following switch statement. Then you can cast matchedToken + * variable to the appropriate type and use it in your lexical actions. 
+ */ + public static Token NewToken(int ofKind) { + switch(ofKind) { + default : return new Token(); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/src/main/csharp/Selector/TokenMgrError.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/Selector/TokenMgrError.cs b/src/main/csharp/Selector/TokenMgrError.cs index b011703..f525134 100644 --- a/src/main/csharp/Selector/TokenMgrError.cs +++ b/src/main/csharp/Selector/TokenMgrError.cs @@ -1,130 +1,130 @@ -/* Generated By:CSharpCC: Do not edit this line. TokenMgrError.cs Version 3.0 */ -public class TokenMgrError : System.SystemException -{ - /* - * Ordinals for various reasons why an Exceptions of this type can be thrown. - */ - - /** - * Lexical error occured. - */ - internal static readonly int LexicalError = 0; - - /** - * An attempt wass made to create a second instance of a static token manager. - */ - internal static readonly int StaticLexerError = 1; - - /** - * Tried to change to an invalid lexical state. - */ - internal static readonly int InvalidLexicalState = 2; - - /** - * Detected (and bailed out of) an infinite loop in the token manager. - */ - internal static readonly int LoopDetected = 3; - - /** - * Indicates the reason why the exception is thrown. It will have - * one of the above 4 values. 
- */ - int errorCode; - - /** - * Replaces unprintable characters by their espaced (or unicode escaped) - * equivalents in the given string - */ - protected static string AddEscapes(string str) { - System.Text.StringBuilder retval = new System.Text.StringBuilder(); - char ch; - for (int i = 0; i < str.Length; i++) { - switch (str[i]) { - case '\0' : - continue; - case '\b': - retval.Append("\\b"); - continue; - case '\t': - retval.Append("\\t"); - continue; - case '\n': - retval.Append("\\n"); - continue; - case '\f': - retval.Append("\\f"); - continue; - case '\r': - retval.Append("\\r"); - continue; - case '\"': - retval.Append("\\\""); - continue; - case '\'': - retval.Append("\\\'"); - continue; - case '\\': - retval.Append("\\\\"); - continue; - default: - if ((ch = str[i]) < 0x20 || ch > 0x7e) { - string s = "0000" + System.Convert.ToString((int)ch, 16); - retval.Append("\\u" + s.Substring(s.Length - 4, s.Length - (s.Length - 4))); - } else { - retval.Append(ch); - } - continue; - } - } - return retval.ToString(); - } - - /** - * Returns a detailed message for the Exception when it is thrown by the - * token manager to indicate a lexical error. - * Parameters : - * EOFSeen : indicates if EOF caused the lexicl error - * curLexState : lexical state in which this error occured - * errorLine : line number when the error occured - * errorColumn : column number when the error occured - * errorAfter : prefix that was seen before this error occured - * curchar : the offending character - * Note: You can customize the lexical error message by modifying this method. - */ - protected static string GetLexicalError(bool EOFSeen, int lexState, int errorLine, int errorColumn, string errorAfter, char curChar) { - return("Lexical error at line " + - errorLine + ", column " + - errorColumn + ". Encountered: " + - (EOFSeen ? 
" " : ("\"" + AddEscapes(curChar.ToString()) + "\"") + " (" + (int)curChar + "), ") + - "after : \"" + AddEscapes(errorAfter) + "\""); - } - - /** - * You can also modify the body of this method to customize your error messages. - * For example, cases like LOOP_DETECTED and INVALID_LEXICAL_STATE are not - * of end-users concern, so you can return something like : - * - * "Internal Error : Please file a bug report .... " - * - * from this method for such cases in the release version of your parser. - */ - public override string Message { - get { return base.Message; } - } - - /* - * Constructors of various flavors follow. - */ - - public TokenMgrError() { - } - - public TokenMgrError(string message, int reason) : - base(message) { - errorCode = reason; - } - - public TokenMgrError(bool EOFSeen, int lexState, int errorLine, int errorColumn, string errorAfter, char curChar, int reason) : - this(GetLexicalError(EOFSeen, lexState, errorLine, errorColumn, errorAfter, curChar), reason) { - } -} +/* Generated By:CSharpCC: Do not edit this line. TokenMgrError.cs Version 3.0 */ +public class TokenMgrError : System.SystemException +{ + /* + * Ordinals for various reasons why an Exceptions of this type can be thrown. + */ + + /* + * Lexical error occured. + */ + internal static readonly int LexicalError = 0; + + /* + * An attempt wass made to create a second instance of a static token manager. + */ + internal static readonly int StaticLexerError = 1; + + /* + * Tried to change to an invalid lexical state. + */ + internal static readonly int InvalidLexicalState = 2; + + /* + * Detected (and bailed out of) an infinite loop in the token manager. + */ + internal static readonly int LoopDetected = 3; + + /* + * Indicates the reason why the exception is thrown. It will have + * one of the above 4 values. 
+ */ + int errorCode; + + /* + * Replaces unprintable characters by their espaced (or unicode escaped) + * equivalents in the given string + */ + protected static string AddEscapes(string str) { + System.Text.StringBuilder retval = new System.Text.StringBuilder(); + char ch; + for (int i = 0; i < str.Length; i++) { + switch (str[i]) { + case '\0' : + continue; + case '\b': + retval.Append("\\b"); + continue; + case '\t': + retval.Append("\\t"); + continue; + case '\n': + retval.Append("\\n"); + continue; + case '\f': + retval.Append("\\f"); + continue; + case '\r': + retval.Append("\\r"); + continue; + case '\"': + retval.Append("\\\""); + continue; + case '\'': + retval.Append("\\\'"); + continue; + case '\\': + retval.Append("\\\\"); + continue; + default: + if ((ch = str[i]) < 0x20 || ch > 0x7e) { + string s = "0000" + System.Convert.ToString((int)ch, 16); + retval.Append("\\u" + s.Substring(s.Length - 4, s.Length - (s.Length - 4))); + } else { + retval.Append(ch); + } + continue; + } + } + return retval.ToString(); + } + + /* + * Returns a detailed message for the Exception when it is thrown by the + * token manager to indicate a lexical error. + * Parameters : + * EOFSeen : indicates if EOF caused the lexicl error + * curLexState : lexical state in which this error occured + * errorLine : line number when the error occured + * errorColumn : column number when the error occured + * errorAfter : prefix that was seen before this error occured + * curchar : the offending character + * Note: You can customize the lexical error message by modifying this method. + */ + protected static string GetLexicalError(bool EOFSeen, int lexState, int errorLine, int errorColumn, string errorAfter, char curChar) { + return("Lexical error at line " + + errorLine + ", column " + + errorColumn + ". Encountered: " + + (EOFSeen ? 
" " : ("\"" + AddEscapes(curChar.ToString()) + "\"") + " (" + (int)curChar + "), ") + + "after : \"" + AddEscapes(errorAfter) + "\""); + } + + /* + * You can also modify the body of this method to customize your error messages. + * For example, cases like LOOP_DETECTED and INVALID_LEXICAL_STATE are not + * of end-users concern, so you can return something like : + * + * "Internal Error : Please file a bug report .... " + * + * from this method for such cases in the release version of your parser. + */ + public override string Message { + get { return base.Message; } + } + + /* + * Constructors of various flavors follow. + */ + + public TokenMgrError() { + } + + public TokenMgrError(string message, int reason) : + base(message) { + errorCode = reason; + } + + public TokenMgrError(bool EOFSeen, int lexState, int errorLine, int errorColumn, string errorAfter, char curChar, int reason) : + this(GetLexicalError(EOFSeen, lexState, errorLine, errorColumn, errorAfter, curChar), reason) { + } +} http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/src/main/csharp/Selector/UnaryExpression.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/Selector/UnaryExpression.cs b/src/main/csharp/Selector/UnaryExpression.cs index 4ccbbc0..14d0262 100644 --- a/src/main/csharp/Selector/UnaryExpression.cs +++ b/src/main/csharp/Selector/UnaryExpression.cs @@ -1,66 +1,65 @@ -using System; -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.NMS.Selector -{ - /// - /// An expression which performs an operation on one expression value. - /// - public abstract class UnaryExpression : IExpression - { - protected IExpression rightExpression; - public IExpression Right - { - get { return rightExpression; } - set { rightExpression = value; } - } - - protected abstract string ExpressionSymbol - { - get; - } - - public UnaryExpression(IExpression left) - { - this.rightExpression = left; - } - - public abstract object Evaluate(MessageEvaluationContext message); - - public override string ToString() - { - return "(" + ExpressionSymbol + " " + rightExpression.ToString() + ")"; - } - - public static IExpression CreateNegate(IExpression left) - { - return new NegateExpression(left); - } - - public static IBooleanExpression CreateNOT(IBooleanExpression left) - { - return new NOTExpression(left); - } - - public static IBooleanExpression CreateBooleanCast(IExpression left) - { - return new BooleanCastExpression(left); - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +using System; + +namespace Apache.NMS.Selector +{ + /// + /// An expression which performs an operation on one expression value. + /// + public abstract class UnaryExpression : IExpression + { + protected IExpression rightExpression; + public IExpression Right + { + get { return rightExpression; } + set { rightExpression = value; } + } + + protected abstract string ExpressionSymbol + { + get; + } + + public UnaryExpression(IExpression left) + { + this.rightExpression = left; + } + + public abstract object Evaluate(MessageEvaluationContext message); + + public override string ToString() + { + return "(" + ExpressionSymbol + " " + rightExpression.ToString() + ")"; + } + + public static IExpression CreateNegate(IExpression left) + { + return new NegateExpression(left); + } + + public static IBooleanExpression CreateNOT(IBooleanExpression left) + { + return new NOTExpression(left); + } + + public static IBooleanExpression CreateBooleanCast(IExpression left) + { + return new BooleanCastExpression(left); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/src/main/csharp/Session.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/Session.cs b/src/main/csharp/Session.cs index b64c6ae..531dce2 100644 --- a/src/main/csharp/Session.cs +++ b/src/main/csharp/Session.cs @@ -70,6 +70,12 @@ namespace Apache.NMS.MSMQ public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal) { + // Bad consumer test + if(destination == null) + { + throw new 
NMSException("Consumer destination cannot be null"); + } + MessageQueue queue = MessageConverter.ToMsmqDestination(destination); return new MessageConsumer(this, acknowledgementMode, queue, selector); } @@ -178,6 +184,9 @@ namespace Apache.NMS.MSMQ throw new InvalidOperationException("You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + acknowledgementMode); } messageQueueTransaction.Commit(); + + // Start a new transaction + MessageQueueTransaction = new MessageQueueTransaction(); } public void Rollback() @@ -187,6 +196,9 @@ namespace Apache.NMS.MSMQ throw new InvalidOperationException("You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + acknowledgementMode); } messageQueueTransaction.Abort(); + + // Start a new transaction + MessageQueueTransaction = new MessageQueueTransaction(); } public void Recover() @@ -221,17 +233,17 @@ namespace Apache.NMS.MSMQ public MessageQueueTransaction MessageQueueTransaction { - get + get { return messageQueueTransaction; } + set { + messageQueueTransaction = value; + if(null != messageQueueTransaction && messageQueueTransaction.Status != MessageQueueTransactionStatus.Pending) { messageQueueTransaction.Begin(); } - - return messageQueueTransaction; } - set { messageQueueTransaction = value; } } public IMessageConverter MessageConverter http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/src/test/csharp/AsyncConsumeTest.cs ---------------------------------------------------------------------- diff --git a/src/test/csharp/AsyncConsumeTest.cs b/src/test/csharp/AsyncConsumeTest.cs new file mode 100644 index 0000000..4d38b1a --- /dev/null +++ b/src/test/csharp/AsyncConsumeTest.cs @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System.Threading; +using NUnit.Framework; + +namespace Apache.NMS.Test +{ + //[TestFixture] + public class AsyncConsumeTest : NMSTest + { + protected string RESPONSE_CLIENT_ID; + protected AutoResetEvent semaphore; + protected bool received; + protected IMessage receivedMsg; + + public AsyncConsumeTest(NMSTestSupport testSupport) + : base(testSupport) + { + } + + //[SetUp] + public override void SetUp() + { + base.SetUp(); + semaphore = new AutoResetEvent(false); + received = false; + receivedMsg = null; + + RESPONSE_CLIENT_ID = GetTestClientId() + ":RESPONSE"; + } + + //[TearDown] + public override void TearDown() + { + receivedMsg = null; + base.TearDown(); + } + + //[Test] + public virtual void TestAsynchronousConsume( + //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)] + MsgDeliveryMode deliveryMode, string testDestRef) + { + // IBM WMQ doesn't support both synchronous and asynchronous operations + // in the same session. Needs 2 separate sessions. 
+ using(IConnection connection = CreateConnectionAndStart(GetTestClientId())) + using(ISession producerSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) + using(ISession consumerSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) + using(IDestination producerDestination = GetClearDestinationByNodeReference(producerSession, testDestRef)) + using(IDestination consumerDestination = GetClearDestinationByNodeReference(consumerSession, testDestRef)) + using(IMessageConsumer consumer = consumerSession.CreateConsumer(consumerDestination)) + using(IMessageProducer producer = producerSession.CreateProducer(producerDestination)) + { + producer.DeliveryMode = deliveryMode; + consumer.Listener += new MessageListener(OnMessage); + + IMessage request = producerSession.CreateMessage(); + request.NMSCorrelationID = "AsyncConsume"; + request.NMSType = "Test"; + producer.Send(request); + + WaitForMessageToArrive(); + Assert.AreEqual(request.NMSCorrelationID, receivedMsg.NMSCorrelationID, "Invalid correlation ID."); + } + } + + //[Test] + public virtual void TestCreateConsumerAfterSend( + //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)] + MsgDeliveryMode deliveryMode, string testDestRef) + { + using(IConnection connection = CreateConnectionAndStart(GetTestClientId())) + using(ISession producerSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) + using(ISession consumerSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) + using(IDestination producerDestination = GetClearDestinationByNodeReference(producerSession, testDestRef)) + using(IDestination consumerDestination = GetDestinationByNodeReference(consumerSession, testDestRef)) + { + string correlationId = "AsyncConsumeAfterSend"; + + using(IMessageProducer producer = producerSession.CreateProducer(producerDestination)) + { + producer.DeliveryMode = deliveryMode; + IMessage request = producerSession.CreateMessage(); + 
request.NMSCorrelationID = correlationId; + request.NMSType = "Test"; + producer.Send(request); + } + + using(IMessageConsumer consumer = consumerSession.CreateConsumer(consumerDestination)) + { + consumer.Listener += new MessageListener(OnMessage); + WaitForMessageToArrive(); + Assert.AreEqual(correlationId, receivedMsg.NMSCorrelationID, "Invalid correlation ID."); + } + } + } + + //[Test] + public virtual void TestCreateConsumerBeforeSendAddListenerAfterSend( + //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)] + MsgDeliveryMode deliveryMode, string testDestRef) + { + using(IConnection connection = CreateConnectionAndStart(GetTestClientId())) + using(ISession producerSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) + using(ISession consumerSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) + using(IDestination producerDestination = GetClearDestinationByNodeReference(producerSession, testDestRef)) + using(IDestination consumerDestination = GetDestinationByNodeReference(consumerSession, testDestRef)) + using(IMessageConsumer consumer = consumerSession.CreateConsumer(consumerDestination)) + using(IMessageProducer producer = producerSession.CreateProducer(producerDestination)) + { + producer.DeliveryMode = deliveryMode; + + IMessage request = producerSession.CreateMessage(); + request.NMSCorrelationID = "AsyncConsumeAfterSendLateListener"; + request.NMSType = "Test"; + producer.Send(request); + + // now lets add the listener + consumer.Listener += new MessageListener(OnMessage); + WaitForMessageToArrive(); + Assert.AreEqual(request.NMSCorrelationID, receivedMsg.NMSCorrelationID, "Invalid correlation ID."); + } + } + + //[Test] + public virtual void TestAsynchronousTextMessageConsume( + //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)] + MsgDeliveryMode deliveryMode, string testDestRef) + { + using(IConnection connection = CreateConnectionAndStart(GetTestClientId())) + using(ISession 
producerSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) + using(ISession consumerSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) + using(IDestination producerDestination = GetClearDestinationByNodeReference(producerSession, testDestRef)) + using(IDestination consumerDestination = GetDestinationByNodeReference(consumerSession, testDestRef)) + using(IMessageConsumer consumer = consumerSession.CreateConsumer(consumerDestination)) + using(IMessageProducer producer = producerSession.CreateProducer(producerDestination)) + { + consumer.Listener += new MessageListener(OnMessage); + producer.DeliveryMode = deliveryMode; + + ITextMessage request = producerSession.CreateTextMessage("Hello, World!"); + request.NMSCorrelationID = "AsyncConsumeTextMessage"; + request.Properties["NMSXGroupID"] = "cheese"; + request.Properties["myHeader"] = "James"; + + producer.Send(request); + + WaitForMessageToArrive(); + Assert.AreEqual(request.NMSCorrelationID, receivedMsg.NMSCorrelationID, "Invalid correlation ID."); + Assert.AreEqual(request.Properties["NMSXGroupID"], receivedMsg.Properties["NMSXGroupID"], "Invalid NMSXGroupID."); + Assert.AreEqual(request.Properties["myHeader"], receivedMsg.Properties["myHeader"], "Invalid myHeader."); + Assert.AreEqual(request.Text, ((ITextMessage) receivedMsg).Text, "Invalid text body."); + } + } + + //[Test] + public virtual void TestTemporaryQueueAsynchronousConsume( + //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)] + MsgDeliveryMode deliveryMode, string testDestRef) + { + using(IConnection connection = CreateConnectionAndStart(GetTestClientId())) + using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) + using(ITemporaryQueue tempReplyDestination = session.CreateTemporaryQueue()) + using(IDestination destination = GetClearDestinationByNodeReference(session, testDestRef)) + using(IMessageConsumer consumer = session.CreateConsumer(destination)) + 
using(IMessageConsumer tempConsumer = session.CreateConsumer(tempReplyDestination)) + using(IMessageProducer producer = session.CreateProducer(destination)) + { + producer.DeliveryMode = deliveryMode; + tempConsumer.Listener += new MessageListener(OnMessage); + consumer.Listener += new MessageListener(OnQueueMessage); + + IMessage request = session.CreateMessage(); + request.NMSCorrelationID = "TemqQueueAsyncConsume"; + request.NMSType = "Test"; + request.NMSReplyTo = tempReplyDestination; + producer.Send(request); + + WaitForMessageToArrive(); + Assert.AreEqual("TempQueueAsyncResponse", receivedMsg.NMSCorrelationID, "Invalid correlation ID."); + } + } + + protected void OnQueueMessage(IMessage message) + { + Assert.AreEqual("TemqQueueAsyncConsume", message.NMSCorrelationID, "Invalid correlation ID."); + using(IConnection connection = CreateConnectionAndStart(RESPONSE_CLIENT_ID)) + using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) + using(IMessageProducer producer = session.CreateProducer(message.NMSReplyTo)) + { + producer.DeliveryMode = message.NMSDeliveryMode; + + ITextMessage response = session.CreateTextMessage("Asynchronous Response Message Text"); + response.NMSCorrelationID = "TempQueueAsyncResponse"; + response.NMSType = message.NMSType; + producer.Send(response); + } + } + + protected void OnMessage(IMessage message) + { + receivedMsg = message; + received = true; + semaphore.Set(); + } + + protected void WaitForMessageToArrive() + { + semaphore.WaitOne((int) receiveTimeout.TotalMilliseconds, true); + Assert.IsTrue(received, "Should have received a message by now!"); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/src/test/csharp/BadConsumeTest.cs ---------------------------------------------------------------------- diff --git a/src/test/csharp/BadConsumeTest.cs b/src/test/csharp/BadConsumeTest.cs new file mode 100644 index 0000000..76f7cfc --- /dev/null +++ 
b/src/test/csharp/BadConsumeTest.cs @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +using System; +using NUnit.Framework; + +namespace Apache.NMS.Test +{ + //[TestFixture] + public class BadConsumeTest : NMSTest + { + protected IConnection connection; + protected ISession session; + + protected BadConsumeTest(NMSTestSupport testSupport) + : base(testSupport) + { + } + + //[SetUp] + public override void SetUp() + { + connection = CreateConnection(GetTestClientId()); + connection.Start(); + session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge); + } + + //[TearDown] + public override void TearDown() + { + if(null != session) + { + session.Dispose(); + session = null; + } + + if(null != connection) + { + connection.Dispose(); + connection = null; + } + } + + //[Test] + //[ExpectedException(Handler="ExceptionValidationCheck")] + public virtual void TestBadConsumerException() + { + session.CreateConsumer(null); + } + + public void ExceptionValidationCheck(Exception ex) + { + Assert.IsNotNull(ex as NMSException, "Invalid exception was thrown."); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/src/test/csharp/BytesMessageTest.cs 
---------------------------------------------------------------------- diff --git a/src/test/csharp/BytesMessageTest.cs b/src/test/csharp/BytesMessageTest.cs new file mode 100644 index 0000000..8e6468f --- /dev/null +++ b/src/test/csharp/BytesMessageTest.cs @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +using System; +using Apache.NMS.Util; +using NUnit.Framework; + +namespace Apache.NMS.Test +{ + //[TestFixture] + public class BytesMessageTest : NMSTest + { + protected byte[] msgContent = {1, 2, 3, 4, 5, 6, 7, 8}; + + protected BytesMessageTest(NMSTestSupport testSupport) + : base(testSupport) + { + } + + //[Test] + public virtual void SendReceiveBytesMessage( + //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)] + MsgDeliveryMode deliveryMode, string testDestRef) + { + using(IConnection connection = CreateConnection(GetTestClientId())) + { + connection.Start(); + using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) + { + IDestination destination = GetClearDestinationByNodeReference(session, testDestRef); + using(IMessageConsumer consumer = session.CreateConsumer(destination)) + using(IMessageProducer producer = session.CreateProducer(destination)) + { + producer.DeliveryMode = deliveryMode; + IMessage request = session.CreateBytesMessage(msgContent); + producer.Send(request); + + IMessage message = consumer.Receive(receiveTimeout); + AssertMessageIsReadOnly(message); + AssertBytesMessageEqual(request, message); + Assert.AreEqual(deliveryMode, message.NMSDeliveryMode, "NMSDeliveryMode does not match"); + + } + } + } + } + + //[Test] + public virtual void SendReceiveBytesMessageContent( + //[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)] + MsgDeliveryMode deliveryMode, string testDestRef) + { + using(IConnection connection = CreateConnection(GetTestClientId())) + { + connection.Start(); + using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) + { + IDestination destination = GetClearDestinationByNodeReference(session, testDestRef); + using(IMessageConsumer consumer = session.CreateConsumer(destination)) + using(IMessageProducer producer = session.CreateProducer(destination)) + { + producer.DeliveryMode = deliveryMode; + IBytesMessage request = 
session.CreateBytesMessage(); + + request.WriteBoolean(true); + request.WriteByte((byte) 1); + request.WriteBytes(new byte[1]); + request.WriteBytes(new byte[3], 0, 2); + request.WriteChar('a'); + request.WriteDouble(1.5); + request.WriteSingle((float) 1.5); + request.WriteInt32(1); + request.WriteInt64(1); + request.WriteObject("stringobj"); + request.WriteInt16((short) 1); + request.WriteString("utfstring"); + + producer.Send(request); + + IMessage message = consumer.Receive(receiveTimeout); + AssertMessageIsReadOnly(message); + AssertBytesMessageEqual(request, message); + Assert.AreEqual(deliveryMode, message.NMSDeliveryMode, "NMSDeliveryMode does not match"); + + } + } + } + } + + protected void AssertMessageIsReadOnly(IMessage message) + { + Type writeableExceptionType = typeof(MessageNotWriteableException); + IBytesMessage theMessage = message as IBytesMessage; + Assert.IsNotNull(theMessage); + Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteBoolean(true); }); + Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteByte((byte) 1); }); + Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteBytes(new byte[1]); }); + Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteBytes(new byte[3], 0, 2); }); + Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteChar('a'); }); + Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteDouble(1.5); }); + Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteSingle((float) 1.5); }); + Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteInt32(1); }); + Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteInt64(1); }); + Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteObject("stringobj"); }); + Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteInt16((short) 1); }); + Assert.Throws(writeableExceptionType, delegate () { theMessage.WriteString("utfstring"); 
}); + } + + /// <summary> + /// Assert that two messages are IBytesMessages and their contents are equal. + /// The expected message is reset before its content is read. + /// </summary> + /// <param name="expected">The reference message; must be an IBytesMessage.</param> + /// <param name="actual">The message under test; must be an IBytesMessage.</param> + protected void AssertBytesMessageEqual(IMessage expected, IMessage actual) + { + IBytesMessage expectedBytesMsg = expected as IBytesMessage; + // Check the cast result before dereferencing it, so a wrong message type + // fails with the assertion text instead of a NullReferenceException. + Assert.IsNotNull(expectedBytesMsg, "'expected' message not a bytes message"); + expectedBytesMsg.Reset(); + IBytesMessage actualBytesMsg = actual as IBytesMessage; + Assert.IsNotNull(actualBytesMsg, "'actual' message not a bytes message"); + Assert.AreEqual(expectedBytesMsg.Content, actualBytesMsg.Content, "Bytes message contents do not match."); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/src/test/csharp/Commands/BytesMessage.cs ---------------------------------------------------------------------- diff --git a/src/test/csharp/Commands/BytesMessage.cs b/src/test/csharp/Commands/BytesMessage.cs new file mode 100644 index 0000000..02fb23d --- /dev/null +++ b/src/test/csharp/Commands/BytesMessage.cs @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +using Apache.NMS; +using Apache.NMS.Util; +using System; +using System.Collections; +using System.IO; + +namespace Apache.NMS.Commands +{ + public class BytesMessage : Message, IBytesMessage + { + private EndianBinaryReader dataIn = null; + private EndianBinaryWriter dataOut = null; + private MemoryStream outputBuffer = null; + private int length = 0; + + public override Object Clone() + { + StoreContent(); + return base.Clone(); + } + + public override void ClearBody() + { + base.ClearBody(); + this.outputBuffer = null; + this.dataIn = null; + this.dataOut = null; + this.length = 0; + } + + public long BodyLength + { + get + { + InitializeReading(); + return this.length; + } + } + + public byte ReadByte() + { + InitializeReading(); + try + { + return dataIn.ReadByte(); + } + catch(EndOfStreamException e) + { + throw NMSExceptionSupport.CreateMessageEOFException(e); + } + catch(IOException e) + { + throw NMSExceptionSupport.CreateMessageFormatException(e); + } + } + + public void WriteByte( byte value ) + { + InitializeWriting(); + try + { + dataOut.Write( value ); + } + catch(Exception e) + { + throw NMSExceptionSupport.Create(e); + } + } + + public bool ReadBoolean() + { + InitializeReading(); + try + { + return dataIn.ReadBoolean(); + } + catch(EndOfStreamException e) + { + throw NMSExceptionSupport.CreateMessageEOFException(e); + } + catch(IOException e) + { + throw NMSExceptionSupport.CreateMessageFormatException(e); + } + } + + public void WriteBoolean( bool value ) + { + InitializeWriting(); + try + { + dataOut.Write( value ); + } + catch(Exception e) + { + throw NMSExceptionSupport.Create(e); + } + } + + public char ReadChar() + { + InitializeReading(); + try + { + return dataIn.ReadChar(); + } + catch(EndOfStreamException e) + { + throw NMSExceptionSupport.CreateMessageEOFException(e); + } + catch(IOException e) + { + throw NMSExceptionSupport.CreateMessageFormatException(e); + } + } + + public void WriteChar( char value ) + { + 
InitializeWriting(); + try + { + dataOut.Write( value ); + } + catch(Exception e) + { + throw NMSExceptionSupport.Create(e); + } + } + + public short ReadInt16() + { + InitializeReading(); + try + { + return dataIn.ReadInt16(); + } + catch(EndOfStreamException e) + { + throw NMSExceptionSupport.CreateMessageEOFException(e); + } + catch(IOException e) + { + throw NMSExceptionSupport.CreateMessageFormatException(e); + } + } + + public void WriteInt16( short value ) + { + InitializeWriting(); + try + { + dataOut.Write( value ); + } + catch(Exception e) + { + throw NMSExceptionSupport.Create(e); + } + } + + public int ReadInt32() + { + InitializeReading(); + try + { + return dataIn.ReadInt32(); + } + catch(EndOfStreamException e) + { + throw NMSExceptionSupport.CreateMessageEOFException(e); + } + catch(IOException e) + { + throw NMSExceptionSupport.CreateMessageFormatException(e); + } + } + + public void WriteInt32( int value ) + { + InitializeWriting(); + try + { + dataOut.Write( value ); + } + catch(Exception e) + { + throw NMSExceptionSupport.Create(e); + } + } + + public long ReadInt64() + { + InitializeReading(); + try + { + return dataIn.ReadInt64(); + } + catch(EndOfStreamException e) + { + throw NMSExceptionSupport.CreateMessageEOFException(e); + } + catch(IOException e) + { + throw NMSExceptionSupport.CreateMessageFormatException(e); + } + } + + public void WriteInt64( long value ) + { + InitializeWriting(); + try + { + dataOut.Write( value ); + } + catch(Exception e) + { + throw NMSExceptionSupport.Create(e); + } + } + + public float ReadSingle() + { + InitializeReading(); + try + { + return dataIn.ReadSingle(); + } + catch(EndOfStreamException e) + { + throw NMSExceptionSupport.CreateMessageEOFException(e); + } + catch(IOException e) + { + throw NMSExceptionSupport.CreateMessageFormatException(e); + } + } + + public void WriteSingle( float value ) + { + InitializeWriting(); + try + { + dataOut.Write( value ); + } + catch(Exception e) + { + throw 
NMSExceptionSupport.Create(e); + } + } + + public double ReadDouble() + { + InitializeReading(); + try + { + return dataIn.ReadDouble(); + } + catch(EndOfStreamException e) + { + throw NMSExceptionSupport.CreateMessageEOFException(e); + } + catch(IOException e) + { + throw NMSExceptionSupport.CreateMessageFormatException(e); + } + } + + public void WriteDouble( double value ) + { + InitializeWriting(); + try + { + dataOut.Write( value ); + } + catch(Exception e) + { + throw NMSExceptionSupport.Create(e); + } + } + + public int ReadBytes( byte[] value ) + { + InitializeReading(); + try + { + return dataIn.Read( value, 0, value.Length ); + } + catch(EndOfStreamException e) + { + throw NMSExceptionSupport.CreateMessageEOFException(e); + } + catch(IOException e) + { + throw NMSExceptionSupport.CreateMessageFormatException(e); + } + } + + public int ReadBytes( byte[] value, int length ) + { + InitializeReading(); + try + { + return dataIn.Read( value, 0, length ); + } + catch(EndOfStreamException e) + { + throw NMSExceptionSupport.CreateMessageEOFException(e); + } + catch(IOException e) + { + throw NMSExceptionSupport.CreateMessageFormatException(e); + } + } + + public void WriteBytes( byte[] value ) + { + InitializeWriting(); + try + { + dataOut.Write( value, 0, value.Length ); + } + catch(Exception e) + { + throw NMSExceptionSupport.Create(e); + } + } + + public void WriteBytes( byte[] value, int offset, int length ) + { + InitializeWriting(); + try + { + dataOut.Write( value, offset, length ); + } + catch(Exception e) + { + throw NMSExceptionSupport.Create(e); + } + } + + public string ReadString() + { + InitializeReading(); + try + { + // JMS, CMS and NMS all encode the String using a 16 bit size header. 
+ return dataIn.ReadString16(); + } + catch(EndOfStreamException e) + { + throw NMSExceptionSupport.CreateMessageEOFException(e); + } + catch(IOException e) + { + throw NMSExceptionSupport.CreateMessageFormatException(e); + } + } + + public void WriteString( string value ) + { + InitializeWriting(); + try + { + // JMS, CMS and NMS all encode the String using a 16 bit size header. + dataOut.WriteString16(value); + } + catch(Exception e) + { + throw NMSExceptionSupport.Create(e); + } + } + + /// <summary> + /// Writes a boxed primitive, a byte array or a string to the message body, + /// dispatching on the runtime type of the value. + /// </summary> + /// <param name="value">The value to write; must not be null.</param> + /// <exception cref="ArgumentNullException">If value is null.</exception> + /// <exception cref="MessageFormatException">If the type of value is not supported.</exception> + public void WriteObject( System.Object value ) + { + InitializeWriting(); + // A null matches none of the type tests below and would otherwise + // crash on value.GetType() while building the error message. + if( value == null ) + { + throw new ArgumentNullException("value", "Cannot write a null object to a bytes message."); + } + + if( value is System.Byte ) + { + this.dataOut.Write( (byte) value ); + } + else if( value is Char ) + { + this.dataOut.Write( (char) value ); + } + else if( value is Boolean ) + { + this.dataOut.Write( (bool) value ); + } + else if( value is Int16 ) + { + this.dataOut.Write( (short) value ); + } + else if( value is Int32 ) + { + this.dataOut.Write( (int) value ); + } + else if( value is Int64 ) + { + this.dataOut.Write( (long) value ); + } + else if( value is Single ) + { + this.dataOut.Write( (float) value ); + } + else if( value is Double ) + { + this.dataOut.Write( (double) value ); + } + else if( value is byte[] ) + { + this.dataOut.Write( (byte[]) value ); + } + else if( value is String ) + { + this.dataOut.WriteString16( (string) value ); + } + else + { + throw new MessageFormatException("Cannot write non-primitive type:" + value.GetType()); + } + } + + public new byte[] Content + { + get + { + byte[] buffer = null; + InitializeReading(); + if(this.length != 0) + { + buffer = new byte[this.length]; + this.dataIn.Read(buffer, 0, buffer.Length); + } + return buffer; + } + + set + { + InitializeWriting(); + this.dataOut.Write(value, 0, value.Length); + } + } + + public void Reset() + { + StoreContent(); + this.dataIn = null; + this.dataOut = null; + this.outputBuffer = null; + this.ReadOnlyBody = true; + } + + private void InitializeReading() + { + FailIfWriteOnlyBody(); + if(this.dataIn == null) + { + 
byte[] data = base.Content; + + if(base.Content == null) + { + data = new byte[0]; + } + + Stream target = new MemoryStream(data, false); + + this.length = data.Length; + this.dataIn = new EndianBinaryReader(target); + } + } + + private void InitializeWriting() + { + FailIfReadOnlyBody(); + if(this.dataOut == null) + { + this.outputBuffer = new MemoryStream(); + this.dataOut = new EndianBinaryWriter(this.outputBuffer); + } + } + + private void StoreContent() + { + if(this.dataOut != null) + { + this.dataOut.Close(); + base.Content = outputBuffer.ToArray(); + + this.dataOut = null; + this.outputBuffer = null; + } + } + + } +} + http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/src/test/csharp/Commands/Destination.cs ---------------------------------------------------------------------- diff --git a/src/test/csharp/Commands/Destination.cs b/src/test/csharp/Commands/Destination.cs new file mode 100644 index 0000000..58c7749 --- /dev/null +++ b/src/test/csharp/Commands/Destination.cs @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +using System; +using System.Collections.Specialized; +using Apache.NMS.Util; + +namespace Apache.NMS.Commands +{ + /// + /// Summary description for Destination. + /// + public abstract class Destination : IDestination, ICloneable + { + /// + /// Topic Destination object + /// + public const int TOPIC = 1; + /// + /// Temporary Topic Destination object + /// + public const int TEMPORARY_TOPIC = 2; + /// + /// Queue Destination object + /// + public const int QUEUE = 3; + /// + /// Temporary Queue Destination object + /// + public const int TEMPORARY_QUEUE = 4; + + private const String TEMP_PREFIX = "{TD{"; + private const String TEMP_POSTFIX = "}TD}"; + + private String physicalName = ""; + private StringDictionary options = null; + + private bool disposed = false; + + /// + /// The Default Constructor + /// + protected Destination() + { + } + + /// + /// Construct the Destination with a defined physical name; + /// + /// + protected Destination(String name) + { + setPhysicalName(name); + } + + ~Destination() + { + Dispose(false); + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + private void Dispose(bool disposing) + { + if(disposed) + { + return; + } + + if(disposing) + { + try + { + OnDispose(); + } + catch(Exception ex) + { + Tracer.ErrorFormat("Exception disposing Destination {0}: {1}", this.physicalName, ex.Message); + } + } + + disposed = true; + } + + /// + /// Child classes can override this method to perform clean-up logic. 
+ /// + protected virtual void OnDispose() + { + } + + public bool IsTopic + { + get + { + int destinationType = GetDestinationType(); + return TOPIC == destinationType + || TEMPORARY_TOPIC == destinationType; + } + } + + public bool IsQueue + { + get + { + int destinationType = GetDestinationType(); + return QUEUE == destinationType + || TEMPORARY_QUEUE == destinationType; + } + } + + public bool IsTemporary + { + get + { + int destinationType = GetDestinationType(); + return TEMPORARY_QUEUE == destinationType + || TEMPORARY_TOPIC == destinationType; + } + } + + /// + /// Dictionary of name/value pairs representing option values specified + /// in the URI used to create this Destination. A null value is returned + /// if no options were specified. + /// + internal StringDictionary Options + { + get { return this.options; } + } + + private void setPhysicalName(string name) + { + this.physicalName = name; + + int p = name.IndexOf('?'); + if(p >= 0) + { + String optstring = physicalName.Substring(p + 1); + this.physicalName = name.Substring(0, p); + options = URISupport.ParseQuery(optstring); + } + } + + /// + /// + /// + /// + public static Destination Transform(IDestination destination) + { + Destination result = null; + if(destination != null) + { + if(destination is Destination) + { + result = (Destination) destination; + } + else + { + if(destination is ITemporaryQueue) + { + result = new TempQueue(((IQueue) destination).QueueName); + } + else if(destination is ITemporaryTopic) + { + result = new TempTopic(((ITopic) destination).TopicName); + } + else if(destination is IQueue) + { + result = new Queue(((IQueue) destination).QueueName); + } + else if(destination is ITopic) + { + result = new Topic(((ITopic) destination).TopicName); + } + } + } + return result; + } + + /// + /// Create a temporary name from the clientId + /// + /// + /// + public static String CreateTemporaryName(String clientId) + { + return TEMP_PREFIX + clientId + TEMP_POSTFIX; + } + + /// + 
/// From a temporary destination find the clientId of the Connection that created it + /// + /// + /// the clientId or null if not a temporary destination + public static String GetClientId(Destination destination) + { + String answer = null; + if(destination != null && destination.IsTemporary) + { + String name = destination.PhysicalName; + int start = name.IndexOf(TEMP_PREFIX); + if(start >= 0) + { + start += TEMP_PREFIX.Length; + int stop = name.LastIndexOf(TEMP_POSTFIX); + if(stop > start && stop < name.Length) + { + // Substring takes (startIndex, length), not (startIndex, endIndex): + // extract only the text between the prefix and the postfix. + answer = name.Substring(start, stop - start); + } + } + } + return answer; + } + + /// + /// + /// object to compare + /// a positive value if this is greater than o, 0 if they are equal, or a negative value if this is less than o (always -1 when o is not a Destination) + public int CompareTo(Object o) + { + if(o is Destination) + { + return CompareTo((Destination) o); + } + return -1; + } + + /// + /// Lets sort by name first then lets sort topics greater than queues + /// + /// another destination to compare against + /// a positive value if this is greater than o, 0 if they are equal, or a negative value if this is less than o + public int CompareTo(Destination that) + { + int answer = 0; + if(physicalName != that.physicalName) + { + if(physicalName == null) + { + return -1; + } + else if(that.physicalName == null) + { + return 1; + } + answer = physicalName.CompareTo(that.physicalName); + } + + if(answer == 0) + { + if(IsTopic) + { + if(that.IsQueue) + { + return 1; + } + } + else + { + if(that.IsTopic) + { + return -1; + } + } + } + return answer; + } + + /// + /// + /// Returns the Destination type + public abstract int GetDestinationType(); + + public String PhysicalName + { + get { return this.physicalName; } + set + { + this.physicalName = value; + } + } + + /// + /// + /// string representation of this instance + public override String ToString() + { + switch(DestinationType) + { + case DestinationType.Topic: + return "topic://" + PhysicalName; + + case DestinationType.TemporaryTopic: + return "temp-topic://" + PhysicalName; + + case 
DestinationType.TemporaryQueue: + return "temp-queue://" + PhysicalName; + + default: + return "queue://" + PhysicalName; + } + } + + /// + /// + /// hashCode for this instance + public override int GetHashCode() + { + int answer = 37; + + if(this.physicalName != null) + { + answer = physicalName.GetHashCode(); + } + if(IsTopic) + { + answer ^= 0xfabfab; + } + return answer; + } + + /// + /// if the object passed in is equivalent, return true + /// + /// the object to compare + /// true if this instance and obj are equivalent + public override bool Equals(Object obj) + { + bool result = this == obj; + if(!result && obj != null && obj is Destination) + { + Destination other = (Destination) obj; + // physicalName may be null (the PhysicalName setter accepts it and + // GetHashCode already allows for it), so compare with the null-safe + // static String.Equals instead of calling Equals on the field. + result = this.GetDestinationType() == other.GetDestinationType() + && String.Equals(this.physicalName, other.physicalName); + } + return result; + } + + /// + /// Factory method to create a child destination if this destination is a composite + /// + /// + /// the created Destination + public abstract Destination CreateDestination(String name); + + public abstract DestinationType DestinationType + { + get; + } + + public virtual Object Clone() + { + // Since we are the lowest level base class, do a + // shallow copy which will include the derived classes. + // From here we would do deep cloning of other objects + // if we had any. + return this.MemberwiseClone(); + } + } +} + http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/src/test/csharp/Commands/MapMessage.cs ---------------------------------------------------------------------- diff --git a/src/test/csharp/Commands/MapMessage.cs b/src/test/csharp/Commands/MapMessage.cs new file mode 100644 index 0000000..34de7aa --- /dev/null +++ b/src/test/csharp/Commands/MapMessage.cs @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.IO; +using Apache.NMS; +using Apache.NMS.Util; + +namespace Apache.NMS.Commands +{ + /// <summary> + /// Map message whose body is a PrimitiveMap that callers reach through a + /// PrimitiveMapInterceptor created alongside it. + /// </summary> + public class MapMessage : Message, IMapMessage + { + private PrimitiveMap body; + private PrimitiveMapInterceptor typeConverter; + + public MapMessage() : base() + { + } + + public MapMessage(PrimitiveMap body) : base() + { + this.body = body; + this.typeConverter = new PrimitiveMapInterceptor(this, this.body); + } + + /// <summary> + /// Drops the body map and its interceptor, then clears the base message content. + /// </summary> + public override void ClearBody() + { + this.body = null; + this.typeConverter = null; + base.ClearBody(); + } + + /// <summary> + /// Gets or sets the read-only state of the body. When an interceptor + /// exists, its ReadOnly flag is kept in step with the value assigned here. + /// </summary> + public override bool ReadOnlyBody + { + get { return base.ReadOnlyBody; } + + set + { + if(this.typeConverter != null) + { + // Mirror the requested state; a hard-coded true would leave the + // map read-only even when the body is switched back to writable. + this.typeConverter.ReadOnly = value; + } + + base.ReadOnlyBody = value; + } + } + + + /// <summary> + /// Gets the body through its interceptor, creating an empty map on first + /// access, or replaces the body and rebuilds (or drops) the interceptor. + /// </summary> + public IPrimitiveMap Body + { + get + { + if(this.body == null) + { + this.body = new PrimitiveMap(); + this.typeConverter = new PrimitiveMapInterceptor(this, this.body); + } + + return this.typeConverter; + } + + set + { + this.body = value as PrimitiveMap; + if(value != null) + { + this.typeConverter = new PrimitiveMapInterceptor(this, value); + } + else + { + this.typeConverter = null; + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/src/test/csharp/Commands/Message.cs ---------------------------------------------------------------------- diff --git a/src/test/csharp/Commands/Message.cs 
b/src/test/csharp/Commands/Message.cs new file mode 100644 index 0000000..dcb38d6 --- /dev/null +++ b/src/test/csharp/Commands/Message.cs @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections; + +using Apache.NMS.Util; + +namespace Apache.NMS.Commands +{ + public class Message : IMessage, ICloneable + { + private IDestination destination; + private string transactionId; + private string messageId; + private string groupID; + private int groupSequence; + private string correlationId; + private bool persistent; + private long expiration; + private byte priority; + private IDestination replyTo; + private long timestamp; + private string type; + private bool redelivered; + private byte[] content; + private bool readOnlyMsgProperties; + private bool readOnlyMsgBody; + + private MessagePropertyIntercepter propertyHelper; + private PrimitiveMap properties; + + /// + /// + /// Clone this object and return a new instance that the caller now owns. + /// + /// + public virtual Object Clone() + { + // Since we are the lowest level base class, do a + // shallow copy which will include the derived classes. 
+ // From here we would do deep cloning of other objects + // if we had any. + Message o = (Message) this.MemberwiseClone(); + + if(this.messageId != null) + { + o.NMSMessageId = (string) this.messageId.Clone(); + } + + return o; + } + + /// + /// + /// Returns a string containing the information for this DataStructure + /// such as its type and value of its elements. + /// + /// + public override string ToString() + { + return GetType().Name + "[" + + "Destination=" + destination + ", " + + "TransactionId=" + transactionId + ", " + + "MessageId=" + messageId + ", " + + "GroupID=" + groupID + ", " + + "GroupSequence=" + groupSequence + ", " + + "CorrelationId=" + correlationId + ", " + + "Expiration=" + expiration + ", " + + "Priority=" + priority + ", " + + "ReplyTo=" + replyTo + ", " + + "Timestamp=" + timestamp + ", " + + "Type=" + type + ", " + + "Redelivered=" + redelivered + + "]"; + } + + public void Acknowledge() + { + } + + public virtual void ClearBody() + { + this.content = null; + } + + /// <summary> + /// Removes all message properties. Does nothing when the property map + /// has not been created yet. + /// </summary> + public virtual void ClearProperties() + { + // The map is created lazily by the Properties getter, so it is still + // null on a message whose properties were never accessed. + if(this.properties != null) + { + this.properties.Clear(); + } + } + + protected void FailIfReadOnlyBody() + { + if(ReadOnlyBody == true) + { + throw new MessageNotWriteableException("Message is in Read-Only mode."); + } + } + + protected void FailIfWriteOnlyBody() + { + if(ReadOnlyBody == false) + { + throw new MessageNotReadableException("Message is in Write-Only mode."); + } + } + + #region Properties + + public string TransactionId + { + get { return this.transactionId; } + set { this.transactionId = value; } + } + + public byte[] Content + { + get { return content; } + set { this.content = value; } + } + + public virtual bool ReadOnlyProperties + { + get { return this.readOnlyMsgProperties; } + set { this.readOnlyMsgProperties = value; } + } + + public virtual bool ReadOnlyBody + { + get { return this.readOnlyMsgBody; } + set { this.readOnlyMsgBody = value; } + } + + public IPrimitiveMap Properties + { + get + { + if(null == properties) + { + properties = new 
PrimitiveMap();
+                    propertyHelper = new MessagePropertyIntercepter(this, properties, this.ReadOnlyProperties);
+                    propertyHelper.AllowByteArrays = false;
+                }
+
+                return propertyHelper;
+            }
+        }
+
+        /// <summary>
+        /// Identifier used to tie this message to a conversation or to a
+        /// long running business process.
+        /// </summary>
+        public string NMSCorrelationID
+        {
+            get
+            {
+                return correlationId;
+            }
+            set
+            {
+                correlationId = value;
+            }
+        }
+
+        /// <summary>
+        /// Destination of the message. The assigned value is passed through
+        /// Destination.Transform before it is stored.
+        /// </summary>
+        public IDestination NMSDestination
+        {
+            get
+            {
+                return this.destination;
+            }
+            set
+            {
+                destination = Destination.Transform(value);
+            }
+        }
+
+        // Backing field for NMSTimeToLive; starts out as a zero-length span.
+        private TimeSpan timeToLive = TimeSpan.Zero;
+
+        /// <summary>
+        /// How long the message remains valid.
+        /// </summary>
+        /// <remarks>
+        /// Assigning a positive span recomputes the expiration from the
+        /// message timestamp, or from the current UTC time when the timestamp
+        /// is still 0. Any other span resets the expiration to 0.
+        /// </remarks>
+        public TimeSpan NMSTimeToLive
+        {
+            get
+            {
+                return this.timeToLive;
+            }
+            set
+            {
+                this.timeToLive = value;
+
+                if(value.TotalMilliseconds <= 0)
+                {
+                    expiration = 0;
+                    return;
+                }
+
+                // Fall back to "now" when the message has not been stamped yet.
+                long startedAt = (0 != timestamp) ? timestamp : DateUtils.ToJavaTimeUtc(DateTime.UtcNow);
+                expiration = startedAt + (long) value.TotalMilliseconds;
+            }
+        }
+
+        /// <summary>
+        /// Timestamp of the message, kept internally in the representation
+        /// produced by DateUtils.ToJavaTimeUtc.
+        /// </summary>
+        /// <remarks>
+        /// When a positive time to live is set, assigning the timestamp also
+        /// moves the expiration so that it stays relative to the new value.
+        /// </remarks>
+        public DateTime NMSTimestamp
+        {
+            get
+            {
+                return DateUtils.ToDateTime(timestamp);
+            }
+            set
+            {
+                timestamp = DateUtils.ToJavaTimeUtc(value);
+
+                // Without a positive time to live the expiration is left as it is.
+                if(!(this.timeToLive.TotalMilliseconds > 0))
+                {
+                    return;
+                }
+
+                expiration = timestamp + (long) this.timeToLive.TotalMilliseconds;
+            }
+        }
+
+        /// <summary>
+        /// Message identifier, assigned by the provider.
+        /// </summary>
+        public string NMSMessageId
+        {
+            get
+            {
+                return messageId;
+            }
+            set
+            {
+                messageId = value;
+            }
+        }
+
+        /// <summary>
+        /// Delivery mode of the message, mapped onto the persistent flag.
+        /// </summary>
+        public MsgDeliveryMode NMSDeliveryMode
+        {
+            get
+            {
+                if(persistent)
+                {
+                    return MsgDeliveryMode.Persistent;
+                }
+
+                return MsgDeliveryMode.NonPersistent;
+            }
+            set
+            {
+                persistent = (value == MsgDeliveryMode.Persistent);
+            }
+        }
+
+        /// <summary>
+        /// Priority of the message; stored as a byte.
+        /// </summary>
+        public MsgPriority NMSPriority
+        {
+            get
+            {
+                return (MsgPriority) priority;
+            }
+            set
+            {
+                priority = (byte) value;
+            }
+        }
+
+        /// <summary>
+        /// True when the message has already been delivered to this or
+        /// another consumer without being acknowledged successfully.
+        /// </summary>
+        public bool NMSRedelivered
+        {
+            get
+            {
+                return redelivered;
+            }
+            set
+            {
+                redelivered = value;
+            }
+        }
+
+        /// <summary>
+        /// Destination the consumer should send replies to. The assigned
+        /// value is passed through Destination.Transform before it is stored.
+        /// </summary>
+        public IDestination NMSReplyTo
+        {
+            get
+            {
+                return replyTo;
+            }
+            set
+            {
+                replyTo = Destination.Transform(value);
+            }
+        }
+
+        /// <summary>
+        /// Type name of the message.
+        /// </summary>
+        public string NMSType
+        {
+            get
+            {
+                return type;
+            }
+            set
+            {
+                type = value;
+            }
+        }
+
+        #endregion
+
+        #region NMS Extension headers
+
+        /// <summary>
+        /// Number of times the message has been redelivered to other
+        /// consumers without being acknowledged successfully.
+        /// </summary>
+        /// <remarks>
+        /// This implementation always reports 0.
+        /// </remarks>
+        public int NMSXDeliveryCount
+        {
+            get
+            {
+                return 0;
+            }
+        }
+
+        /// <summary>
+        /// Message group identifier; messages sharing a group ID are meant
+        /// to be routed to the same consumer.
+        /// </summary>
+        public string NMSXGroupID
+        {
+            get
+            {
+                return groupID;
+            }
+            set
+            {
+                groupID = value;
+            }
+        }
+
+        /// <summary>
+        /// Sequence counter giving the position of the message in its group.
+        /// </summary>
+        public int NMSXGroupSeq
+        {
+            get
+            {
+                return groupSequence;
+            }
+            set
+            {
+                groupSequence = value;
+            }
+        }
+
+        /// <summary>
+        /// Identifier of the producer's transaction, or an empty string when
+        /// no transaction id is set.
+        /// </summary>
+        public string NMSXProducerTXID
+        {
+            get
+            {
+                return (null != transactionId) ? transactionId : String.Empty;
+            }
+        }
+
+        #endregion
+
+    };
+}
+
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/src/test/csharp/Commands/ObjectMessage.cs
----------------------------------------------------------------------
diff --git a/src/test/csharp/Commands/ObjectMessage.cs b/src/test/csharp/Commands/ObjectMessage.cs
new file mode 100644
index 0000000..4db9de6
--- /dev/null
+++ b/src/test/csharp/Commands/ObjectMessage.cs
@@ -0,0 +1,44 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+using System;
+using System.Collections;
+using System.IO;
+
+using Apache.NMS;
+
+namespace Apache.NMS.Commands
+{
+    /// <summary>
+    /// Message whose payload is a single object exposed through Body.
+    /// </summary>
+    public class ObjectMessage : Message, IObjectMessage
+    {
+        // Payload carried by this message.
+        private object body;
+
+        /// <summary>
+        /// Gets or sets the object carried by this message.
+        /// </summary>
+        public object Body
+        {
+            get
+            {
+                return this.body;
+            }
+            set
+            {
+                this.body = value;
+            }
+        }
+
+        /// <summary>
+        /// Describes the message as its runtime type name followed by a
+        /// bracket pair.
+        /// </summary>
+        public override string ToString()
+        {
+            return String.Concat(GetType().Name, "[", " ]");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/src/test/csharp/Commands/Queue.cs
----------------------------------------------------------------------
diff --git a/src/test/csharp/Commands/Queue.cs b/src/test/csharp/Commands/Queue.cs
new file mode 100644
index 0000000..342156f
--- /dev/null
+++ b/src/test/csharp/Commands/Queue.cs
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+
+namespace Apache.NMS.Commands
+{
+    ///
+    /// Summary description for Queue.
+    ///
+    /// <remarks>
+    /// Destination subclass that reports itself as a queue and creates
+    /// further Queue instances from CreateDestination.
+    /// </remarks>
+    public class Queue : Destination, IQueue
+    {
+        /// <summary>
+        /// Creates a queue through the parameterless Destination constructor.
+        /// </summary>
+        public Queue()
+        {
+        }
+
+        /// <summary>
+        /// Creates a queue, forwarding the given name to the Destination
+        /// constructor.
+        /// </summary>
+        public Queue(String name) : base(name)
+        {
+        }
+
+        /// <summary>
+        /// Always DestinationType.Queue.
+        /// </summary>
+        public override DestinationType DestinationType
+        {
+            get { return DestinationType.Queue; }
+        }
+
+        /// <summary>
+        /// Name of the queue; this is the inherited PhysicalName.
+        /// </summary>
+        public String QueueName
+        {
+            get
+            {
+                return PhysicalName;
+            }
+        }
+
+        /// <summary>
+        /// Returns the QUEUE destination type constant.
+        /// </summary>
+        public override int GetDestinationType()
+        {
+            return QUEUE;
+        }
+
+        /// <summary>
+        /// Creates a new Queue carrying the given name.
+        /// </summary>
+        public override Destination CreateDestination(String name)
+        {
+            return new Queue(name);
+        }
+
+        /// <summary>
+        /// Returns a copy of this queue obtained from the base class's Clone().
+        /// </summary>
+        public override Object Clone()
+        {
+            // The base class performs the shallow copy; being shallow, it
+            // already covers this derived class, so the result is cast
+            // back to Queue. Nothing needs a deep copy at the moment; if new
+            // variables are added this method will likely need updating.
+            return (Queue) base.Clone();
+        }
+    }
+}
+