Return-Path: Delivered-To: apmail-geronimo-scm-archive@www.apache.org Received: (qmail 51161 invoked from network); 17 Sep 2006 12:36:32 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 17 Sep 2006 12:36:32 -0000 Received: (qmail 50989 invoked by uid 500); 17 Sep 2006 12:36:30 -0000 Delivered-To: apmail-geronimo-scm-archive@geronimo.apache.org Received: (qmail 50948 invoked by uid 500); 17 Sep 2006 12:36:30 -0000 Mailing-List: contact scm-help@geronimo.apache.org; run by ezmlm Precedence: bulk list-help: list-unsubscribe: List-Post: Reply-To: dev@geronimo.apache.org List-Id: Delivered-To: mailing list scm@geronimo.apache.org Received: (qmail 50895 invoked by uid 99); 17 Sep 2006 12:36:30 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 17 Sep 2006 05:36:30 -0700 X-ASF-Spam-Status: No, hits=-8.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received-SPF: pass (asf.osuosl.org: local policy) Received: from [140.211.166.113] (HELO eris.apache.org) (140.211.166.113) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 17 Sep 2006 05:36:16 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id C412A1A981D; Sun, 17 Sep 2006 05:35:49 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r447058 [2/4] - in /geronimo/sandbox/gcache/openwire: ./ src/main/java/org/apache/geronimo/openwire/command/ src/main/java/org/apache/geronimo/openwire/state/ src/main/java/org/apache/geronimo/openwire/thread/ src/main/java/org/apache/geron... 
Date: Sun, 17 Sep 2006 12:35:47 -0000 To: scm@geronimo.apache.org From: jgenender@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20060917123549.C412A1A981D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/ProducerInfo.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/ProducerInfo.java ------------------------------------------------------------------------------ svn:keywords = Date Revision Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/ProducerInfo.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/RemoveInfo.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/RemoveInfo.java?view=auto&rev=447058 ============================================================================== --- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/RemoveInfo.java (added) +++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/RemoveInfo.java Sun Sep 17 05:35:45 2006 @@ -0,0 +1,100 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geronimo.openwire.command; + +import java.io.IOException; + +import org.apache.geronimo.openwire.state.CommandVisitor; + +/** + * Removes a consumer, producer, session or connection. + * + * @openwire:marshaller code="12" + * @version $Revision$ + */ +public class RemoveInfo extends BaseCommand { + + public static final byte DATA_STRUCTURE_TYPE=CommandTypes.REMOVE_INFO; + + protected DataStructure objectId; + + public byte getDataStructureType() { + return DATA_STRUCTURE_TYPE; + } + + public RemoveInfo() { + } + public RemoveInfo(DataStructure objectId) { + this.objectId=objectId; + } + + /** + * @openwire:property version=1 cache=true + */ + public DataStructure getObjectId() { + return objectId; + } + + public void setObjectId(DataStructure objectId) { + this.objectId = objectId; + } + + public Response visit(CommandVisitor visitor) throws Exception { + switch (objectId.getDataStructureType()) { + case ConnectionId.DATA_STRUCTURE_TYPE: + return visitor.processRemoveConnection((ConnectionId) objectId); + case SessionId.DATA_STRUCTURE_TYPE: + return visitor.processRemoveSession((SessionId) objectId); + case ConsumerId.DATA_STRUCTURE_TYPE: + return visitor.processRemoveConsumer((ConsumerId) objectId); + case ProducerId.DATA_STRUCTURE_TYPE: + return visitor.processRemoveProducer((ProducerId) objectId); + default: + throw new IOException("Unknown remove command type: "+ objectId.getDataStructureType()); + } + } + + /** + * Returns true if this event is for a removed connection + */ + public boolean isConnectionRemove() { + return 
objectId.getDataStructureType() == ConnectionId.DATA_STRUCTURE_TYPE; + } + + /** + * Returns true if this event is for a removed session + */ + public boolean isSessionRemove() { + return objectId.getDataStructureType() == SessionId.DATA_STRUCTURE_TYPE; + } + + /** + * Returns true if this event is for a removed consumer + */ + public boolean isConsumerRemove() { + return objectId.getDataStructureType() == ConsumerId.DATA_STRUCTURE_TYPE; + } + + /** + * Returns true if this event is for a removed producer + */ + public boolean isProducerRemove() { + return objectId.getDataStructureType() == ProducerId.DATA_STRUCTURE_TYPE; + } + +} Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/RemoveInfo.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/RemoveInfo.java ------------------------------------------------------------------------------ svn:keywords = Date Revision Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/RemoveInfo.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/SessionId.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/SessionId.java?view=auto&rev=447058 ============================================================================== --- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/SessionId.java (added) +++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/SessionId.java Sun Sep 17 05:35:45 2006 @@ -0,0 +1,117 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geronimo.openwire.command; + +/** + * + * @openwire:marshaller code="121" + * @version $Revision$ + */ +public class SessionId implements DataStructure { + + public static final byte DATA_STRUCTURE_TYPE=CommandTypes.SESSION_ID; + + protected String connectionId; + protected long value; + + protected transient int hashCode; + protected transient String key; + protected transient ConnectionId parentId; + + public SessionId() { + } + + public SessionId(ConnectionId connectionId, long sessionId) { + this.connectionId = connectionId.getValue(); + this.value=sessionId; + } + + public SessionId(SessionId id) { + this.connectionId = id.getConnectionId(); + this.value=id.getValue(); + } + + public SessionId(ProducerId id) { + this.connectionId = id.getConnectionId(); + this.value=id.getSessionId(); + } + + public SessionId(ConsumerId id) { + this.connectionId = id.getConnectionId(); + this.value=id.getSessionId(); + } + + public ConnectionId getParentId() { + if( parentId == null ) { + parentId = new ConnectionId(this); + } + return parentId; + } + + public int hashCode() { + if( hashCode == 0 ) { + hashCode = connectionId.hashCode() ^ (int)value; + } + return hashCode; + } + + public boolean equals(Object o) { + if( this == o ) + return true; + if( o == 
null || o.getClass()!=SessionId.class ) + return false; + SessionId id = (SessionId) o; + return value==id.value + && connectionId.equals(id.connectionId); + } + + public byte getDataStructureType() { + return DATA_STRUCTURE_TYPE; + } + + /** + * @openwire:property version=1 cache=true + */ + public String getConnectionId() { + return connectionId; + } + public void setConnectionId(String connectionId) { + this.connectionId = connectionId; + } + + /** + * @openwire:property version=1 + */ + public long getValue() { + return value; + } + public void setValue(long sessionId) { + this.value = sessionId; + } + + public String toString() { + if( key==null ) { + key = connectionId+":"+value; + } + return key; + } + + public boolean isMarshallAware() { + return false; + } +} Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/SessionId.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/SessionId.java ------------------------------------------------------------------------------ svn:keywords = Date Revision Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/SessionId.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/SessionInfo.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/SessionInfo.java?view=auto&rev=447058 ============================================================================== --- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/SessionInfo.java (added) +++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/SessionInfo.java Sun Sep 17 
05:35:45 2006 @@ -0,0 +1,69 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geronimo.openwire.command; + +import org.apache.geronimo.openwire.state.CommandVisitor; + +/** + * + * @openwire:marshaller code="4" + * @version $Revision$ + */ +public class SessionInfo extends BaseCommand { + + public static final byte DATA_STRUCTURE_TYPE=CommandTypes.SESSION_INFO; + + protected SessionId sessionId; + + public SessionInfo() { + sessionId = new SessionId(); + } + + public SessionInfo(ConnectionInfo connectionInfo, long sessionId) { + this.sessionId = new SessionId(connectionInfo.getConnectionId(), sessionId); + } + + public SessionInfo(SessionId sessionId) { + this.sessionId = sessionId; + } + + public byte getDataStructureType() { + return DATA_STRUCTURE_TYPE; + } + + /** + * @openwire:property version=1 cache=true + */ + public SessionId getSessionId() { + return sessionId; + } + public void setSessionId(SessionId sessionId) { + this.sessionId = sessionId; + } + +// public RemoveInfo createRemoveCommand() { +// RemoveInfo command = new RemoveInfo(getSessionId()); +// command.setResponseRequired(isResponseRequired()); +// return command; +// } + + public Response visit(CommandVisitor visitor) 
throws Exception { + return visitor.processAddSession( this); + } + +} Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/SessionInfo.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/SessionInfo.java ------------------------------------------------------------------------------ svn:keywords = Date Revision Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/SessionInfo.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/TransactionId.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/TransactionId.java?view=auto&rev=447058 ============================================================================== --- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/TransactionId.java (added) +++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/TransactionId.java Sun Sep 17 05:35:45 2006 @@ -0,0 +1,33 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geronimo.openwire.command; + +/** + * @openwire:marshaller + * @version $Revision$ + */ +abstract public class TransactionId implements DataStructure { + + abstract public boolean isXATransaction(); + abstract public boolean isLocalTransaction(); + abstract public String getTransactionKey(); + + public boolean isMarshallAware() { + return false; + } +} Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/TransactionId.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/TransactionId.java ------------------------------------------------------------------------------ svn:keywords = Date Revision Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/TransactionId.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/TransactionInfo.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/TransactionInfo.java?view=auto&rev=447058 ============================================================================== --- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/TransactionInfo.java (added) +++ 
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/TransactionInfo.java Sun Sep 17 05:35:45 2006 @@ -0,0 +1,113 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geronimo.openwire.command; + +import java.io.IOException; + +import org.apache.geronimo.openwire.state.CommandVisitor; + +/** + * + * @openwire:marshaller code="7" + * @version $Revision$ + */ +public class TransactionInfo extends BaseCommand { + + public static final byte DATA_STRUCTURE_TYPE=CommandTypes.TRANSACTION_INFO; + + + public static final byte BEGIN = 0; + public static final byte PREPARE=1; + public static final byte COMMIT_ONE_PHASE=2; + public static final byte COMMIT_TWO_PHASE=3; + public static final byte ROLLBACK=4; + public static final byte RECOVER=5; + public static final byte FORGET=6; + public static final byte END=7; + + protected byte type; + protected ConnectionId connectionId; + protected TransactionId transactionId; + + public TransactionInfo() { + } + + public TransactionInfo(ConnectionId connectionId, TransactionId transactionId, byte type) { + this.connectionId=connectionId; + this.transactionId=transactionId; + this.type=type; + } + + public byte getDataStructureType() 
{ + return DATA_STRUCTURE_TYPE; + } + + /** + * @openwire:property version=1 cache=true + */ + public ConnectionId getConnectionId() { + return connectionId; + } + public void setConnectionId(ConnectionId connectionId) { + this.connectionId = connectionId; + } + + /** + * @openwire:property version=1 cache=true + */ + public TransactionId getTransactionId() { + return transactionId; + } + public void setTransactionId(TransactionId transactionId) { + this.transactionId = transactionId; + } + + /** + * @openwire:property version=1 + */ + public byte getType() { + return type; + } + public void setType(byte type) { + this.type = type; + } + + public Response visit(CommandVisitor visitor) throws Exception { + switch( type ) { + case TransactionInfo.BEGIN: + return visitor.processBeginTransaction(this); + case TransactionInfo.END: + return visitor.processEndTransaction(this); + case TransactionInfo.PREPARE: + return visitor.processPrepareTransaction(this); + case TransactionInfo.COMMIT_ONE_PHASE: + return visitor.processCommitTransactionOnePhase(this); + case TransactionInfo.COMMIT_TWO_PHASE: + return visitor.processCommitTransactionTwoPhase(this); + case TransactionInfo.ROLLBACK: + return visitor.processRollbackTransaction(this); + case TransactionInfo.RECOVER: + return visitor.processRecoverTransactions(this); + case TransactionInfo.FORGET: + return visitor.processForgetTransaction(this); + default: + throw new IOException("Transaction info type unknown: "+type); + } + } + +} Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/TransactionInfo.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/TransactionInfo.java ------------------------------------------------------------------------------ svn:keywords = Date Revision Propchange: 
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/command/TransactionInfo.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/CommandVisitor.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/CommandVisitor.java?view=diff&rev=447058&r1=447057&r2=447058 ============================================================================== --- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/CommandVisitor.java (original) +++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/CommandVisitor.java Sun Sep 17 05:35:45 2006 @@ -1,8 +1,19 @@ package org.apache.geronimo.openwire.state; +import org.apache.geronimo.openwire.command.ConnectionId; +import org.apache.geronimo.openwire.command.ConnectionInfo; +import org.apache.geronimo.openwire.command.ConsumerId; +import org.apache.geronimo.openwire.command.ConsumerInfo; +import org.apache.geronimo.openwire.command.DestinationInfo; import org.apache.geronimo.openwire.command.KeepAliveInfo; +import org.apache.geronimo.openwire.command.MessageAck; import org.apache.geronimo.openwire.command.NodeInfo; +import org.apache.geronimo.openwire.command.ProducerId; +import org.apache.geronimo.openwire.command.ProducerInfo; import org.apache.geronimo.openwire.command.Response; +import org.apache.geronimo.openwire.command.SessionId; +import org.apache.geronimo.openwire.command.SessionInfo; +import org.apache.geronimo.openwire.command.TransactionInfo; import org.apache.geronimo.openwire.command.WireFormatInfo; /** @@ -35,6 +46,25 @@ Response processWireFormat(WireFormatInfo info) throws Exception; Response processKeepAlive(KeepAliveInfo info) throws Exception; Response processNodeInfo(NodeInfo info) throws Exception; + Response 
processAddConnection(ConnectionInfo info) throws Exception; + Response processAddDestination(DestinationInfo info) throws Exception; + Response processRemoveDestination(DestinationInfo info) throws Exception; + Response processAddSession(SessionInfo info) throws Exception; + Response processBeginTransaction(TransactionInfo info) throws Exception; + Response processEndTransaction(TransactionInfo info) throws Exception; + Response processPrepareTransaction(TransactionInfo info) throws Exception; + Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception; + Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception; + Response processRollbackTransaction(TransactionInfo info) throws Exception; + Response processRecoverTransactions(TransactionInfo info) throws Exception; + Response processForgetTransaction(TransactionInfo info) throws Exception; + Response processAddProducer(ProducerInfo info) throws Exception; + Response processRemoveConnection(ConnectionId id) throws Exception; + Response processRemoveSession(SessionId id) throws Exception; + Response processRemoveConsumer(ConsumerId id) throws Exception; + Response processRemoveProducer(ProducerId id) throws Exception; + Response processAddConsumer(ConsumerInfo info) throws Exception; + Response processMessageAck(MessageAck ack) throws Exception; } Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ConnectionState.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ConnectionState.java?view=auto&rev=447058 ============================================================================== --- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ConnectionState.java (added) +++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ConnectionState.java Sun Sep 17 05:35:45 2006 @@ -0,0 +1,114 @@ +/** + * + * Licensed 
to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.geronimo.openwire.state; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.geronimo.openwire.command.ConnectionInfo; +import org.apache.geronimo.openwire.command.DestinationInfo; +import org.apache.geronimo.openwire.command.SessionId; +import org.apache.geronimo.openwire.command.SessionInfo; +import org.apache.geronimo.openwire.command.TransactionId; + +import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; + +public class ConnectionState { + + final ConnectionInfo info; + private final ConcurrentHashMap transactions = new ConcurrentHashMap(); + private final ConcurrentHashMap sessions = new ConcurrentHashMap(); + private final List tempDestinations = Collections.synchronizedList(new ArrayList()); + private final AtomicBoolean shutdown = new AtomicBoolean(false); + + public ConnectionState(ConnectionInfo info) { + this.info = info; + // Add the default session id. 
+ addSession(new SessionInfo(info, -1)); + } + + public String toString() { + return info.toString(); + } + + public void addTempDestination(DestinationInfo info) { + checkShutdown(); + tempDestinations.add(info); + } + + public void addTransactionState(TransactionId id) { + checkShutdown(); + transactions.put(id, new TransactionState(id)); + } + public TransactionState getTransactionState(TransactionId id) { + return (TransactionState)transactions.get(id); + } + public Collection getTransactionStates() { + return transactions.values(); + } + public TransactionState removeTransactionState(TransactionId id) { + return (TransactionState) transactions.remove(id); + } + + public void addSession(SessionInfo info) { + checkShutdown(); + sessions.put(info.getSessionId(), new SessionState(info)); + } + public SessionState removeSession(SessionId id) { + return (SessionState)sessions.remove(id); + } + public SessionState getSessionState(SessionId id) { + return (SessionState)sessions.get(id); + } + + public ConnectionInfo getInfo() { + return info; + } + + public Set getSessionIds() { + return sessions.keySet(); + } + + public List getTempDesinations() { + return tempDestinations; + } + + public Collection getSessionStates() { + return sessions.values(); + } + + private void checkShutdown() { + if( shutdown.get() ) + throw new IllegalStateException("Disposed"); + } + + public void shutdown() { + if( shutdown.compareAndSet(false, true) ) { + for (Iterator iter = sessions.values().iterator(); iter.hasNext();) { + SessionState ss = (SessionState) iter.next(); + ss.shutdown(); + } + } + } +} \ No newline at end of file Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ConnectionState.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ConnectionState.java 
------------------------------------------------------------------------------ svn:keywords = Date Revision Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ConnectionState.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ConnectionStateTracker.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ConnectionStateTracker.java?view=auto&rev=447058 ============================================================================== --- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ConnectionStateTracker.java (added) +++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ConnectionStateTracker.java Sun Sep 17 05:35:45 2006 @@ -0,0 +1,421 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.geronimo.openwire.state; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.geronimo.openwire.command.Command; +import org.apache.geronimo.openwire.command.ConnectionId; +import org.apache.geronimo.openwire.command.ConnectionInfo; +import org.apache.geronimo.openwire.command.ConsumerId; +import org.apache.geronimo.openwire.command.ConsumerInfo; +import org.apache.geronimo.openwire.command.DestinationInfo; +import org.apache.geronimo.openwire.command.KeepAliveInfo; +import org.apache.geronimo.openwire.command.Message; +import org.apache.geronimo.openwire.command.MessageAck; +import org.apache.geronimo.openwire.command.NodeInfo; +import org.apache.geronimo.openwire.command.ProducerId; +import org.apache.geronimo.openwire.command.ProducerInfo; +import org.apache.geronimo.openwire.command.Response; +import org.apache.geronimo.openwire.command.SessionId; +import org.apache.geronimo.openwire.command.SessionInfo; +import org.apache.geronimo.openwire.command.TransactionInfo; +import org.apache.geronimo.openwire.command.WireFormatInfo; +import org.apache.geronimo.openwire.transport.Transport; +import org.apache.geronimo.openwire.util.IOExceptionSupport; + +import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; + +/** + * Tracks the state of a connection so a newly established transport can + * be re-initialized to the state that was tracked. 
+ * + * @version $Revision$ + */ +public class ConnectionStateTracker implements CommandVisitor { + + private final static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null); + + private boolean trackTransactions = false; + + private boolean restoreSessions=true; + private boolean restoreConsumers=true; + private boolean restoreProducers=true; + private boolean restoreTransaction=true; + + protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap(); + + private class RemoveTransactionAction implements Runnable { + private final TransactionInfo info; + public RemoveTransactionAction(TransactionInfo info) { + this.info = info; + } + public void run() { + ConnectionId connectionId = info.getConnectionId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + cs.removeTransactionState(info.getTransactionId()); + } + } + + /** + * + * + * @param command + * @return null if the command is not state tracked. + * @throws IOException + */ + public Tracked track(Command command) throws IOException { + try { + return (Tracked) command.visit(this); + } catch (IOException e) { + throw e; + } catch (Throwable e) { + throw IOExceptionSupport.create(e); + } + } + + public void restore( Transport transport ) throws IOException { + // Restore the connections. 
+ for (Iterator iter = connectionStates.values().iterator(); iter.hasNext();) { + ConnectionState connectionState = (ConnectionState) iter.next(); + transport.oneway(connectionState.getInfo()); + restoreTempDestinations(transport, connectionState); + + if( restoreSessions ) + restoreSessions(transport, connectionState); + + if( restoreTransaction ) + restoreTransactions(transport, connectionState); + } + } + + private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException { + for (Iterator iter = connectionState.getTransactionStates().iterator(); iter.hasNext();) { + TransactionState transactionState = (TransactionState) iter.next(); + for (Iterator iterator = transactionState.getCommands().iterator(); iterator.hasNext();) { + Command command = (Command) iterator.next(); + transport.oneway(command); + } + } + } + + /** + * @param transport + * @param connectionState + * @throws IOException + */ + protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException { + // Restore the connection's sessions + for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) { + SessionState sessionState = (SessionState) iter2.next(); + transport.oneway(sessionState.getInfo()); + + if( restoreProducers ) + restoreProducers(transport, sessionState); + + if( restoreConsumers ) + restoreConsumers(transport, sessionState); + } + } + + /** + * @param transport + * @param sessionState + * @throws IOException + */ + protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException { + // Restore the session's consumers + for (Iterator iter3 = sessionState.getConsumerStates().iterator(); iter3.hasNext();) { + ConsumerState consumerState = (ConsumerState) iter3.next(); + transport.oneway(consumerState.getInfo()); + } + } + + /** + * @param transport + * @param sessionState + * @throws IOException + */ + protected void restoreProducers(Transport 
transport, SessionState sessionState) throws IOException { + // Restore the session's producers + for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) { + ProducerState producerState = (ProducerState) iter3.next(); + transport.oneway(producerState.getInfo()); + } + } + + /** + * @param transport + * @param connectionState + * @throws IOException + */ + protected void restoreTempDestinations(Transport transport, ConnectionState connectionState) throws IOException { + // Restore the connection's temp destinations. + for (Iterator iter2 = connectionState.getTempDesinations().iterator(); iter2.hasNext();) { + transport.oneway((DestinationInfo) iter2.next()); + } + } + + public Response processAddDestination(DestinationInfo info) throws Exception { + ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId()); +// if( info.getDestination().isTemporary() ) { +// cs.addTempDestination(info); +// } + return TRACKED_RESPONSE_MARKER; + } + + public Response processRemoveDestination(DestinationInfo info) throws Exception { + ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId()); +// if( info.getDestination().isTemporary() ) { +// cs.removeTempDestination(info.getDestination()); +// } + return TRACKED_RESPONSE_MARKER; + } + + + public Response processAddProducer(ProducerInfo info) throws Exception { + SessionId sessionId = info.getProducerId().getParentId(); + ConnectionId connectionId = sessionId.getParentId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + SessionState ss = cs.getSessionState(sessionId); + ss.addProducer(info); + return TRACKED_RESPONSE_MARKER; + } + + public Response processRemoveProducer(ProducerId id) throws Exception { + SessionId sessionId = id.getParentId(); + ConnectionId connectionId = sessionId.getParentId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + SessionState ss = 
cs.getSessionState(sessionId); + ss.removeProducer(id); + return TRACKED_RESPONSE_MARKER; + } + + public Response processAddConsumer(ConsumerInfo info) throws Exception { + SessionId sessionId = info.getConsumerId().getParentId(); + ConnectionId connectionId = sessionId.getParentId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + SessionState ss = cs.getSessionState(sessionId); + ss.addConsumer(info); + return TRACKED_RESPONSE_MARKER; + } + + public Response processRemoveConsumer(ConsumerId id) throws Exception { + SessionId sessionId = id.getParentId(); + ConnectionId connectionId = sessionId.getParentId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + SessionState ss = cs.getSessionState(sessionId); + ss.removeConsumer(id); + return TRACKED_RESPONSE_MARKER; + } + + public Response processAddSession(SessionInfo info) throws Exception { + ConnectionId connectionId = info.getSessionId().getParentId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + cs.addSession(info); + return TRACKED_RESPONSE_MARKER; + } + + public Response processRemoveSession(SessionId id) throws Exception { + ConnectionId connectionId = id.getParentId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + cs.removeSession(id); + return TRACKED_RESPONSE_MARKER; + } + + public Response processAddConnection(ConnectionInfo info) throws Exception { + connectionStates.put(info.getConnectionId(), new ConnectionState(info)); + return TRACKED_RESPONSE_MARKER; + } + + public Response processRemoveConnection(ConnectionId id) throws Exception { + connectionStates.remove(id); + return TRACKED_RESPONSE_MARKER; + } + +// public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception { +// return null; +// } + + public Response processMessage(Message send) throws Exception { + if( trackTransactions && send.getTransactionId() != null ) { + ConnectionId connectionId 
= send.getProducerId().getParentId().getParentId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + TransactionState transactionState = cs.getTransactionState(send.getTransactionId()); + transactionState.addCommand(send); + return TRACKED_RESPONSE_MARKER; + } + return null; + } + public Response processMessageAck(MessageAck ack) throws Exception { + if( trackTransactions && ack.getTransactionId() != null ) { + ConnectionId connectionId = ack.getConsumerId().getParentId().getParentId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + TransactionState transactionState = cs.getTransactionState(ack.getTransactionId()); + transactionState.addCommand(ack); + return TRACKED_RESPONSE_MARKER; + } + return null; + } + + public Response processBeginTransaction(TransactionInfo info) throws Exception { + if( trackTransactions ) { + ConnectionId connectionId = info.getConnectionId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + cs.addTransactionState(info.getTransactionId()); + return TRACKED_RESPONSE_MARKER; + } + return null; + } + public Response processPrepareTransaction(TransactionInfo info) throws Exception { + if( trackTransactions ) { + ConnectionId connectionId = info.getConnectionId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); + transactionState.addCommand(info); + return TRACKED_RESPONSE_MARKER; + } + return null; + } + + public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { + if( trackTransactions ) { + ConnectionId connectionId = info.getConnectionId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); + if( transactionState !=null ) { + transactionState.addCommand(info); + return new Tracked(new 
RemoveTransactionAction(info)); + } + } + return null; + } + public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { + if( trackTransactions ) { + ConnectionId connectionId = info.getConnectionId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); + if( transactionState !=null ) { + transactionState.addCommand(info); + return new Tracked(new RemoveTransactionAction(info)); + } + } + return null; + } + + public Response processRollbackTransaction(TransactionInfo info) throws Exception { + if( trackTransactions ) { + ConnectionId connectionId = info.getConnectionId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); + if( transactionState !=null ) { + transactionState.addCommand(info); + return new Tracked(new RemoveTransactionAction(info)); + } + } + return null; + } + + public Response processEndTransaction(TransactionInfo info) throws Exception { + if( trackTransactions ) { + ConnectionId connectionId = info.getConnectionId(); + ConnectionState cs = (ConnectionState) connectionStates.get(connectionId); + TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); + transactionState.addCommand(info); + return TRACKED_RESPONSE_MARKER; + } + return null; + } + + public Response processRecoverTransactions(TransactionInfo info) { + return null; + } + public Response processForgetTransaction(TransactionInfo info) throws Exception { + return null; + } + + + public Response processWireFormat(WireFormatInfo info) throws Exception { + return null; + } + public Response processKeepAlive(KeepAliveInfo info) throws Exception { + return null; + } + +// public Response processShutdown(ShutdownInfo info) throws Exception { +// return null; +// } + + public Response processNodeInfo(NodeInfo info) 
throws Exception { + return null; + } + +// public Response processFlush(FlushCommand command) throws Exception { +// return null; +// } + +// public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception{ +// return null; +// } +// +// public Response processMessagePull(MessagePull pull) throws Exception { +// return null; +// } + + public boolean isRestoreConsumers() { + return restoreConsumers; + } + + public void setRestoreConsumers(boolean restoreConsumers) { + this.restoreConsumers = restoreConsumers; + } + + public boolean isRestoreProducers() { + return restoreProducers; + } + + public void setRestoreProducers(boolean restoreProducers) { + this.restoreProducers = restoreProducers; + } + + public boolean isRestoreSessions() { + return restoreSessions; + } + + public void setRestoreSessions(boolean restoreSessions) { + this.restoreSessions = restoreSessions; + } + + public boolean isTrackTransactions() { + return trackTransactions; + } + + public void setTrackTransactions(boolean trackTransactions) { + this.trackTransactions = trackTransactions; + } + + public boolean isRestoreTransaction() { + return restoreTransaction; + } + + public void setRestoreTransaction(boolean restoreTransaction) { + this.restoreTransaction = restoreTransaction; + } + +} Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ConnectionStateTracker.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ConnectionStateTracker.java ------------------------------------------------------------------------------ svn:keywords = Date Revision Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ConnectionStateTracker.java ------------------------------------------------------------------------------ svn:mime-type = 
text/plain Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ConsumerState.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ConsumerState.java?view=auto&rev=447058 ============================================================================== --- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ConsumerState.java (added) +++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ConsumerState.java Sun Sep 17 05:35:45 2006 @@ -0,0 +1,35 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.geronimo.openwire.state; + +import org.apache.geronimo.openwire.command.ConsumerInfo; + +public class ConsumerState { + final ConsumerInfo info; + + public ConsumerState(ConsumerInfo info) { + this.info = info; + } + public String toString() { + return info.toString(); + } + public ConsumerInfo getInfo() { + return info; + } +} \ No newline at end of file Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ConsumerState.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ConsumerState.java ------------------------------------------------------------------------------ svn:keywords = Date Revision Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ConsumerState.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ProducerState.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ProducerState.java?view=auto&rev=447058 ============================================================================== --- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ProducerState.java (added) +++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ProducerState.java Sun Sep 17 05:35:45 2006 @@ -0,0 +1,42 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.geronimo.openwire.state; + +import org.apache.geronimo.openwire.command.ProducerInfo; + +public class ProducerState { + final ProducerInfo info; + private long lastSequenceId=-1; + + public ProducerState(ProducerInfo info) { + this.info = info; + } + public String toString() { + return info.toString(); + } + public ProducerInfo getInfo() { + return info; + } + public void setLastSequenceId(long lastSequenceId) { + this.lastSequenceId = lastSequenceId; + } + public long getLastSequenceId() { + return lastSequenceId; + } +} \ No newline at end of file Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ProducerState.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ProducerState.java ------------------------------------------------------------------------------ svn:keywords = Date Revision Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/ProducerState.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/SessionState.java URL: 
http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/SessionState.java?view=auto&rev=447058 ============================================================================== --- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/SessionState.java (added) +++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/SessionState.java Sun Sep 17 05:35:45 2006 @@ -0,0 +1,93 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.geronimo.openwire.state; + +import java.util.Collection; +import java.util.Set; + +import org.apache.geronimo.openwire.command.ConsumerId; +import org.apache.geronimo.openwire.command.ConsumerInfo; +import org.apache.geronimo.openwire.command.ProducerId; +import org.apache.geronimo.openwire.command.ProducerInfo; +import org.apache.geronimo.openwire.command.SessionInfo; + +import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; + +public class SessionState { + final SessionInfo info; + + public final ConcurrentHashMap producers = new ConcurrentHashMap(); + public final ConcurrentHashMap consumers = new ConcurrentHashMap(); + private final AtomicBoolean shutdown = new AtomicBoolean(false); + + public SessionState(SessionInfo info) { + this.info = info; + } + public String toString() { + return info.toString(); + } + + public void addProducer(ProducerInfo info) { + checkShutdown(); + producers.put(info.getProducerId(), new ProducerState(info)); + } + public ProducerState removeProducer(ProducerId id) { + return (ProducerState) producers.remove(id); + } + + public void addConsumer(ConsumerInfo info) { + checkShutdown(); + consumers.put(info.getConsumerId(), new ConsumerState(info)); + } + public ConsumerState removeConsumer(ConsumerId id) { + return (ConsumerState) consumers.remove(id); + } + + public SessionInfo getInfo() { + return info; + } + + public Set getConsumerIds() { + return consumers.keySet(); + } + public Set getProducerIds() { + return producers.keySet(); + } + public Collection getProducerStates() { + return producers.values(); + } + public ProducerState getProducerState(ProducerId producerId) { + return (ProducerState) producers.get(producerId); + } + + public Collection getConsumerStates() { + return consumers.values(); + } + + private void checkShutdown() { + if( shutdown.get() ) + throw new IllegalStateException("Disposed"); + } + 
+ public void shutdown() { + shutdown.set(false); + } + +} \ No newline at end of file Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/SessionState.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/SessionState.java ------------------------------------------------------------------------------ svn:keywords = Date Revision Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/SessionState.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/Tracked.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/Tracked.java?view=auto&rev=447058 ============================================================================== --- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/Tracked.java (added) +++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/Tracked.java Sun Sep 17 05:35:45 2006 @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.geronimo.openwire.state; + +import org.apache.geronimo.openwire.command.Response; + +public class Tracked extends Response { + + private Runnable runnable; + + public Tracked(Runnable runnable) { + this.runnable = runnable; + } + + public void onResponses() { + if( runnable != null ) { + runnable.run(); + runnable=null; + } + } + + public boolean isWaitingForResponse() { + return runnable!=null; + } + +} \ No newline at end of file Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/Tracked.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/Tracked.java ------------------------------------------------------------------------------ svn:keywords = Date Revision Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/Tracked.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/TransactionState.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/TransactionState.java?view=auto&rev=447058 ============================================================================== --- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/TransactionState.java (added) +++ 
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/TransactionState.java Sun Sep 17 05:35:45 2006 @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.geronimo.openwire.state; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.geronimo.openwire.command.Command; +import org.apache.geronimo.openwire.command.TransactionId; + +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; + +public class TransactionState { + final TransactionId id; + + public final ArrayList commands = new ArrayList(); + private final AtomicBoolean shutdown = new AtomicBoolean(false); + + private boolean prepared; + + private int preparedResult; + + public TransactionState(TransactionId id) { + this.id = id; + } + public String toString() { + return id.toString(); + } + + public void addCommand(Command operation) { + checkShutdown(); + commands.add(operation); + } + + public List getCommands() { + return commands; + } + + private void checkShutdown() { + if( shutdown.get() ) + throw new IllegalStateException("Disposed"); + } + + public void shutdown() { + shutdown.set(false); + } + public TransactionId getId() { + return id; + } 
+ + public void setPrepared(boolean prepared) { + this.prepared = prepared; + } + public boolean isPrepared() { + return prepared; + } + public void setPreparedResult(int preparedResult) { + this.preparedResult = preparedResult; + } + public int getPreparedResult() { + return preparedResult; + } + +} \ No newline at end of file Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/TransactionState.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/TransactionState.java ------------------------------------------------------------------------------ svn:keywords = Date Revision Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/state/TransactionState.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/DedicatedTaskRunner.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/DedicatedTaskRunner.java?view=auto&rev=447058 ============================================================================== --- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/DedicatedTaskRunner.java (added) +++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/DedicatedTaskRunner.java Sun Sep 17 05:35:45 2006 @@ -0,0 +1,111 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geronimo.openwire.thread; + + +/** + * + * @version $Revision$ + */ +class DedicatedTaskRunner implements TaskRunner { + + private final Task task; + private final Thread thread; + + private final Object mutex = new Object(); + private boolean threadTerminated; + private boolean pending; + private boolean shutdown; + + public DedicatedTaskRunner(Task task, String name, int priority, boolean daemon) { + this.task = task; + thread = new Thread(name) { + public void run() { + runTask(); + } + }; + thread.setDaemon(daemon); + thread.setName(name); + thread.setPriority(priority); + thread.start(); + } + + /** + */ + public void wakeup() throws InterruptedException { + synchronized( mutex ) { + if( shutdown ) + return; + pending=true; + mutex.notifyAll(); + } + } + + /** + * shut down the task + * @throws InterruptedException + */ + public void shutdown() throws InterruptedException{ + synchronized(mutex){ + shutdown=true; + pending=true; + mutex.notifyAll(); + + // Wait till the thread stops. + if(!threadTerminated){ + mutex.wait(); + } + } + } + + private void runTask() { + + try { + while( true ) { + + synchronized (mutex) { + pending=false; + if( shutdown ) { + return; + } + } + + if( !task.iterate() ) { + // wait to be notified. 
+ synchronized (mutex) { + while( !pending ) { + mutex.wait(); + } + } + } + + } + + } catch (InterruptedException e) { + // Someone really wants this thread to die off. + Thread.currentThread().interrupt(); + } finally { + // Make sure we notify any waiting threads that thread + // has terminated. + synchronized (mutex) { + threadTerminated=true; + mutex.notifyAll(); + } + } + } +} Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/DedicatedTaskRunner.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/DedicatedTaskRunner.java ------------------------------------------------------------------------------ svn:keywords = Date Revision Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/DedicatedTaskRunner.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/DefaultThreadPools.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/DefaultThreadPools.java?view=auto&rev=447058 ============================================================================== --- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/DefaultThreadPools.java (added) +++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/DefaultThreadPools.java Sun Sep 17 05:35:45 2006 @@ -0,0 +1,51 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geronimo.openwire.thread; + +import edu.emory.mathcs.backport.java.util.concurrent.Executor; +import edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor; +import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory; + +/** + * + * @version $Revision$ + */ +public class DefaultThreadPools { + + private static final Executor defaultPool; + static { + defaultPool = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "ActiveMQ Default Thread Pool Thread"); + thread.setDaemon(true); + return thread; + } + }); + } + + private static final TaskRunnerFactory defaultTaskRunnerFactory = new TaskRunnerFactory(); + + public static Executor getDefaultPool() { + return defaultPool; + } + + public static TaskRunnerFactory getDefaultTaskRunnerFactory() { + return defaultTaskRunnerFactory; + } + +} Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/DefaultThreadPools.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/DefaultThreadPools.java ------------------------------------------------------------------------------ svn:keywords = Date Revision Propchange: 
geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/DefaultThreadPools.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/PooledTaskRunner.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/PooledTaskRunner.java?view=auto&rev=447058 ============================================================================== --- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/PooledTaskRunner.java (added) +++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/PooledTaskRunner.java Sun Sep 17 05:35:45 2006 @@ -0,0 +1,135 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.geronimo.openwire.thread; + +import edu.emory.mathcs.backport.java.util.concurrent.Executor; + +/** + * + * @version $Revision$ + */ +class PooledTaskRunner implements TaskRunner { + + private final int maxIterationsPerRun; + private final Executor executor; + private final Task task; + private final Runnable runable; + private boolean queued; + private boolean shutdown; + private boolean iterating; + private Thread runningThread; + + public PooledTaskRunner(Executor executor, Task task, int maxIterationsPerRun) { + this.executor = executor; + this.maxIterationsPerRun = maxIterationsPerRun; + this.task = task; + runable = new Runnable() { + public void run() { + runningThread = Thread.currentThread(); + runTask(); + runningThread = null; + } + }; + } + + + + /** + * We Expect MANY wakeup calls on the same TaskRunner. + */ + public void wakeup() throws InterruptedException { + synchronized( runable ) { + + // When we get in here, we make some assumptions of state: + // queued=false, iterating=false: wakeup() has not be called and therefore task is not executing. + // queued=true, iterating=false: wakeup() was called but, task execution has not started yet + // queued=false, iterating=true : wakeup() was called, which caused task execution to start. + // queued=true, iterating=true : wakeup() called after task execution was started. + + if( queued || shutdown ) + return; + + queued=true; + + // The runTask() method will do this for me once we are done iterating. 
+ if( !iterating ) { + executor.execute(runable); + } + } + } + + /** + * shut down the task + * @throws InterruptedException + */ + public void shutdown() throws InterruptedException{ + synchronized(runable){ + shutdown=true; + //the check on the thread is done + //because a call to iterate can result in + //shutDown() being called, which would wait forever + //waiting for iterating to finish + if(runningThread!=Thread.currentThread()){ + while(iterating==true){ + runable.wait(); + } + } + } + } + + private void runTask() { + + synchronized (runable) { + queued = false; + if( shutdown ) { + iterating = false; + runable.notifyAll(); + return; + } + iterating = true; + } + + // Don't synchronize while we are iterating so that + // multiple wakeup() calls can be executed concurrently. + boolean done=false; + for (int i = 0; i < maxIterationsPerRun; i++) { + if( !task.iterate() ) { + done=true; + break; + } + } + + synchronized (runable) { + iterating=false; + if( shutdown ) { + queued=false; + runable.notifyAll(); + return; + } + + // If we could not iterate all the items + // then we need to re-queue. 
+ if( !done ) + queued = true; + + if( queued ) { + executor.execute(runable); + } + } + } +} \ No newline at end of file Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/PooledTaskRunner.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/PooledTaskRunner.java ------------------------------------------------------------------------------ svn:keywords = Date Revision Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/PooledTaskRunner.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/Task.java URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/Task.java?view=auto&rev=447058 ============================================================================== --- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/Task.java (added) +++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/Task.java Sun Sep 17 05:35:45 2006 @@ -0,0 +1,27 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * A piece of work that is carried out in steps and may need several steps
 * before it has nothing left to do.
 *
 * @version $Revision$
 */
public interface Task {

    /**
     * Performs one step of the task.
     *
     * @return true if the task wants to be iterated again right away; false
     *         if it has no more work for now. The task runners in this package
     *         stop iterating on false and resume on the next wakeup().
     */
    boolean iterate();
}
/**
 * Handle through which callers ask for the associated Task to be executed by
 * some thread, and through which that execution is stopped.
 *
 * @version $Revision$
 */
public interface TaskRunner {

    /**
     * Requests that the associated task be executed.
     *
     * @throws InterruptedException declared for implementations that may block
     */
    void wakeup() throws InterruptedException;

    /**
     * Stops execution of the associated task.
     *
     * @throws InterruptedException if the caller is interrupted while waiting
     *         for the task to stop
     */
    void shutdown() throws InterruptedException;
}
============================================================================== --- geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/TaskRunnerFactory.java (added) +++ geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/TaskRunnerFactory.java Sun Sep 17 05:35:45 2006 @@ -0,0 +1,93 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geronimo.openwire.thread; + +import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService; +import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue; +import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory; +import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor; +import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; + +/** + * Manages the thread pool for long running tasks. + * + * Long running tasks are not always active but when they are active, they may + * need a few iterations of processing for them to become idle. The manager + * ensures that each task is processed but that no one task overtakes the + * system. + * + * This is kind of like cooperative multitasking. 
+ * + * @version $Revision$ + */ +public class TaskRunnerFactory { + + private ExecutorService executor; + private int maxIterationsPerRun; + private String name; + private int priority; + private boolean daemon; + + public TaskRunnerFactory() { + this("OpenWire Task", Thread.NORM_PRIORITY, true, 1000); + } + + public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) { + + this.name = name; + this.priority = priority; + this.daemon = daemon; + this.maxIterationsPerRun = maxIterationsPerRun; + + // If your OS/JVM combination has a good thread model, you may want to avoid + // using a thread pool to run tasks and use a DedicatedTaskRunner instead. + if( "true".equals(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner")) ) { + executor = null; + } else { + executor = createDefaultExecutor(); + } + } + + public void shutdown() { + if (executor != null) { + executor.shutdownNow(); + } + } + + public TaskRunner createTaskRunner(Task task, String name) { + if( executor!=null ) { + return new PooledTaskRunner(executor, task, maxIterationsPerRun); + } else { + return new DedicatedTaskRunner(task, name, priority, daemon); + } + } + + protected ExecutorService createDefaultExecutor() { + ThreadPoolExecutor rc = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, name); + thread.setDaemon(daemon); + thread.setPriority(priority); + return thread; + } + }); + rc.allowCoreThreadTimeOut(true); + return rc; + } + +} Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/TaskRunnerFactory.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/TaskRunnerFactory.java 
------------------------------------------------------------------------------ svn:keywords = Date Revision Propchange: geronimo/sandbox/gcache/openwire/src/main/java/org/apache/geronimo/openwire/thread/TaskRunnerFactory.java ------------------------------------------------------------------------------ svn:mime-type = text/plain