From commits-return-10965-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Tue Jun 02 21:30:32 2009 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 13373 invoked from network); 2 Jun 2009 21:30:32 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 2 Jun 2009 21:30:32 -0000 Received: (qmail 96781 invoked by uid 500); 2 Jun 2009 21:30:44 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 96747 invoked by uid 500); 2 Jun 2009 21:30:44 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 96725 invoked by uid 99); 2 Jun 2009 21:30:44 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Jun 2009 21:30:44 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Jun 2009 21:30:35 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 098702388981; Tue, 2 Jun 2009 21:29:41 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r781177 [11/11] - in /activemq/sandbox/activemq-flow: activemq-bio/ activemq-bio/src/main/java/org/ activemq-bio/src/main/java/org/apache/ activemq-bio/src/main/java/org/apache/activemq/ activemq-bio/src/main/java/org/apache/activemq/transp... Date: Tue, 02 Jun 2009 21:29:35 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090602212941.098702388981@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Synchronization.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Synchronization.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Synchronization.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Synchronization.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transaction; + +/** + * @version $Revision$ + */ +public class Synchronization { + + public void beforeEnd() throws Exception { + } + + public void afterCommit() throws Exception { + } + + public void afterRollback() throws Exception { + } + +} Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Synchronization.java ------------------------------------------------------------------------------ svn:executable = * Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Transaction.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Transaction.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Transaction.java (added) +++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Transaction.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transaction; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; + +import javax.transaction.xa.XAException; + +import org.apache.activemq.command.TransactionId; + +/** + * Keeps track of all the actions the need to be done when a transaction does a + * commit or rollback. + * + * @version $Revision: 1.5 $ + */ +public abstract class Transaction { + + public static final byte START_STATE = 0; // can go to: 1,2,3 + public static final byte IN_USE_STATE = 1; // can go to: 2,3 + public static final byte PREPARED_STATE = 2; // can go to: 3 + public static final byte FINISHED_STATE = 3; + + private ArrayList synchronizations = new ArrayList(); + private byte state = START_STATE; + + public byte getState() { + return state; + } + + public void setState(byte state) { + this.state = state; + } + + public void addSynchronization(Synchronization r) { + synchronizations.add(r); + if (state == START_STATE) { + state = IN_USE_STATE; + } + } + + public void removeSynchronization(Synchronization r) { + synchronizations.remove(r); + } + + public void prePrepare() throws Exception { + + // Is it ok to call prepare now given the state of the + // transaction? + switch (state) { + case START_STATE: + case IN_USE_STATE: + break; + default: + XAException xae = new XAException("Prepare cannot be called now."); + xae.errorCode = XAException.XAER_PROTO; + throw xae; + } + + // // Run the prePrepareTasks + // for (Iterator iter = prePrepareTasks.iterator(); iter.hasNext();) { + // Callback r = (Callback) iter.next(); + // r.execute(); + // } + } + + protected void fireAfterCommit() throws Exception { + for (Iterator iter = synchronizations.iterator(); iter.hasNext();) { + Synchronization s = iter.next(); + s.afterCommit(); + } + } + + public void fireAfterRollback() throws Exception { + Collections.reverse(synchronizations); + for (Iterator iter = synchronizations.iterator(); iter.hasNext();) { + Synchronization s = iter.next(); + s.afterRollback(); + } + } + + public String toString() { + return super.toString() + "[synchronizations=" + synchronizations + "]"; + } + + public abstract void commit(boolean onePhase) throws XAException, IOException; + + public abstract void rollback() throws XAException, IOException; + + public abstract int prepare() throws XAException, IOException; + + public abstract TransactionId getTransactionId(); + + public boolean isPrepared() { + return getState() == PREPARED_STATE; + } + + public int size() { + return synchronizations.size(); + } +} Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/transaction/Transaction.java ------------------------------------------------------------------------------ svn:executable = * Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/IConnection.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/IConnection.java?rev=781177&r1=781176&r2=781177&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/IConnection.java (original) +++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/IConnection.java Tue Jun 2 21:29:30 2009 @@ -16,6 +16,8 @@ */ package org.apache.activemq; +import javax.jms.JMSException; + import org.apache.activemq.command.ActiveMQTempDestination; @@ -27,5 +29,5 @@ boolean isObjectMessageSerializationDefered(); - void deleteTempDestination(ActiveMQTempDestination activeMQTempDestination); + void deleteTempDestination(ActiveMQTempDestination activeMQTempDestination) throws JMSException; } Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/InactivityMonitor.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (added) +++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,314 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport; + +import java.io.IOException; +import java.util.Timer; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.command.KeepAliveInfo; +import org.apache.activemq.command.WireFormatInfo; +import org.apache.activemq.thread.SchedulerTimerTask; +import org.apache.activemq.wireformat.WireFormat; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Used to make sure that commands are arriving periodically from the peer of + * the transport. + * + * @version $Revision$ + */ +public class InactivityMonitor extends TransportFilter { + + private static final Log LOG = LogFactory.getLog(InactivityMonitor.class); + private static final ThreadPoolExecutor ASYNC_TASKS; + + private static int CHECKER_COUNTER; + private static Timer READ_CHECK_TIMER; + private static Timer WRITE_CHECK_TIMER; + + private WireFormatInfo localWireFormatInfo; + private WireFormatInfo remoteWireFormatInfo; + private final AtomicBoolean monitorStarted = new AtomicBoolean(false); + + private final AtomicBoolean commandSent = new AtomicBoolean(false); + private final AtomicBoolean inSend = new AtomicBoolean(false); + private final AtomicBoolean failed = new AtomicBoolean(false); + + private final AtomicBoolean commandReceived = new AtomicBoolean(true); + private final AtomicBoolean inReceive = new AtomicBoolean(false); + private SchedulerTimerTask writeCheckerTask; + private SchedulerTimerTask readCheckerTask; + + private long readCheckTime; + private long writeCheckTime; + private long initialDelayTime; + + private WireFormat wireFormat; + + private final Runnable readChecker = new Runnable() { + long lastRunTime; + public void run() { + long now = System.currentTimeMillis(); + long elapsed = (now-lastRunTime); + + if( lastRunTime != 0 && LOG.isDebugEnabled() ) { + LOG.debug(""+elapsed+" ms elapsed since last read check."); + } + + // Perhaps the timer executed a read check late.. and then executes + // the next read check on time which causes the time elapsed between + // read checks to be small.. + + // If less than 90% of the read check Time elapsed then abort this readcheck. + if( !allowReadCheck(elapsed) ) { // FUNKY qdox bug does not allow me to inline this expression. + LOG.debug("Aborting read check.. Not enough time elapsed since last read check."); + return; + } + + lastRunTime = now; + readCheck(); + } + }; + + private boolean allowReadCheck(long elapsed) { + return elapsed > (readCheckTime * 9 / 10); + } + + private final Runnable writeChecker = new Runnable() { + long lastRunTime; + public void run() { + long now = System.currentTimeMillis(); + if( lastRunTime != 0 && LOG.isDebugEnabled() ) { + LOG.debug(""+(now-lastRunTime)+" ms elapsed since last write check."); + + } + lastRunTime = now; + writeCheck(); + } + }; + + public InactivityMonitor(Transport next, WireFormat wireFormat) { + super(next); + this.wireFormat = wireFormat; + } + + public void stop() throws Exception { + stopMonitorThreads(); + next.stop(); + } + + final void writeCheck() { + if (inSend.get()) { + if (LOG.isTraceEnabled()) { + LOG.trace("A send is in progress"); + } + return; + } + + if (!commandSent.get()) { + if(LOG.isTraceEnabled()) { + LOG.trace("No message sent since last write check, sending a KeepAliveInfo"); + } + ASYNC_TASKS.execute(new Runnable() { + public void run() { + if (monitorStarted.get()) { + try { + + KeepAliveInfo info = new KeepAliveInfo(); + info.setResponseRequired(true); + oneway(info); + } catch (IOException e) { + onException(e); + } + } + }; + }); + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("Message sent since last write check, resetting flag"); + } + } + + commandSent.set(false); + } + + final void readCheck() { + if (inReceive.get() || wireFormat.inReceive()) { + if (LOG.isTraceEnabled()) { + LOG.trace("A receive is in progress"); + } + return; + } + if (!commandReceived.get()) { + if (LOG.isDebugEnabled()) { + LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); + } + ASYNC_TASKS.execute(new Runnable() { + public void run() { + onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress())); + }; + + }); + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("Message received since last read check, resetting flag: "); + } + } + commandReceived.set(false); + } + + public void onCommand(Object command) { + commandReceived.set(true); + inReceive.set(true); + try { + if (command.getClass() == KeepAliveInfo.class) { + KeepAliveInfo info = (KeepAliveInfo) command; + if (info.isResponseRequired()) { + try { + info.setResponseRequired(false); + oneway(info); + } catch (IOException e) { + onException(e); + } + } + } else { + if (command.getClass() == WireFormatInfo.class) { + synchronized (this) { + IOException error = null; + remoteWireFormatInfo = (WireFormatInfo) command; + try { + startMonitorThreads(); + } catch (IOException e) { + error = e; + } + if (error != null) { + onException(error); + } + } + } + synchronized (readChecker) { + transportListener.onCommand(command); + } + } + } finally { + + inReceive.set(false); + } + } + + public void oneway(Object o) throws IOException { + // Disable inactivity monitoring while processing a command. + //synchronize this method - its not synchronized + //further down the transport stack and gets called by more + //than one thread by this class + synchronized(inSend) { + inSend.set(true); + try { + + if( failed.get() ) { + throw new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()); + } + if (o.getClass() == WireFormatInfo.class) { + synchronized (this) { + localWireFormatInfo = (WireFormatInfo)o; + startMonitorThreads(); + } + } + next.oneway(o); + } finally { + commandSent.set(true); + inSend.set(false); + } + } + } + + public void onException(IOException error) { + if (failed.compareAndSet(false, true)) { + stopMonitorThreads(); + transportListener.onException(error); + } + } + + private synchronized void startMonitorThreads() throws IOException { + if (monitorStarted.get()) { + return; + } + if (localWireFormatInfo == null) { + return; + } + if (remoteWireFormatInfo == null) { + return; + } + + readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration()); + initialDelayTime = Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay()); + if (readCheckTime > 0) { + monitorStarted.set(true); + writeCheckerTask = new SchedulerTimerTask(writeChecker); + readCheckerTask = new SchedulerTimerTask(readChecker); + writeCheckTime = readCheckTime/3; + synchronized( InactivityMonitor.class ) { + if( CHECKER_COUNTER == 0 ) { + READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck",true); + WRITE_CHECK_TIMER = new Timer("InactivityMonitor WriteCheck",true); + } + CHECKER_COUNTER++; + WRITE_CHECK_TIMER.scheduleAtFixedRate(writeCheckerTask, initialDelayTime,writeCheckTime); + READ_CHECK_TIMER.scheduleAtFixedRate(readCheckerTask, initialDelayTime,readCheckTime); + } + } + } + + /** + * + */ + private synchronized void stopMonitorThreads() { + if (monitorStarted.compareAndSet(true, false)) { + readCheckerTask.cancel(); + writeCheckerTask.cancel(); + synchronized( InactivityMonitor.class ) { + WRITE_CHECK_TIMER.purge(); + READ_CHECK_TIMER.purge(); + CHECKER_COUNTER--; + if(CHECKER_COUNTER==0) { + WRITE_CHECK_TIMER.cancel(); + READ_CHECK_TIMER.cancel(); + WRITE_CHECK_TIMER = null; + READ_CHECK_TIMER = null; + } + } + } + } + + + static { + ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "InactivityMonitor Async Task: "+runnable); + thread.setDaemon(true); + return thread; + } + }); + } + +} Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/transport/InactivityMonitor.java ------------------------------------------------------------------------------ svn:executable = * Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/ThreadPriorities.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/ThreadPriorities.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/ThreadPriorities.java (added) +++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/ThreadPriorities.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + + +/** +* A holder for different thread priorites used in ActiveMQ +* +* @version $Revision: 1.9 $ +*/ + +public interface ThreadPriorities { + int INBOUND_BROKER_CONNECTION = 6; + int OUT_BOUND_BROKER_DISPATCH = 6; + int INBOUND_CLIENT_CONNECTION = 7; + int INBOUND_CLIENT_SESSION = 7; + int BROKER_MANAGEMENT = 9; +} Propchange: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/ThreadPriorities.java ------------------------------------------------------------------------------ svn:executable = * Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/InactivityIOException.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/InactivityIOException.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/InactivityIOException.java (added) +++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/InactivityIOException.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport; + +import java.io.IOException; + +/** + * This is exception is thrown when the transport layer detects that the underlying socket has been inactive for + * too long. + * + * @version $Revision$ + */ +public class InactivityIOException extends IOException { + + /** + * + */ + private static final long serialVersionUID = 5816001466763503220L; + + public InactivityIOException() { + super(); + } + + /** + * @param message + */ + public InactivityIOException(String message) { + super(message); + } + + +} Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerSupport.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerSupport.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerSupport.java (added) +++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerSupport.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport; + +import java.net.URI; + +import org.apache.activemq.util.ServiceSupport; + +/** + * A useful base class for implementations of {@link TransportServer} + * + * @version $Revision: 1.1 $ + */ +public abstract class TransportServerSupport extends ServiceSupport implements TransportServer { + + private URI connectURI; + private URI bindLocation; + private TransportAcceptListener acceptListener; + + public TransportServerSupport() { + } + + public TransportServerSupport(URI location) { + this.connectURI = location; + this.bindLocation = location; + } + + /** + * @return Returns the acceptListener. + */ + public TransportAcceptListener getAcceptListener() { + return acceptListener; + } + + /** + * Registers an accept listener + * + * @param acceptListener + */ + public void setAcceptListener(TransportAcceptListener acceptListener) { + this.acceptListener = acceptListener; + } + + /** + * @return Returns the location. + */ + public URI getConnectURI() { + return connectURI; + } + + /** + * @param location The location to set. + */ + public void setConnectURI(URI location) { + this.connectURI = location; + } + + protected void onAcceptError(Exception e) { + if (acceptListener != null) { + acceptListener.onAcceptError(e); + } + } + + public URI getBindLocation() { + return bindLocation; + } + + public void setBindLocation(URI bindLocation) { + this.bindLocation = bindLocation; + } +} Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java (added) +++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport; + +import java.net.URI; + +import org.apache.activemq.ThreadPriorities; +import org.apache.activemq.util.ServiceStopper; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A useful base class for implementations of {@link TransportServer} which uses + * a background thread to accept new connections. + * + * @version $Revision: 1.1 $ + */ +public abstract class TransportServerThreadSupport extends TransportServerSupport implements Runnable { + private static final Log LOG = LogFactory.getLog(TransportServerThreadSupport.class); + + private boolean daemon = true; + private boolean joinOnStop = true; + private Thread runner; + // should be a multiple of 128k + private long stackSize; + + public TransportServerThreadSupport() { + } + + public TransportServerThreadSupport(URI location) { + super(location); + } + + public boolean isDaemon() { + return daemon; + } + + /** + * Sets whether the background read thread is a daemon thread or not + */ + public void setDaemon(boolean daemon) { + this.daemon = daemon; + } + + public boolean isJoinOnStop() { + return joinOnStop; + } + + /** + * Sets whether the background read thread is joined with (waited for) on a + * stop + */ + public void setJoinOnStop(boolean joinOnStop) { + this.joinOnStop = joinOnStop; + } + + protected void doStart() throws Exception { + LOG.info("Listening for connections at: " + getConnectURI()); + runner = new Thread(null, this, "ActiveMQ Transport Server: " + toString(), stackSize); + runner.setDaemon(daemon); + runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT); + runner.start(); + } + + protected void doStop(ServiceStopper stopper) throws Exception { + if (runner != null && joinOnStop) { + runner.join(); + runner = null; + } + } + + /** + * @return the stackSize + */ + public long getStackSize() { + return this.stackSize; + } + + /** + * @param stackSize the stackSize to set + */ + public void setStackSize(long stackSize) { + this.stackSize = stackSize; + } +} Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportSupport.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportSupport.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportSupport.java (added) +++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportSupport.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport; + +import java.io.IOException; +import java.net.URI; + +import org.apache.activemq.util.ServiceSupport; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A useful base class for transport implementations. + * + * @version $Revision: 1.1 $ + */ +public abstract class TransportSupport extends ServiceSupport implements Transport { + private static final Log LOG = LogFactory.getLog(TransportSupport.class); + + TransportListener transportListener; + + /** + * Returns the current transport listener + */ + public TransportListener getTransportListener() { + return transportListener; + } + + /** + * Registers an inbound command listener + * + * @param commandListener + */ + public void setTransportListener(TransportListener commandListener) { + this.transportListener = commandListener; + } + + /** + * narrow acceptance + * + * @param target + * @return 'this' if assignable + */ + public T narrow(Class target) { + boolean assignableFrom = target.isAssignableFrom(getClass()); + if (assignableFrom) { + return target.cast(this); + } + return null; + } + + public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { + throw new AssertionError("Unsupported Method"); + } + + public Object request(Object command) throws IOException { + throw new AssertionError("Unsupported Method"); + } + + public Object request(Object command, int timeout) throws IOException { + throw new AssertionError("Unsupported Method"); + } + + /** + * Process the inbound command + */ + public void doConsume(Object command) { + if (command != null) { + if (transportListener != null) { + transportListener.onCommand(command); + } else { + LOG.error("No transportListener available to process inbound command: " + command); + } + } + } + + /** + * Passes any IO exceptions into the transport listener + */ + public void onException(IOException e) { + if (transportListener != null) { + transportListener.onException(e); + } + } + + protected void checkStarted() throws IOException { + if (!isStarted()) { + throw new IOException("The transport is not running."); + } + } + + public boolean isFaultTolerant() { + return false; + } + + + public void reconnect(URI uri) throws IOException { + throw new IOException("Not supported"); + } + + public boolean isDisposed() { + return isStopped(); + } + + public boolean isConnected() { + return isStarted(); + } + +} Added: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java (added) +++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport; + +/** + * A useful base class for a transport implementation which has a background + * reading thread. + * + * @version $Revision: 1.1 $ + */ +public abstract class TransportThreadSupport extends TransportSupport implements Runnable { + + private boolean daemon; + private Thread runner; + // should be a multiple of 128k + private long stackSize; + + public boolean isDaemon() { + return daemon; + } + + public void setDaemon(boolean daemon) { + this.daemon = daemon; + } + + protected void doStart() throws Exception { + runner = new Thread(null, this, "ActiveMQ Transport: " + toString(), stackSize); + runner.setDaemon(daemon); + runner.start(); + } + + /** + * @return the stackSize + */ + public long getStackSize() { + return this.stackSize; + } + + /** + * @param stackSize the stackSize to set + */ + public void setStackSize(long stackSize) { + this.stackSize = stackSize; + } +} Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArray.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArray.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArray.java (added) +++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArray.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.util; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Simple BitArray to enable setting multiple boolean values efficently Used + * instead of BitSet because BitSet does not allow for efficent serialization. + * Will store up to 64 boolean values + * + * @version $Revision: 1.1.1.1 $ + */ +public class BitArray { + static final int LONG_SIZE = 64; + static final int INT_SIZE = 32; + static final int SHORT_SIZE = 16; + static final int BYTE_SIZE = 8; + private static final long[] BIT_VALUES = {0x0000000000000001L, 0x0000000000000002L, 0x0000000000000004L, + 0x0000000000000008L, 0x0000000000000010L, 0x0000000000000020L, + 0x0000000000000040L, 0x0000000000000080L, 0x0000000000000100L, + 0x0000000000000200L, 0x0000000000000400L, 0x0000000000000800L, + 0x0000000000001000L, 0x0000000000002000L, 0x0000000000004000L, + 0x0000000000008000L, 0x0000000000010000L, 0x0000000000020000L, + 0x0000000000040000L, 0x0000000000080000L, 0x0000000000100000L, + 0x0000000000200000L, 0x0000000000400000L, 0x0000000000800000L, + 0x0000000001000000L, 0x0000000002000000L, 0x0000000004000000L, + 0x0000000008000000L, 0x0000000010000000L, 0x0000000020000000L, + 0x0000000040000000L, 0x0000000080000000L, 0x0000000100000000L, + 0x0000000200000000L, 0x0000000400000000L, 0x0000000800000000L, + 0x0000001000000000L, 0x0000002000000000L, 0x0000004000000000L, + 0x0000008000000000L, 0x0000010000000000L, 0x0000020000000000L, + 0x0000040000000000L, 0x0000080000000000L, 0x0000100000000000L, + 0x0000200000000000L, 0x0000400000000000L, 0x0000800000000000L, + 0x0001000000000000L, 0x0002000000000000L, 0x0004000000000000L, + 0x0008000000000000L, 0x0010000000000000L, 0x0020000000000000L, + 0x0040000000000000L, 0x0080000000000000L, 0x0100000000000000L, + 0x0200000000000000L, 0x0400000000000000L, 0x0800000000000000L, + 0x1000000000000000L, 0x2000000000000000L, 0x4000000000000000L, + 0x8000000000000000L}; + private long bits; + private int length; + + /** + * @return the length of bits set + */ + public int length() { + return length; + } + + /** + * @return the long containing the bits + */ + public long getBits() { + return bits; + } + + /** + * set the boolean value at the index + * + * @param index + * @param flag + * @return the old value held at this index + */ + public boolean set(int index, boolean flag) { + length = Math.max(length, index + 1); + boolean oldValue = (bits & BIT_VALUES[index]) != 0; + if (flag) { + bits |= BIT_VALUES[index]; + } else if (oldValue) { + bits &= ~(BIT_VALUES[index]); + } + return oldValue; + } + + /** + * @param index + * @return the boolean value at this index + */ + public boolean get(int index) { + return (bits & BIT_VALUES[index]) != 0; + } + + /** + * reset all the bit values to false + */ + public void reset() { + bits = 0; + } + + /** + * reset all the bits to the value supplied + * + * @param bits + */ + public void reset(long bits) { + this.bits = bits; + } + + /** + * write the bits to an output stream + * + * @param dataOut + * @throws IOException + */ + public void writeToStream(DataOutput dataOut) throws IOException { + dataOut.writeByte(length); + if (length <= BYTE_SIZE) { + dataOut.writeByte((int)bits); + } else if (length <= SHORT_SIZE) { + dataOut.writeShort((short)bits); + } else if (length <= INT_SIZE) { + dataOut.writeInt((int)bits); + } else { + dataOut.writeLong(bits); + } + } + + /** + * read the bits from an input stream + * + * @param dataIn + * @throws IOException + */ + public void readFromStream(DataInput dataIn) throws IOException { + length = dataIn.readByte(); + if (length <= BYTE_SIZE) { + bits = dataIn.readByte(); + } else if (length <= SHORT_SIZE) { + bits = dataIn.readShort(); + } else if (length <= INT_SIZE) { + bits = dataIn.readInt(); + } else { + bits = dataIn.readLong(); + } + } +} Propchange: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArray.java ------------------------------------------------------------------------------ svn:executable = * Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArrayBin.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArrayBin.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArrayBin.java (added) +++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArrayBin.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.util; + +import java.util.LinkedList; + +/** + * Holder for many bitArrays - used for message audit + * + * @version $Revision: 1.1.1.1 $ + */ +public class BitArrayBin { + + private LinkedList list; + private int maxNumberOfArrays; + private int firstIndex = -1; + private long lastInOrderBit=-1; + + /** + * Create a BitArrayBin to a certain window size (number of messages to + * keep) + * + * @param windowSize + */ + public BitArrayBin(int windowSize) { + maxNumberOfArrays = ((windowSize + 1) / BitArray.LONG_SIZE) + 1; + maxNumberOfArrays = Math.max(maxNumberOfArrays, 1); + list = new LinkedList(); + for (int i = 0; i < maxNumberOfArrays; i++) { + list.add(null); + } + } + + /** + * Set a bit + * + * @param index + * @param value + * @return true if set + */ + public boolean setBit(long index, boolean value) { + boolean answer = false; + BitArray ba = getBitArray(index); + if (ba != null) { + int offset = getOffset(index); + if (offset >= 0) { + answer = ba.set(offset, value); + } + } + return answer; + } + + /** + * Test if in order + * @param index + * @return true if next message is in order + */ + public boolean isInOrder(long index) { + boolean result = false; + if (lastInOrderBit == -1) { + result = true; + } else { + result = lastInOrderBit + 1 == index; + } + lastInOrderBit = index; + return result; + + } + + /** + * Get the boolean value at the index + * + * @param index + * @return true/false + */ + public boolean getBit(long index) { + boolean answer = index >= firstIndex; + BitArray ba = getBitArray(index); + if (ba != null) { + int offset = getOffset(index); + if (offset >= 0) { + answer = ba.get(offset); + return answer; + } + } else { + // gone passed range for previous bins so assume set + answer = true; + } + return answer; + } + + /** + * Get the BitArray for the index + * + * @param index + * @return BitArray + */ + private BitArray getBitArray(long index) { + int bin = getBin(index); + BitArray answer = null; + if (bin >= 0) { + if (bin >= maxNumberOfArrays) { + int overShoot = bin - maxNumberOfArrays + 1; + while (overShoot > 0) { + list.removeFirst(); + firstIndex += BitArray.LONG_SIZE; + list.add(new BitArray()); + overShoot--; + } + + bin = maxNumberOfArrays - 1; + } + answer = list.get(bin); + if (answer == null) { + answer = new BitArray(); + list.set(bin, answer); + } + } + return answer; + } + + /** + * Get the index of the bin from the total index + * + * @param index + * @return the index of the bin + */ + private int getBin(long index) { + int answer = 0; + if (firstIndex < 0) { + firstIndex = (int) (index - (index % BitArray.LONG_SIZE)); + } else if (firstIndex >= 0) { + answer = (int)((index - firstIndex) / BitArray.LONG_SIZE); + } + return answer; + } + + /** + * Get the offset into a bin from the total index + * + * @param index + * @return the relative offset into a bin + */ + private int getOffset(long index) { + int answer = 0; + if (firstIndex >= 0) { + answer = (int)((index - firstIndex) - (BitArray.LONG_SIZE * getBin(index))); + } + return answer; + } +} Propchange: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/BitArrayBin.java ------------------------------------------------------------------------------ svn:executable = * Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IndentPrinter.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IndentPrinter.java?rev=781177&view=auto ============================================================================== --- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IndentPrinter.java (added) +++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IndentPrinter.java Tue Jun 2 21:29:30 2009 @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.util; + +import java.io.PrintWriter; + +/** + * A helper class for printing indented text + * + * @version $Revision: 1.2 $ + */ +public class IndentPrinter { + + private int indentLevel; + private String indent; + private PrintWriter out; + + public IndentPrinter() { + this(new PrintWriter(System.out), " "); + } + + public IndentPrinter(PrintWriter out) { + this(out, " "); + } + + public IndentPrinter(PrintWriter out, String indent) { + this.out = out; + this.indent = indent; + } + + public void println(Object value) { + out.print(value.toString()); + out.println(); + } + + public void println(String text) { + out.print(text); + out.println(); + } + + public void print(String text) { + out.print(text); + } + + public void printIndent() { + for (int i = 0; i < indentLevel; i++) { + out.print(indent); + } + } + + public void println() { + out.println(); + } + + public void incrementIndent() { + ++indentLevel; + } + + public void decrementIndent() { + --indentLevel; + } + + public int getIndentLevel() { + return indentLevel; + } + + public void setIndentLevel(int indentLevel) { + this.indentLevel = indentLevel; + } + + public void flush() { + out.flush(); + } +} Propchange: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/IndentPrinter.java ------------------------------------------------------------------------------ svn:executable = * Modified: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java?rev=781177&r1=781176&r2=781177&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java (original) +++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/JMSExceptionSupport.java Tue Jun 2 21:29:30 2009 @@ -16,6 +16,7 @@ */ package org.apache.activemq.util; +import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.MessageEOFException; import javax.jms.MessageFormatException; @@ -86,4 +87,11 @@ exception.initCause(cause); return exception; } + + public static InvalidSelectorException createInvalidSelectorException(Exception e) { + InvalidSelectorException se = new InvalidSelectorException(e.getMessage()); + se.initCause(e); + return se; + } + }