From commits-return-12628-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Thu Dec 17 18:16:56 2009 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 27249 invoked from network); 17 Dec 2009 18:16:56 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 17 Dec 2009 18:16:56 -0000 Received: (qmail 56581 invoked by uid 500); 17 Dec 2009 18:16:56 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 56521 invoked by uid 500); 17 Dec 2009 18:16:56 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 56512 invoked by uid 99); 17 Dec 2009 18:16:56 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Dec 2009 18:16:56 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Dec 2009 18:16:53 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 1F40B2388A6B; Thu, 17 Dec 2009 18:16:31 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r891816 - in /activemq/branches/activemq-5.3: activemq-core/src/main/java/org/apache/activemq/openwire/ activemq-core/src/main/java/org/apache/activemq/transport/ activemq-core/src/main/java/org/apache/activemq/transport/failover/ activemq-... Date: Thu, 17 Dec 2009 18:16:30 -0000 To: commits@activemq.apache.org From: cmacnaug@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20091217181631.1F40B2388A6B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cmacnaug Date: Thu Dec 17 18:16:29 2009 New Revision: 891816 URL: http://svn.apache.org/viewvc?rev=891816&view=rev Log: Backport merge of Fix for AMQ-2511: Inactivity monitor does not time out stale connections from https://svn.apache.org/repos/asf/activemq/trunk Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/StubTransport.java activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransport.java Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java?rev=891816&r1=891815&r2=891816&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java Thu Dec 17 18:16:29 2009 @@ -22,7 +22,6 @@ import java.lang.reflect.Method; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.command.CommandTypes; import org.apache.activemq.command.DataStructure; @@ -63,8 +62,6 @@ private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream(); private WireFormatInfo preferedWireFormatInfo; - private AtomicBoolean receivingMessage = new AtomicBoolean(false); - public OpenWireFormat() { this(DEFAULT_VERSION); } @@ -353,7 +350,6 @@ public Object doUnmarshal(DataInput dis) throws IOException { byte dataType = dis.readByte(); - receivingMessage.set(true); if (dataType != NULL_TYPE) { DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF]; if (dsm == null) { @@ -367,10 +363,8 @@ } else { dsm.looseUnmarshal(this, data, dis); } - receivingMessage.set(false); return data; } else { - receivingMessage.set(false); return null; } } @@ -595,10 +589,6 @@ public WireFormatInfo getPreferedWireFormatInfo() { return preferedWireFormatInfo; } - - public boolean inReceive() { - return receivingMessage.get(); - } public void renegotiateWireFormat(WireFormatInfo info) throws IOException { Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=891816&r1=891815&r2=891816&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Thu Dec 17 18:16:29 2009 @@ -23,6 +23,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.command.KeepAliveInfo; import org.apache.activemq.command.WireFormatInfo; @@ -56,6 +57,8 @@ private final AtomicBoolean commandReceived = new AtomicBoolean(true); private final AtomicBoolean inReceive = new AtomicBoolean(false); + private final AtomicInteger lastReceiveCounter = new AtomicInteger(0); + private SchedulerTimerTask writeCheckerTask; private SchedulerTimerTask readCheckerTask; @@ -153,7 +156,9 @@ } final void readCheck() { - if (inReceive.get() || wireFormat.inReceive()) { + int currentCounter = next.getReceiveCounter(); + int previousCounter = lastReceiveCounter.getAndSet(currentCounter); + if (inReceive.get() || currentCounter!=previousCounter ) { if (LOG.isTraceEnabled()) { LOG.trace("A receive is in progress"); } Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java?rev=891816&r1=891815&r2=891816&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java Thu Dec 17 18:16:29 2009 @@ -154,4 +154,12 @@ */ void reconnect(URI uri) throws IOException; + /** + * Returns a counter which gets incremented as data is read from the transport. + * It should only be used to determine if there is progress being made in reading the next command from the transport. + * The value may wrap into the negative numbers. + * + * @return a counter which gets incremented as data is read from the transport. + */ + int getReceiveCounter(); } Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=891816&r1=891815&r2=891816&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java Thu Dec 17 18:16:29 2009 @@ -137,4 +137,8 @@ public void reconnect(URI uri) throws IOException { next.reconnect(uri); } + + public int getReceiveCounter() { + return next.getReceiveCounter(); + } } Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=891816&r1=891815&r2=891816&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Dec 17 18:16:29 2009 @@ -868,4 +868,12 @@ public void reconnect(URI uri) throws IOException { add(new URI[] {uri}); } + + public int getReceiveCounter() { + Transport transport = connectedTransport.get(); + if( transport == null ) { + return 0; + } + return transport.getReceiveCounter(); + } } Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java?rev=891816&r1=891815&r2=891816&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java Thu Dec 17 18:16:29 2009 @@ -586,4 +586,16 @@ public boolean isConnected() { return connected; } + + public int getReceiveCounter() { + int rc = 0; + synchronized (reconnectMutex) { + for (FanoutTransportHandler th : transports) { + if (th.transport != null) { + rc += th.transport.getReceiveCounter(); + } + } + } + return rc; + } } Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java?rev=891816&r1=891815&r2=891816&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java Thu Dec 17 18:16:29 2009 @@ -151,4 +151,8 @@ public void reconnect(URI uri) throws IOException { getNext().reconnect(uri); } + + public int getReceiveCounter() { + return getNext().getReceiveCounter(); + } } Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java?rev=891816&r1=891815&r2=891816&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java Thu Dec 17 18:16:29 2009 @@ -202,11 +202,4 @@ this.version = version; } - public boolean inReceive() { - //TODO implement the inactivity monitor - return false; - } - - - } Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java?rev=891816&r1=891815&r2=891816&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java Thu Dec 17 18:16:29 2009 @@ -43,7 +43,7 @@ internalBuffer = new byte[size]; } - private void fill() throws IOException { + protected void fill() throws IOException { byte[] buffer = internalBuffer; count = 0; position = 0; Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=891816&r1=891815&r2=891816&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Thu Dec 17 18:16:29 2009 @@ -118,6 +118,7 @@ private Boolean keepAlive; private Boolean tcpNoDelay; private Thread runnerThread; + private volatile int receiveCounter; /** * Connect to a remote Node - e.g. a Broker @@ -504,7 +505,28 @@ } protected void initializeStreams() throws Exception { - TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize); + TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) { + @Override + public int read() throws IOException { + receiveCounter++; + return super.read(); + } + @Override + public int read(byte[] b, int off, int len) throws IOException { + receiveCounter++; + return super.read(b, off, len); + } + @Override + public long skip(long n) throws IOException { + receiveCounter++; + return super.skip(n); + } + @Override + protected void fill() throws IOException { + receiveCounter++; + super.fill(); + } + }; this.dataIn = new DataInputStream(buffIn); buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize); this.dataOut = new DataOutputStream(buffOut); @@ -551,4 +573,9 @@ } }); } + + + public int getReceiveCounter() { + return receiveCounter; + } } Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java?rev=891816&r1=891815&r2=891816&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java Thu Dec 17 18:16:29 2009 @@ -50,4 +50,7 @@ void setReplayAddress(SocketAddress address); void setReplayBuffer(ReplayBuffer replayBuffer); + + public int getReceiveCounter(); + } Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java?rev=891816&r1=891815&r2=891816&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java Thu Dec 17 18:16:29 2009 @@ -54,6 +54,7 @@ // writing private Object writeLock = new Object(); private int defaultMarshalBufferSize = 64 * 1024; + private volatile int receiveCounter; public CommandDatagramChannel(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, DatagramChannel channel, ByteBufferPool bufferPool) { @@ -85,6 +86,8 @@ if (readBuffer.limit() == 0) { continue; } + + receiveCounter++; from = headerMarshaller.createEndpoint(readBuffer, address); int remaining = readBuffer.remaining(); @@ -252,4 +255,8 @@ } } + public int getReceiveCounter() { + return receiveCounter; + } + } Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java?rev=891816&r1=891815&r2=891816&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java Thu Dec 17 18:16:29 2009 @@ -48,6 +48,8 @@ private Object readLock = new Object(); private Object writeLock = new Object(); + private volatile int receiveCounter; + public CommandDatagramSocket(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, DatagramSocket channel) { super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller); @@ -70,8 +72,9 @@ // TODO could use a DataInput implementation that talks direct // to the byte[] to avoid object allocation - DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(datagram.getData())); - + receiveCounter++; + DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(datagram.getData(), 0, datagram.getLength())); + from = headerMarshaller.createEndpoint(datagram, dataIn); answer = (Command)wireFormat.unmarshal(dataIn); break; @@ -232,4 +235,8 @@ protected ByteArrayOutputStream createByteArrayOutputStream() { return new ByteArrayOutputStream(datagramSize); } + + public int getReceiveCounter() { + return receiveCounter; + } } Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?rev=891816&r1=891815&r2=891816&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java Thu Dec 17 18:16:29 2009 @@ -462,4 +462,11 @@ } return null; } + + public int getReceiveCounter() { + if (commandChannel == null) { + return 0; + } + return commandChannel.getReceiveCounter(); + } } Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=891816&r1=891815&r2=891816&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Thu Dec 17 18:16:29 2009 @@ -62,6 +62,7 @@ private final Object lazyInitMutext = new Object(); private final Valve enqueueValve = new Valve(true); private final AtomicBoolean stopping = new AtomicBoolean(); + private volatile int receiveCounter; public VMTransport(URI location) { this.location = location; @@ -112,6 +113,7 @@ if( command == DISCONNECT ) { transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.")); } else { + peer.receiveCounter++; transportListener.onCommand(command); } } @@ -126,6 +128,7 @@ if (messageQueue != null && !async) { Object command; while ((command = messageQueue.poll()) != null && !stopping.get() ) { + receiveCounter++; transportListener.onCommand(command); } } @@ -343,4 +346,8 @@ public void reconnect(URI uri) throws IOException { throw new IOException("Not supported"); } + + public int getReceiveCounter() { + return receiveCounter; + } } Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java?rev=891816&r1=891815&r2=891816&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/wireformat/ObjectStreamWireFormat.java Thu Dec 17 18:16:29 2009 @@ -75,11 +75,4 @@ return 0; } - public boolean inReceive() { - // TODO implement the inactivity monitor - return false; - } - - - } Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java?rev=891816&r1=891815&r2=891816&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/wireformat/WireFormat.java Thu Dec 17 18:16:29 2009 @@ -61,9 +61,4 @@ */ int getVersion(); - /** - * @return true if message is being received - */ - boolean inReceive(); - } Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/StubTransport.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/StubTransport.java?rev=891816&r1=891815&r2=891816&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/StubTransport.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/StubTransport.java Thu Dec 17 18:16:29 2009 @@ -29,6 +29,7 @@ public class StubTransport extends TransportSupport { private Queue queue = new ConcurrentLinkedQueue(); + private volatile int receiveCounter; protected void doStop(ServiceStopper stopper) throws Exception { } @@ -37,6 +38,7 @@ } public void oneway(Object command) throws IOException { + receiveCounter++; queue.add(command); } @@ -48,4 +50,8 @@ return null; } + public int getReceiveCounter() { + return receiveCounter; + } + } Modified: activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java?rev=891816&r1=891815&r2=891816&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java (original) +++ activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java Thu Dec 17 18:16:29 2009 @@ -64,5 +64,9 @@ } protected void doStop(ServiceStopper stopper) throws Exception { + } + + public int getReceiveCounter() { + return 0; } } Modified: activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java?rev=891816&r1=891815&r2=891816&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java (original) +++ activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java Thu Dec 17 18:16:29 2009 @@ -58,6 +58,7 @@ private final String clientID = CLIENT_ID_GENERATOR.generateId(); private boolean trace; private GetMethod httpMethod; + private volatile int receiveCounter; public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) { super(wireFormat, remoteUrl); @@ -135,6 +136,7 @@ break; } } else { + receiveCounter++; DataInputStream stream = new DataInputStream(httpMethod.getResponseBodyAsStream()); Object command = (Object)getTextWireFormat().unmarshal(stream); if (command == null) { @@ -221,4 +223,8 @@ this.trace = trace; } + public int getReceiveCounter() { + return receiveCounter; + } + } Modified: activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransport.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransport.java?rev=891816&r1=891815&r2=891816&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransport.java (original) +++ activemq/branches/activemq-5.3/activemq-optional/src/main/java/org/apache/activemq/transport/http/HttpTransport.java Thu Dec 17 18:16:29 2009 @@ -47,7 +47,8 @@ private HttpURLConnection receiveConnection; private URL url; private String clientID; - + private volatile int receiveCounter; + // private String sessionID; public HttpTransport(TextWireFormat wireFormat, URI remoteUrl) throws MalformedURLException { @@ -102,6 +103,7 @@ // checkSession(connection); // Create a String for the UTF content + receiveCounter++; InputStream is = connection.getInputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream(connection.getContentLength() > 0 ? connection.getContentLength() : 1024); int c = 0; @@ -228,4 +230,8 @@ } } + public int getReceiveCounter() { + return receiveCounter; + } + }