Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 87421 invoked from network); 9 Oct 2006 20:18:51 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 9 Oct 2006 20:18:51 -0000 Received: (qmail 54023 invoked by uid 500); 9 Oct 2006 20:18:51 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 54004 invoked by uid 500); 9 Oct 2006 20:18:51 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 53995 invoked by uid 99); 9 Oct 2006 20:18:51 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Oct 2006 13:18:51 -0700 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received-SPF: pass (asf.osuosl.org: local policy) Received: from [140.211.166.113] (HELO eris.apache.org) (140.211.166.113) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Oct 2006 13:18:47 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 2BCC61A981A; Mon, 9 Oct 2006 13:18:27 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r454489 [1/2] - in /incubator/activemq/activemq-dotnet/trunk: ./ src/main/csharp/ActiveMQ/ src/main/csharp/ActiveMQ/Commands/ src/main/csharp/ActiveMQ/OpenWire/ src/main/csharp/ActiveMQ/OpenWire/V1/ src/main/csharp/ActiveMQ/OpenWire/V2/ src... 
Date: Mon, 09 Oct 2006 20:18:26 -0000 To: activemq-commits@geronimo.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20061009201827.2BCC61A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: chirino Date: Mon Oct 9 13:18:25 2006 New Revision: 454489 URL: http://svn.apache.org/viewvc?view=rev&rev=454489 Log: Added openwire negotiation support so that we can switch to version 2 if the remote broker supports version 2 of the openwire protocol. Added: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/IMarshallerFactory.cs incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/CountDownLatch.cs incubator/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/CommonAssemblyInfo.cs Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/WireFormatInfo.cs incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/OpenWireFormat.cs incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V1/MarshallerFactory.cs incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V2/MarshallerFactory.cs incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/NMSTestSupport.cs incubator/activemq/activemq-dotnet/trunk/vs2005-activemq.csproj incubator/activemq/activemq-dotnet/trunk/vs2005-msmq.csproj 
incubator/activemq/activemq-dotnet/trunk/vs2005-nms.csproj Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/WireFormatInfo.cs URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/WireFormatInfo.cs?view=diff&rev=454489&r1=454488&r2=454489 ============================================================================== --- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/WireFormatInfo.cs (original) +++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/WireFormatInfo.cs Mon Oct 9 13:18:25 2006 @@ -15,6 +15,7 @@ * limitations under the License. */ +using System; using ActiveMQ.OpenWire; using NMS; @@ -65,6 +66,23 @@ set { this.magic = value; } } + public bool Valid + { + get + { + if ( magic == null ) + return false; + if (magic.Length != MAGIC.Length) + return false; + for (int i = 0; i < magic.Length; i++ ) + { + if( magic[i]!=MAGIC[i] ) + return false; + } + return true; + } + } + public int Version { get { return version; } Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs?view=diff&rev=454489&r1=454488&r2=454489 ============================================================================== --- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs (original) +++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Connection.cs Mon Oct 9 13:18:25 2006 @@ -39,8 +39,9 @@ private long sessionCounter; private long temporaryDestinationCounter; private long localTransactionCounter; - - + private bool closing; + + public Connection(ITransport transport, ConnectionInfo info) { this.transport = transport; @@ -93,6 +94,7 @@ session.Dispose(); } */ + closing = true; DisposeOf(ConnectionId); sessions.Clear(); transport.Oneway(new ShutdownInfo()); 
@@ -269,6 +271,14 @@ else if (command is BrokerInfo) { this.brokerInfo = (BrokerInfo) command; + } + else if (command is ShutdownInfo) + { + ShutdownInfo info = (ShutdownInfo)command; + if( !closing && !closed ) + { + OnException(transport, new NMSException("Broker closed this connection.")); + } } else { Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs?view=diff&rev=454489&r1=454488&r2=454489 ============================================================================== --- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs (original) +++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/MessageConsumer.cs Mon Oct 9 13:18:25 2006 @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +using System; using ActiveMQ.Commands; using NMS; using System.Threading; @@ -144,7 +145,13 @@ { //here we add the code that if do acknowledge action. message = AutoAcknowledge(message); - listener(message); + try + { + listener(message); + } catch(Exception e) + { + // TODO: what to do if the listener errors out? 
+ } } // lets now break to give the acknowledgement a chance to be processed Added: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/IMarshallerFactory.cs URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/IMarshallerFactory.cs?view=auto&rev=454489 ============================================================================== --- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/IMarshallerFactory.cs (added) +++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/IMarshallerFactory.cs Mon Oct 9 13:18:25 2006 @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; +using ActiveMQ.OpenWire; + +namespace ActiveMQ.OpenWire +{ + interface IMarshallerFactory + { + void configure(OpenWireFormat format); + } +} Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/OpenWireFormat.cs URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/OpenWireFormat.cs?view=diff&rev=454489&r1=454488&r2=454489 ============================================================================== --- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/OpenWireFormat.cs (original) +++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/OpenWireFormat.cs Mon Oct 9 13:18:25 2006 @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ +using System.Reflection; using ActiveMQ.Commands; using ActiveMQ.OpenWire.V1; using System; @@ -30,18 +31,25 @@ private BaseDataStreamMarshaller[] dataMarshallers; private const byte NULL_TYPE = 0; - private int version=1; + private int version; private bool stackTraceEnabled=false; private bool tightEncodingEnabled=false; private bool sizePrefixDisabled=false; - + private int minimumVersion=1; + + private WireFormatInfo preferedWireFormatInfo = new WireFormatInfo(); + public OpenWireFormat() { + PreferedWireFormatInfo.StackTraceEnabled = false; + PreferedWireFormatInfo.TightEncodingEnabled = false; + PreferedWireFormatInfo.TcpNoDelayEnabled = false; + PreferedWireFormatInfo.CacheEnabled = false; + PreferedWireFormatInfo.SizePrefixDisabled = false; + PreferedWireFormatInfo.Version = 2; + dataMarshallers = new BaseDataStreamMarshaller[256]; - // TODO: We need to dynamically load the marshaller factory based - // on the openwire version. - MarshallerFactory factory = new MarshallerFactory(); - factory.configure(this); + Version = 1; } public bool StackTraceEnabled { @@ -50,7 +58,14 @@ } public int Version { get { return version; } - set { version = value; } + set { + + Assembly dll = Assembly.GetExecutingAssembly(); + Type type = dll.GetType("ActiveMQ.OpenWire.V"+value+".MarshallerFactory", false); + IMarshallerFactory factory = (IMarshallerFactory) Activator.CreateInstance(type); + factory.configure(this); + version = value; + } } public bool SizePrefixDisabled { get { return sizePrefixDisabled; } @@ -60,6 +75,20 @@ get { return tightEncodingEnabled; } set { tightEncodingEnabled = value; } } + + public WireFormatInfo PreferedWireFormatInfo + { + get { return preferedWireFormatInfo; } + set { preferedWireFormatInfo = value; } + } + + public void clearMarshallers() + { + for (int i=0; i < dataMarshallers.Length; i++ ) + { + dataMarshallers[i] = null; + } + } public void addMarshaller(BaseDataStreamMarshaller marshaller) { @@ -276,6 +305,21 @@ return null; } } - + 
+ public void renegotiateWireFormat(WireFormatInfo info) + { + if (info.Version < minimumVersion) + { + throw new IOException("Remote wire format (" + info.Version +") is lower the minimum version required (" + minimumVersion + ")"); + } + + this.Version = Math.Min( PreferedWireFormatInfo.Version, info.Version); + this.stackTraceEnabled = info.StackTraceEnabled && PreferedWireFormatInfo.StackTraceEnabled; +// this.tcpNoDelayEnabled = info.TcpNoDelayEnabled && PreferedWireFormatInfo.TcpNoDelayEnabled; +// this.cacheEnabled = info.CacheEnabled && PreferedWireFormatInfo.CacheEnabled; + this.tightEncodingEnabled = info.TightEncodingEnabled && PreferedWireFormatInfo.TightEncodingEnabled; + this.sizePrefixDisabled = info.SizePrefixDisabled && PreferedWireFormatInfo.SizePrefixDisabled; + + } } } Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V1/MarshallerFactory.cs URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V1/MarshallerFactory.cs?view=diff&rev=454489&r1=454488&r2=454489 ============================================================================== --- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V1/MarshallerFactory.cs (original) +++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V1/MarshallerFactory.cs Mon Oct 9 13:18:25 2006 @@ -34,11 +34,11 @@ /// /// Used to create marshallers for a specific version of the wire protocol /// - public class MarshallerFactory + public class MarshallerFactory : IMarshallerFactory { public void configure(OpenWireFormat format) { - + format.clearMarshallers(); format.addMarshaller(new LocalTransactionIdMarshaller()); format.addMarshaller(new PartialCommandMarshaller()); format.addMarshaller(new IntegerResponseMarshaller()); Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V2/MarshallerFactory.cs URL: 
http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V2/MarshallerFactory.cs?view=diff&rev=454489&r1=454488&r2=454489 ============================================================================== --- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V2/MarshallerFactory.cs (original) +++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/V2/MarshallerFactory.cs Mon Oct 9 13:18:25 2006 @@ -1,98 +1,99 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// -// NOTE!: This file is autogenerated - do not modify! 
-// if you need to make a change, please see the Groovy scripts in the -// activemq-core module -// - -using System; -using System.Collections; -using System.IO; - -using ActiveMQ.Commands; -using ActiveMQ.OpenWire; -using ActiveMQ.OpenWire.V2; - -namespace ActiveMQ.OpenWire.V2 -{ - /// - /// Used to create marshallers for a specific version of the wire protocol - /// - public class MarshallerFactory - { - public void configure(OpenWireFormat format) - { - format.addMarshaller(new ActiveMQBytesMessageMarshaller()); - format.addMarshaller(new ActiveMQMapMessageMarshaller()); - format.addMarshaller(new ActiveMQMessageMarshaller()); - format.addMarshaller(new ActiveMQObjectMessageMarshaller()); - format.addMarshaller(new ActiveMQQueueMarshaller()); - format.addMarshaller(new ActiveMQStreamMessageMarshaller()); - format.addMarshaller(new ActiveMQTempQueueMarshaller()); - format.addMarshaller(new ActiveMQTempTopicMarshaller()); - format.addMarshaller(new ActiveMQTextMessageMarshaller()); - format.addMarshaller(new ActiveMQTopicMarshaller()); - format.addMarshaller(new BrokerIdMarshaller()); - format.addMarshaller(new BrokerInfoMarshaller()); - format.addMarshaller(new ConnectionControlMarshaller()); - format.addMarshaller(new ConnectionErrorMarshaller()); - format.addMarshaller(new ConnectionIdMarshaller()); - format.addMarshaller(new ConnectionInfoMarshaller()); - format.addMarshaller(new ConsumerControlMarshaller()); - format.addMarshaller(new ConsumerIdMarshaller()); - format.addMarshaller(new ConsumerInfoMarshaller()); - format.addMarshaller(new ControlCommandMarshaller()); - format.addMarshaller(new DataArrayResponseMarshaller()); - format.addMarshaller(new DataResponseMarshaller()); - format.addMarshaller(new DestinationInfoMarshaller()); - format.addMarshaller(new DiscoveryEventMarshaller()); - format.addMarshaller(new ExceptionResponseMarshaller()); - format.addMarshaller(new FlushCommandMarshaller()); - format.addMarshaller(new IntegerResponseMarshaller()); - 
format.addMarshaller(new JournalQueueAckMarshaller()); - format.addMarshaller(new JournalTopicAckMarshaller()); - format.addMarshaller(new JournalTraceMarshaller()); - format.addMarshaller(new JournalTransactionMarshaller()); - format.addMarshaller(new KeepAliveInfoMarshaller()); - format.addMarshaller(new LastPartialCommandMarshaller()); - format.addMarshaller(new LocalTransactionIdMarshaller()); - format.addMarshaller(new MessageAckMarshaller()); - format.addMarshaller(new MessageDispatchMarshaller()); - format.addMarshaller(new MessageDispatchNotificationMarshaller()); - format.addMarshaller(new MessageIdMarshaller()); - format.addMarshaller(new MessagePullMarshaller()); - format.addMarshaller(new NetworkBridgeFilterMarshaller()); - format.addMarshaller(new PartialCommandMarshaller()); - format.addMarshaller(new ProducerIdMarshaller()); - format.addMarshaller(new ProducerInfoMarshaller()); - format.addMarshaller(new RemoveInfoMarshaller()); - format.addMarshaller(new RemoveSubscriptionInfoMarshaller()); - format.addMarshaller(new ReplayCommandMarshaller()); - format.addMarshaller(new ResponseMarshaller()); - format.addMarshaller(new SessionIdMarshaller()); - format.addMarshaller(new SessionInfoMarshaller()); - format.addMarshaller(new ShutdownInfoMarshaller()); - format.addMarshaller(new SubscriptionInfoMarshaller()); - format.addMarshaller(new TransactionInfoMarshaller()); - format.addMarshaller(new WireFormatInfoMarshaller()); - format.addMarshaller(new XATransactionIdMarshaller()); - - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// +// NOTE!: This file is autogenerated - do not modify! +// if you need to make a change, please see the Groovy scripts in the +// activemq-core module +// + +using System; +using System.Collections; +using System.IO; + +using ActiveMQ.Commands; +using ActiveMQ.OpenWire; +using ActiveMQ.OpenWire.V2; + +namespace ActiveMQ.OpenWire.V2 +{ + /// + /// Used to create marshallers for a specific version of the wire protocol + /// + public class MarshallerFactory : IMarshallerFactory + { + public void configure(OpenWireFormat format) + { + format.clearMarshallers(); + format.addMarshaller(new ActiveMQBytesMessageMarshaller()); + format.addMarshaller(new ActiveMQMapMessageMarshaller()); + format.addMarshaller(new ActiveMQMessageMarshaller()); + format.addMarshaller(new ActiveMQObjectMessageMarshaller()); + format.addMarshaller(new ActiveMQQueueMarshaller()); + format.addMarshaller(new ActiveMQStreamMessageMarshaller()); + format.addMarshaller(new ActiveMQTempQueueMarshaller()); + format.addMarshaller(new ActiveMQTempTopicMarshaller()); + format.addMarshaller(new ActiveMQTextMessageMarshaller()); + format.addMarshaller(new ActiveMQTopicMarshaller()); + format.addMarshaller(new BrokerIdMarshaller()); + format.addMarshaller(new BrokerInfoMarshaller()); + format.addMarshaller(new ConnectionControlMarshaller()); + format.addMarshaller(new ConnectionErrorMarshaller()); + format.addMarshaller(new ConnectionIdMarshaller()); + format.addMarshaller(new ConnectionInfoMarshaller()); + format.addMarshaller(new ConsumerControlMarshaller()); + 
format.addMarshaller(new ConsumerIdMarshaller()); + format.addMarshaller(new ConsumerInfoMarshaller()); + format.addMarshaller(new ControlCommandMarshaller()); + format.addMarshaller(new DataArrayResponseMarshaller()); + format.addMarshaller(new DataResponseMarshaller()); + format.addMarshaller(new DestinationInfoMarshaller()); + format.addMarshaller(new DiscoveryEventMarshaller()); + format.addMarshaller(new ExceptionResponseMarshaller()); + format.addMarshaller(new FlushCommandMarshaller()); + format.addMarshaller(new IntegerResponseMarshaller()); + format.addMarshaller(new JournalQueueAckMarshaller()); + format.addMarshaller(new JournalTopicAckMarshaller()); + format.addMarshaller(new JournalTraceMarshaller()); + format.addMarshaller(new JournalTransactionMarshaller()); + format.addMarshaller(new KeepAliveInfoMarshaller()); + format.addMarshaller(new LastPartialCommandMarshaller()); + format.addMarshaller(new LocalTransactionIdMarshaller()); + format.addMarshaller(new MessageAckMarshaller()); + format.addMarshaller(new MessageDispatchMarshaller()); + format.addMarshaller(new MessageDispatchNotificationMarshaller()); + format.addMarshaller(new MessageIdMarshaller()); + format.addMarshaller(new MessagePullMarshaller()); + format.addMarshaller(new NetworkBridgeFilterMarshaller()); + format.addMarshaller(new PartialCommandMarshaller()); + format.addMarshaller(new ProducerIdMarshaller()); + format.addMarshaller(new ProducerInfoMarshaller()); + format.addMarshaller(new RemoveInfoMarshaller()); + format.addMarshaller(new RemoveSubscriptionInfoMarshaller()); + format.addMarshaller(new ReplayCommandMarshaller()); + format.addMarshaller(new ResponseMarshaller()); + format.addMarshaller(new SessionIdMarshaller()); + format.addMarshaller(new SessionInfoMarshaller()); + format.addMarshaller(new ShutdownInfoMarshaller()); + format.addMarshaller(new SubscriptionInfoMarshaller()); + format.addMarshaller(new TransactionInfoMarshaller()); + format.addMarshaller(new 
WireFormatInfoMarshaller()); + format.addMarshaller(new XATransactionIdMarshaller()); + + } + } +} Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs?view=diff&rev=454489&r1=454488&r2=454489 ============================================================================== --- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs (original) +++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs Mon Oct 9 13:18:25 2006 @@ -75,7 +75,7 @@ public void Oneway(Command command) { - wireformat.Marshal(command, socketWriter); + Wireformat.Marshal(command, socketWriter); socketWriter.Flush(); } @@ -104,7 +104,7 @@ { try { - Command command = (Command) wireformat.Unmarshal(socketReader); + Command command = (Command) Wireformat.Unmarshal(socketReader); this.commandHandler(this, command); } catch (ObjectDisposedException) @@ -135,6 +135,11 @@ set { this.exceptionHandler = value; } } + public OpenWireFormat Wireformat + { + get { return wireformat; } + set { wireformat = value; } + } } } Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs?view=diff&rev=454489&r1=454488&r2=454489 ============================================================================== --- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs (original) +++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs Mon Oct 9 13:18:25 2006 @@ -36,15 +36,17 @@ { // Console.WriteLine("Opening socket to: " + host + " on port: " + port); Socket socket = 
Connect(location.Host, location.Port); - ITransport rc = new TcpTransport(socket); + TcpTransport tcpTransport = new TcpTransport(socket); + ITransport rc = tcpTransport; - // At present the URI is parsed for options by the ConnectionFactory + rc = new WireFormatNegotiator(rc, tcpTransport.Wireformat); + + // At present the URI is parsed for options by the ConnectionFactory if (UseLogging) { rc = new LoggingTransport(rc); } rc = new ResponseCorrelator(rc); rc = new MutexTransport(rc); - rc = new WireFormatNegotiator(rc); return rc; } Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs?view=diff&rev=454489&r1=454488&r2=454489 ============================================================================== --- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs (original) +++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs Mon Oct 9 13:18:25 2006 @@ -1,50 +1,106 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -using ActiveMQ.Commands; -using ActiveMQ.Transport; -using System; - -namespace ActiveMQ.Transport -{ - - /// - /// A Transport which negotiates the wire format - /// - public class WireFormatNegotiator : TransportFilter - { - - public WireFormatNegotiator(ITransport next) : base(next) { - } - - public override void Start() { - base.Start(); - - - // now lets start the protocol negotiation - WireFormatInfo info = new WireFormatInfo(); - info.StackTraceEnabled=false; - info.TightEncodingEnabled=false; - info.TcpNoDelayEnabled=false; - info.CacheEnabled=false; - info.SizePrefixDisabled=false; - info.Version = 1; - - Oneway(info); - } - } -} - +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +using System.IO; +using System.Threading; +using ActiveMQ.Commands; +using ActiveMQ.OpenWire; +using ActiveMQ.Transport; +using System; +using ActiveMQ.Util; + +namespace ActiveMQ.Transport +{ + + /// + /// A Transport which negotiates the wire format + /// + public class WireFormatNegotiator : TransportFilter + { + private OpenWireFormat wireFormat; + private TimeSpan negotiateTimeout=new TimeSpan(0,0,15); + + private AtomicBoolean firstStart=new AtomicBoolean(true); + private CountDownLatch readyCountDownLatch = new CountDownLatch(1); + private CountDownLatch wireInfoSentDownLatch = new CountDownLatch(1); + + public WireFormatNegotiator(ITransport next, OpenWireFormat wireFormat) + : base(next) + { + this.wireFormat = wireFormat; + } + + public override void Start() { + base.Start(); + if (firstStart.compareAndSet(true, false)) + { + try + { + next.Oneway(wireFormat.PreferedWireFormatInfo); + } + finally + { + wireInfoSentDownLatch.countDown(); + } + } + } + + public override void Dispose() { + base.Dispose(); + readyCountDownLatch.countDown(); + } + + public override void Oneway(Command command) + { + if (!readyCountDownLatch.await(negotiateTimeout)) + throw new IOException("Wire format negociation timeout: peer did not send his wire format."); + next.Oneway(command); + } + + protected override void OnCommand(ITransport sender, Command command) + { + if ( command.GetDataStructureType() == WireFormatInfo.ID_WireFormatInfo ) + { + WireFormatInfo info = (WireFormatInfo)command; + try + { + if (!info.Valid) + { + throw new IOException("Remote wire format magic is invalid"); + } + wireInfoSentDownLatch.await(negotiateTimeout); + wireFormat.renegotiateWireFormat(info); + } + catch (Exception e) + { + OnException(this, e); + } + finally + { + readyCountDownLatch.countDown(); + } + } + this.commandHandler(sender, command); + } + + protected override void OnException(ITransport sender, Exception command) + { + readyCountDownLatch.countDown(); + 
this.exceptionHandler(sender, command); + } + } +} + Added: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs?view=auto&rev=454489 ============================================================================== --- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs (added) +++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs Mon Oct 9 13:18:25 2006 @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +using System; +using System.Collections.Generic; +using System.Text; + +namespace ActiveMQ.Util +{ + class AtomicBoolean + { + bool value; + + public AtomicBoolean(bool b) + { + value = b; + } + + public bool compareAndSet(bool expected, bool newValue) + { + lock(this) + { + if (value == expected) + { + value = newValue; + return true; + } + return false; + } + } + } +} Added: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/CountDownLatch.cs URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/CountDownLatch.cs?view=auto&rev=454489 ============================================================================== --- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/CountDownLatch.cs (added) +++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/CountDownLatch.cs Mon Oct 9 13:18:25 2006 @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; + +namespace ActiveMQ.Util +{ + class CountDownLatch + { + int remaining; + public CountDownLatch(int i) + { + remaining=i; + } + + public void countDown() + { + lock(this) + { + if( remaining > 0 ) { + remaining--; + Monitor.PulseAll(this); + } + } + } + + public bool await(TimeSpan timeout) + { + lock (this) + { + if (remaining > 0) + { + Monitor.Wait(this, timeout); + if (remaining > 0) + { + return false; + } + } + } + return true; + } + } +} Added: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/CommonAssemblyInfo.cs URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/CommonAssemblyInfo.cs?view=auto&rev=454489 ============================================================================== --- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/CommonAssemblyInfo.cs (added) +++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/CommonAssemblyInfo.cs Mon Oct 9 13:18:25 2006 @@ -0,0 +1,27 @@ +using System; +using System.Reflection; +using System.Runtime.InteropServices; + +//------------------------------------------------------------------------------ +// +// This code was generated by a tool. +// Runtime Version:2.0.50727.42 +// +// Changes to this file may cause incorrect behavior and will be lost if +// the code is regenerated. 
+// +//------------------------------------------------------------------------------ + +[assembly: ComVisibleAttribute(false)] +[assembly: CLSCompliantAttribute(true)] +[assembly: AssemblyTitleAttribute("Apache NMS for MSMQ")] +[assembly: AssemblyDescriptionAttribute("An NMS (.Net Messaging Library) to MSMQ")] +[assembly: AssemblyConfigurationAttribute("SNAPSHOT")] +[assembly: AssemblyCompanyAttribute("http://incubator.apache.org/activemq/")] +[assembly: AssemblyProductAttribute("Apache ActiveMQ")] +[assembly: AssemblyCopyrightAttribute("Copyright (C) 2005-2006 Apache Software Foundation")] +[assembly: AssemblyTrademarkAttribute("")] +[assembly: AssemblyCultureAttribute("")] +[assembly: AssemblyVersionAttribute("4.0")] +[assembly: AssemblyInformationalVersionAttribute("4.0")] + Modified: incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/NMSTestSupport.cs URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/NMSTestSupport.cs?view=diff&rev=454489&r1=454488&r2=454489 ============================================================================== --- incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/NMSTestSupport.cs (original) +++ incubator/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/NMSTestSupport.cs Mon Oct 9 13:18:25 2006 @@ -196,7 +196,7 @@ protected virtual string CreateDestinationName() { - return "Test.DotNet." + GetType().Name; + return "Test.DotNet." + GetType().Name + "." + DateTime.Now.Ticks; } protected virtual IMessage CreateMessage()