Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5CD1F113F2 for ; Thu, 24 Jul 2014 14:22:59 +0000 (UTC) Received: (qmail 21358 invoked by uid 500); 24 Jul 2014 14:22:52 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 21265 invoked by uid 500); 24 Jul 2014 14:22:52 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 19313 invoked by uid 99); 24 Jul 2014 14:22:51 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 Jul 2014 14:22:51 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 432699B2CAA; Thu, 24 Jul 2014 14:22:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@activemq.apache.org Date: Thu, 24 Jul 2014 14:23:24 -0000 Message-Id: In-Reply-To: <1bc8382c8f4c487380c559ebaef9b02d@git.apache.org> References: <1bc8382c8f4c487380c559ebaef9b02d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [35/51] [partial] https://issues.apache.org/jira/browse/OPENWIRE-1 http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerAck.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerAck.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerAck.java new file mode 100644 index 0000000..b1bda9b --- /dev/null +++ 
b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerAck.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.openwire.commands; + +/** + * A ProducerAck command is sent by a broker to a producer to let it know it has + * received and processed messages that it has produced. The producer will be + * flow controlled if it does not receive ProducerAck commands back from the + * broker. 
+ * + * @openwire:marshaller code="19" version="3" + */ +public class ProducerAck extends BaseCommand { + + public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_ACK; + + protected ProducerId producerId; + protected int size; + + public ProducerAck() { + } + + public ProducerAck(ProducerId producerId, int size) { + this.producerId = producerId; + this.size = size; + } + + public void copy(ProducerAck copy) { + super.copy(copy); + copy.producerId = producerId; + copy.size = size; + } + + @Override + public byte getDataStructureType() { + return DATA_STRUCTURE_TYPE; + } + + @Override + public Response visit(CommandVisitor visitor) throws Exception { + return visitor.processProducerAck(this); + } + + /** + * The producer id that this ack message is destined for. + * + * @openwire:property version=3 + */ + public ProducerId getProducerId() { + return producerId; + } + + public void setProducerId(ProducerId producerId) { + this.producerId = producerId; + } + + /** + * The number of bytes that are being acked. + * + * @openwire:property version=3 + */ + public int getSize() { + return size; + } + + public void setSize(int size) { + this.size = size; + } +} http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerId.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerId.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerId.java new file mode 100644 index 0000000..bd9283e --- /dev/null +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerId.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.openwire.commands; + +/** + * @openwire:marshaller code="123" + * + */ +public class ProducerId implements DataStructure { + + public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_ID; + + protected String connectionId; + protected long sessionId; + protected long value; + + protected transient int hashCode; + protected transient String key; + protected transient SessionId parentId; + + public ProducerId() { + } + + public ProducerId(SessionId sessionId, long producerId) { + this.connectionId = sessionId.getConnectionId(); + this.sessionId = sessionId.getValue(); + this.value = producerId; + } + + public ProducerId(ProducerId id) { + this.connectionId = id.getConnectionId(); + this.sessionId = id.getSessionId(); + this.value = id.getValue(); + } + + public ProducerId(String producerKey) { + // Parse off the producerId + int p = producerKey.lastIndexOf(":"); + if (p >= 0) { + value = Long.parseLong(producerKey.substring(p + 1)); + producerKey = producerKey.substring(0, p); + } + setProducerSessionKey(producerKey); + } + + public SessionId getParentId() { + if (parentId == null) { + parentId = new SessionId(this); + } + return parentId; + } + + @Override + public int hashCode() { + if (hashCode == 0) { + hashCode = connectionId.hashCode() ^ (int)sessionId ^ (int)value; + } + return hashCode; + } + + @Override + public boolean equals(Object o) 
{ + if (this == o) { + return true; + } + if (o == null || o.getClass() != ProducerId.class) { + return false; + } + ProducerId id = (ProducerId)o; + return sessionId == id.sessionId && value == id.value && connectionId.equals(id.connectionId); + } + + /** + * @param sessionKey the producer's session key in the form connectionId:sessionId + */ + private void setProducerSessionKey(String sessionKey) { + // Parse off the session id + int p = sessionKey.lastIndexOf(":"); + if (p >= 0) { + sessionId = Long.parseLong(sessionKey.substring(p + 1)); + sessionKey = sessionKey.substring(0, p); + } + // The rest is the connection id + connectionId = sessionKey; + } + + @Override + public String toString() { + if (key == null) { + key = connectionId + ":" + sessionId + ":" + value; + } + return key; + } + + @Override + public byte getDataStructureType() { + return DATA_STRUCTURE_TYPE; + } + + /** + * @openwire:property version=1 cache=true + */ + public String getConnectionId() { + return connectionId; + } + + public void setConnectionId(String connectionId) { + this.connectionId = connectionId; + } + + /** + * @openwire:property version=1 + */ + public long getValue() { + return value; + } + + public void setValue(long producerId) { + this.value = producerId; + } + + /** + * @openwire:property version=1 + */ + public long getSessionId() { + return sessionId; + } + + public void setSessionId(long sessionId) { + this.sessionId = sessionId; + } + + @Override + public boolean isMarshallAware() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerInfo.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerInfo.java new file mode 100644 index 0000000..23758b0 --- /dev/null +++ 
b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ProducerInfo.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.openwire.commands; + + +/** + * @openwire:marshaller code="6" + */ +public class ProducerInfo extends BaseCommand { + + public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_INFO; + + protected ProducerId producerId; + protected OpenWireDestination destination; + protected BrokerId[] brokerPath; + protected boolean dispatchAsync; + protected int windowSize; + + public ProducerInfo() { + } + + public ProducerInfo(ProducerId producerId) { + this.producerId = producerId; + } + + public ProducerInfo(SessionInfo sessionInfo, long producerId) { + this.producerId = new ProducerId(sessionInfo.getSessionId(), producerId); + } + + public ProducerInfo copy() { + ProducerInfo info = new ProducerInfo(); + copy(info); + return info; + } + + public void copy(ProducerInfo info) { + super.copy(info); + info.producerId = producerId; + info.destination = destination; + } + + @Override + public byte getDataStructureType() { + return DATA_STRUCTURE_TYPE; + } + + /** + * @openwire:property version=1 cache=true + */ + public ProducerId 
getProducerId() { + return producerId; + } + + public void setProducerId(ProducerId producerId) { + this.producerId = producerId; + } + + /** + * @openwire:property version=1 cache=true + */ + public OpenWireDestination getDestination() { + return destination; + } + + public void setDestination(OpenWireDestination destination) { + this.destination = destination; + } + + public RemoveInfo createRemoveCommand() { + RemoveInfo command = new RemoveInfo(getProducerId()); + command.setResponseRequired(isResponseRequired()); + return command; + } + + /** + * The route of brokers the command has moved through. + * + * @openwire:property version=1 cache=true + */ + public BrokerId[] getBrokerPath() { + return brokerPath; + } + + public void setBrokerPath(BrokerId[] brokerPath) { + this.brokerPath = brokerPath; + } + + @Override + public Response visit(CommandVisitor visitor) throws Exception { + return visitor.processAddProducer(this); + } + + /** + * If the broker should dispatch messages from this producer async. Since + * sync dispatch could potentially block the producer thread, this could be + * an important setting for the producer. + * + * @openwire:property version=2 + */ + public boolean isDispatchAsync() { + return dispatchAsync; + } + + public void setDispatchAsync(boolean dispatchAsync) { + this.dispatchAsync = dispatchAsync; + } + + /** + * Used to configure the producer window size. A producer will send up to + * the configured window size worth of payload data to the broker before + * waiting for an Ack that allows it to send more. 
+ * + * @openwire:property version=3 + */ + public int getWindowSize() { + return windowSize; + } + + public void setWindowSize(int windowSize) { + this.windowSize = windowSize; + } + + @Override + public boolean isProducerInfo() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveInfo.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveInfo.java new file mode 100644 index 0000000..9aa4b5e --- /dev/null +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveInfo.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.openwire.commands; + +import java.io.IOException; + +/** + * Removes a consumer, producer, session or connection. 
+ * + * @openwire:marshaller code="12" + */ +public class RemoveInfo extends BaseCommand { + + public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REMOVE_INFO; + + protected DataStructure objectId; + protected long lastDeliveredSequenceId; + + public RemoveInfo() { + } + + public RemoveInfo(DataStructure objectId) { + this.objectId = objectId; + } + + @Override + public byte getDataStructureType() { + return DATA_STRUCTURE_TYPE; + } + + /** + * @openwire:property version=1 cache=true + */ + public DataStructure getObjectId() { + return objectId; + } + + public void setObjectId(DataStructure objectId) { + this.objectId = objectId; + } + + /** + * @openwire:property version=5 cache=false + */ + public long getLastDeliveredSequenceId() { + return lastDeliveredSequenceId; + } + + public void setLastDeliveredSequenceId(long lastDeliveredSequenceId) { + this.lastDeliveredSequenceId = lastDeliveredSequenceId; + } + + @Override + public Response visit(CommandVisitor visitor) throws Exception { + switch (objectId.getDataStructureType()) { + case ConnectionId.DATA_STRUCTURE_TYPE: + return visitor.processRemoveConnection((ConnectionId)objectId, lastDeliveredSequenceId); + case SessionId.DATA_STRUCTURE_TYPE: + return visitor.processRemoveSession((SessionId)objectId, lastDeliveredSequenceId); + case ConsumerId.DATA_STRUCTURE_TYPE: + return visitor.processRemoveConsumer((ConsumerId)objectId, lastDeliveredSequenceId); + case ProducerId.DATA_STRUCTURE_TYPE: + return visitor.processRemoveProducer((ProducerId)objectId); + default: + throw new IOException("Unknown remove command type: " + objectId.getDataStructureType()); + } + } + + /** + * Returns true if this event is for a removed connection + */ + public boolean isConnectionRemove() { + return objectId.getDataStructureType() == ConnectionId.DATA_STRUCTURE_TYPE; + } + + /** + * Returns true if this event is for a removed session + */ + public boolean isSessionRemove() { + return objectId.getDataStructureType() == 
SessionId.DATA_STRUCTURE_TYPE; + } + + /** + * Returns true if this event is for a removed consumer + */ + public boolean isConsumerRemove() { + return objectId.getDataStructureType() == ConsumerId.DATA_STRUCTURE_TYPE; + } + + /** + * Returns true if this event is for a removed producer + */ + public boolean isProducerRemove() { + return objectId.getDataStructureType() == ProducerId.DATA_STRUCTURE_TYPE; + } +} http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveSubscriptionInfo.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveSubscriptionInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveSubscriptionInfo.java new file mode 100644 index 0000000..5e04db8 --- /dev/null +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/RemoveSubscriptionInfo.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.openwire.commands; + +/** + * @openwire:marshaller code="9" + */ +public class RemoveSubscriptionInfo extends BaseCommand { + + public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REMOVE_SUBSCRIPTION_INFO; + + protected ConnectionId connectionId; + protected String clientId; + protected String subscriptionName; + + @Override + public byte getDataStructureType() { + return DATA_STRUCTURE_TYPE; + } + + /** + * @openwire:property version=1 cache=true + */ + public ConnectionId getConnectionId() { + return connectionId; + } + + public void setConnectionId(ConnectionId connectionId) { + this.connectionId = connectionId; + } + + /** + * @openwire:property version=1 + */ + public String getSubcriptionName() { + return subscriptionName; + } + + /** + */ + public void setSubcriptionName(String subscriptionName) { + this.subscriptionName = subscriptionName; + } + + public String getSubscriptionName() { + return subscriptionName; + } + + public void setSubscriptionName(String subscriptionName) { + this.subscriptionName = subscriptionName; + } + + /** + * @openwire:property version=1 + */ + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + @Override + public Response visit(CommandVisitor visitor) throws Exception { + return visitor.processRemoveSubscription(this); + } +} http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ReplayCommand.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ReplayCommand.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ReplayCommand.java new file mode 100644 index 0000000..9d1b4c8 --- /dev/null +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ReplayCommand.java @@ -0,0 +1,121 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.openwire.commands; + +/** + * A general purpose replay command for some kind of producer where ranges of + * messages are asked to be replayed. This command is typically used over a + * non-reliable transport such as UDP or multicast but could also be used on + * TCP/IP if a socket has been re-established. 
+ * + * @openwire:marshaller code="65" + */ +public class ReplayCommand extends BaseCommand { + + public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REPLAY; + + private String producerId; + private int firstAckNumber; + private int lastAckNumber; + private int firstNakNumber; + private int lastNakNumber; + + public ReplayCommand() { + } + + @Override + public byte getDataStructureType() { + return DATA_STRUCTURE_TYPE; + } + + public String getProducerId() { + return producerId; + } + + /** + * Is used to uniquely identify the producer of the sequence + * + * @openwire:property version=1 cache=false + */ + public void setProducerId(String producerId) { + this.producerId = producerId; + } + + public int getFirstAckNumber() { + return firstAckNumber; + } + + /** + * Is used to specify the first sequence number being acknowledged as delivered on the transport + * so that it can be removed from cache + * + * @openwire:property version=1 + */ + public void setFirstAckNumber(int firstSequenceNumber) { + this.firstAckNumber = firstSequenceNumber; + } + + public int getLastAckNumber() { + return lastAckNumber; + } + + /** + * Is used to specify the last sequence number being acknowledged as delivered on the transport + * so that it can be removed from cache + * + * @openwire:property version=1 + */ + public void setLastAckNumber(int lastSequenceNumber) { + this.lastAckNumber = lastSequenceNumber; + } + + @Override + public Response visit(CommandVisitor visitor) throws Exception { + return null; + } + + /** + * Is used to specify the first sequence number to be replayed + * + * @openwire:property version=1 + */ + public int getFirstNakNumber() { + return firstNakNumber; + } + + public void setFirstNakNumber(int firstNakNumber) { + this.firstNakNumber = firstNakNumber; + } + + /** + * Is used to specify the last sequence number to be replayed + * + * @openwire:property version=1 + */ + public int getLastNakNumber() { + return lastNakNumber; + } + + public void 
setLastNakNumber(int lastNakNumber) { + this.lastNakNumber = lastNakNumber; + } + + @Override + public String toString() { + return "ReplayCommand {commandId = " + getCommandId() + ", firstNakNumber = " + getFirstNakNumber() + ", lastNakNumber = " + getLastNakNumber() + "}"; + } +} http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Response.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Response.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Response.java new file mode 100644 index 0000000..15e8508 --- /dev/null +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/Response.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.openwire.commands; + +/** + * @openwire:marshaller code="30" + */ +public class Response extends BaseCommand { + + public static final byte DATA_STRUCTURE_TYPE = CommandTypes.RESPONSE; + int correlationId; + + @Override + public byte getDataStructureType() { + return DATA_STRUCTURE_TYPE; + } + + /** + * @openwire:property version=1 + */ + public int getCorrelationId() { + return correlationId; + } + + public void setCorrelationId(int responseId) { + this.correlationId = responseId; + } + + @Override + public boolean isResponse() { + return true; + } + + public boolean isException() { + return false; + } + + @Override + public Response visit(CommandVisitor visitor) throws Exception { + return null; + } +} http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionId.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionId.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionId.java new file mode 100644 index 0000000..d30b6b2 --- /dev/null +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionId.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.activemq.openwire.commands;

/**
 * Identifier built from a connection id string and a numeric session value.
 * <p>
 * The hash code, the string form and the parent {@link ConnectionId} are
 * computed on first use and cached. Both setters discard those cached values
 * so they can never describe an older state of the id.
 *
 * @openwire:marshaller code="121"
 */
public class SessionId implements DataStructure {

    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SESSION_ID;

    protected String connectionId;
    protected long value;

    // Lazily computed caches; reset by resetCachedState().
    protected transient int hashCode;
    protected transient String key;
    protected transient ConnectionId parentId;

    /**
     * Creates an id with a {@code null} connection id and a value of zero.
     */
    public SessionId() {
    }

    /**
     * @param connectionId the connection whose value becomes this id's connection id.
     * @param sessionId the numeric session value.
     */
    public SessionId(ConnectionId connectionId, long sessionId) {
        this.connectionId = connectionId.getValue();
        this.value = sessionId;
    }

    /**
     * Copies the connection id and value of another session id.
     *
     * @param id the id to copy.
     */
    public SessionId(SessionId id) {
        this.connectionId = id.getConnectionId();
        this.value = id.getValue();
    }

    /**
     * @param id the producer id whose connection id and session id are taken over.
     */
    public SessionId(ProducerId id) {
        this.connectionId = id.getConnectionId();
        this.value = id.getSessionId();
    }

    /**
     * @param id the consumer id whose connection id and session id are taken over.
     */
    public SessionId(ConsumerId id) {
        this.connectionId = id.getConnectionId();
        this.value = id.getSessionId();
    }

    /**
     * @return a {@link ConnectionId} created from this id on first call and cached afterwards.
     */
    public ConnectionId getParentId() {
        if (parentId == null) {
            parentId = new ConnectionId(this);
        }
        return parentId;
    }

    /**
     * Combines the connection id hash with the low 32 bits of the value. A
     * {@code null} connection id contributes zero instead of raising a
     * {@link NullPointerException}.
     */
    @Override
    public int hashCode() {
        if (hashCode == 0) {
            int connectionHash = connectionId == null ? 0 : connectionId.hashCode();
            hashCode = connectionHash ^ (int) value;
        }
        return hashCode;
    }

    /**
     * Two ids are equal when both are exactly of class {@code SessionId} and
     * have the same value and the same connection id (two {@code null}
     * connection ids count as the same).
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || o.getClass() != SessionId.class) {
            return false;
        }
        SessionId id = (SessionId) o;
        if (value != id.value) {
            return false;
        }
        return connectionId == null ? id.connectionId == null : connectionId.equals(id.connectionId);
    }

    /**
     * @return the type code of this structure, {@link #DATA_STRUCTURE_TYPE}.
     */
    @Override
    public byte getDataStructureType() {
        return DATA_STRUCTURE_TYPE;
    }

    /**
     * @return the connection id string, possibly {@code null}.
     *
     * @openwire:property version=1 cache=true
     */
    public String getConnectionId() {
        return connectionId;
    }

    /**
     * Replaces the connection id and drops every cached derived value.
     *
     * @param connectionId the new connection id string.
     */
    public void setConnectionId(String connectionId) {
        this.connectionId = connectionId;
        resetCachedState();
    }

    /**
     * @return the numeric session value.
     *
     * @openwire:property version=1
     */
    public long getValue() {
        return value;
    }

    /**
     * Replaces the session value and drops every cached derived value.
     *
     * @param sessionId the new numeric session value.
     */
    public void setValue(long sessionId) {
        this.value = sessionId;
        resetCachedState();
    }

    /**
     * @return {@code connectionId + ":" + value}, cached after the first call.
     */
    @Override
    public String toString() {
        if (key == null) {
            key = connectionId + ":" + value;
        }
        return key;
    }

    /**
     * @return always {@code false}.
     */
    @Override
    public boolean isMarshallAware() {
        return false;
    }

    /** Forgets the values derived from connectionId/value so they are rebuilt on demand. */
    private void resetCachedState() {
        hashCode = 0;
        key = null;
        parentId = null;
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionInfo.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionInfo.java new file mode 100644 index 0000000..4064c3f --- /dev/null +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SessionInfo.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.activemq.openwire.commands;

/**
 * Command that carries a {@link SessionId} and is dispatched to
 * {@link CommandVisitor#processAddSession(SessionInfo)}.
 *
 * @openwire:marshaller code="4"
 */
public class SessionInfo extends BaseCommand {

    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SESSION_INFO;

    protected SessionId sessionId;

    /**
     * Creates the command with a fresh, empty {@link SessionId}.
     */
    public SessionInfo() {
        this.sessionId = new SessionId();
    }

    /**
     * @param sessionId the id to carry; stored as given.
     */
    public SessionInfo(SessionId sessionId) {
        this.sessionId = sessionId;
    }

    /**
     * @param connectionInfo supplies the connection id for the new session id.
     * @param sessionId the numeric session value.
     */
    public SessionInfo(ConnectionInfo connectionInfo, long sessionId) {
        ConnectionId owner = connectionInfo.getConnectionId();
        this.sessionId = new SessionId(owner, sessionId);
    }

    /**
     * @return the type code of this command, {@link #DATA_STRUCTURE_TYPE}.
     */
    @Override
    public byte getDataStructureType() {
        return DATA_STRUCTURE_TYPE;
    }

    /**
     * @return always {@code true}.
     */
    @Override
    public boolean isSessionInfo() {
        return true;
    }

    /**
     * @return the session id carried by this command.
     *
     * @openwire:property version=1 cache=true
     */
    public SessionId getSessionId() {
        return sessionId;
    }

    /**
     * @param sessionId the session id to carry from now on.
     */
    public void setSessionId(SessionId sessionId) {
        this.sessionId = sessionId;
    }

    /**
     * Builds a {@link RemoveInfo} for this command's session id that copies
     * this command's response-required flag.
     *
     * @return the new remove command.
     */
    public RemoveInfo createRemoveCommand() {
        RemoveInfo removal = new RemoveInfo(getSessionId());
        removal.setResponseRequired(isResponseRequired());
        return removal;
    }

    /**
     * @return whatever {@code visitor.processAddSession(this)} returns.
     */
    @Override
    public Response visit(CommandVisitor visitor) throws Exception {
        return visitor.processAddSession(this);
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ShutdownInfo.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ShutdownInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ShutdownInfo.java new file mode 100644 index 0000000..72f67e0 --- /dev/null +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/ShutdownInfo.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.activemq.openwire.commands;

/**
 * Field-less command that is dispatched to
 * {@link CommandVisitor#processShutdown(ShutdownInfo)}.
 *
 * @openwire:marshaller code="11"
 */
public class ShutdownInfo extends BaseCommand {

    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SHUTDOWN_INFO;

    /**
     * @return always {@code true}.
     */
    @Override
    public boolean isShutdownInfo() {
        return true;
    }

    /**
     * @return the type code of this command, {@link #DATA_STRUCTURE_TYPE}.
     */
    @Override
    public byte getDataStructureType() {
        return DATA_STRUCTURE_TYPE;
    }

    /**
     * @return whatever {@code visitor.processShutdown(this)} returns.
     */
    @Override
    public Response visit(CommandVisitor visitor) throws Exception {
        return visitor.processShutdown(this);
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SubscriptionInfo.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SubscriptionInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SubscriptionInfo.java new file mode 100644 index 0000000..41ffcf6 --- /dev/null +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/SubscriptionInfo.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.activemq.openwire.commands;

/**
 * Used to represent a durable subscription. Identity ({@link #equals(Object)}
 * and {@link #hashCode()}) is defined by the client id and the subscription
 * name only.
 *
 * @openwire:marshaller code="55"
 */
public class SubscriptionInfo implements DataStructure {

    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.DURABLE_SUBSCRIPTION_INFO;

    protected OpenWireDestination subscribedDestination;
    protected OpenWireDestination destination;
    protected String clientId;
    protected String subscriptionName;
    protected String selector;

    /**
     * Creates an instance with every field unset.
     */
    public SubscriptionInfo() {
    }

    /**
     * @param clientId the client id of the subscription.
     * @param subscriptionName the name of the subscription.
     */
    public SubscriptionInfo(String clientId, String subscriptionName) {
        this.clientId = clientId;
        this.subscriptionName = subscriptionName;
    }

    /**
     * @return the type code of this structure, {@link #DATA_STRUCTURE_TYPE}.
     */
    @Override
    public byte getDataStructureType() {
        return DATA_STRUCTURE_TYPE;
    }

    /**
     * @return the client id, possibly {@code null}.
     *
     * @openwire:property version=1
     */
    public String getClientId() {
        return clientId;
    }

    /**
     * @param clientId the client id to store.
     */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    /**
     * The resolved destination that the subscription is receiving messages
     * from. This will never be a pattern or a composite destination.
     *
     * @return the resolved destination, possibly {@code null}.
     *
     * @openwire:property version=1 cache=true
     */
    public OpenWireDestination getDestination() {
        return destination;
    }

    /**
     * @param destination the resolved destination to store.
     */
    public void setDestination(OpenWireDestination destination) {
        this.destination = destination;
    }

    /**
     * @return the selector string, possibly {@code null}.
     *
     * @openwire:property version=1
     */
    public String getSelector() {
        return selector;
    }

    /**
     * @param selector the selector string to store.
     */
    public void setSelector(String selector) {
        this.selector = selector;
    }

    /**
     * Same value as {@link #getSubscriptionName()}; this (misspelled) accessor
     * is the one tagged as the OpenWire property.
     *
     * @return the subscription name, possibly {@code null}.
     *
     * @openwire:property version=1
     */
    public String getSubcriptionName() {
        return subscriptionName;
    }

    /**
     * Same effect as {@link #setSubscriptionName(String)}.
     *
     * @param subscriptionName the subscription name to store.
     */
    public void setSubcriptionName(String subscriptionName) {
        this.subscriptionName = subscriptionName;
    }

    /**
     * @return the subscription name, possibly {@code null}.
     */
    public String getSubscriptionName() {
        return subscriptionName;
    }

    /**
     * @param subscriptionName the subscription name to store.
     */
    public void setSubscriptionName(String subscriptionName) {
        this.subscriptionName = subscriptionName;
    }

    /**
     * @return always {@code false}.
     */
    @Override
    public boolean isMarshallAware() {
        return false;
    }

    /**
     * @return the simple class name followed by the resolved destination in braces.
     */
    @Override
    public String toString() {
        return getClass().getSimpleName() + " { " + destination + " }";
    }

    /**
     * XOR of the client id hash and the subscription name hash; a {@code null}
     * field contributes {@code -1}.
     */
    @Override
    public int hashCode() {
        int clientHash = -1;
        if (clientId != null) {
            clientHash = clientId.hashCode();
        }
        int nameHash = -1;
        if (subscriptionName != null) {
            nameHash = subscriptionName.hashCode();
        }
        return clientHash ^ nameHash;
    }

    /**
     * Equal to any other {@code SubscriptionInfo} (subclasses included) with the
     * same client id and subscription name; two {@code null} values match.
     */
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof SubscriptionInfo)) {
            return false;
        }
        SubscriptionInfo that = (SubscriptionInfo) obj;
        return sameText(clientId, that.clientId) && sameText(subscriptionName, that.subscriptionName);
    }

    /** Null-tolerant string equality: both null, or both non-null and equal. */
    private static boolean sameText(String left, String right) {
        return left == null ? right == null : left.equals(right);
    }

    /**
     * The destination the client originally subscribed to. This may not match
     * {@link #getDestination()} if the subscribed destination uses patterns or
     * composites.
     *
     * If the subscribed destination is not set, this just returns the destination.
     *
     * @return the subscribed destination, or the resolved destination as fallback.
     *
     * @openwire:property version=3
     */
    public OpenWireDestination getSubscribedDestination() {
        return subscribedDestination != null ? subscribedDestination : getDestination();
    }

    /**
     * @param subscribedDestination the originally subscribed destination to store.
     */
    public void setSubscribedDestination(OpenWireDestination subscribedDestination) {
        this.subscribedDestination = subscribedDestination;
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionId.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionId.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionId.java new file mode 100644 index 0000000..71be56c --- /dev/null +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionId.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.activemq.openwire.commands;

/**
 * Base type for transaction identifiers. Subclasses state whether they are an
 * XA or a local transaction and supply a string key.
 *
 * @openwire:marshaller
 */
public abstract class TransactionId implements DataStructure {

    /**
     * @return a string key for this transaction id.
     */
    public abstract String getTransactionKey();

    /**
     * @return {@code true} when the implementation represents a local transaction.
     */
    public abstract boolean isLocalTransaction();

    /**
     * @return {@code true} when the implementation represents an XA transaction.
     */
    public abstract boolean isXATransaction();

    /**
     * @return always {@code false}.
     */
    @Override
    public boolean isMarshallAware() {
        return false;
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionInfo.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionInfo.java new file mode 100644 index 0000000..698d5e2 --- /dev/null +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/TransactionInfo.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.activemq.openwire.commands;

import java.io.IOException;

/**
 * Command describing one transaction operation. The {@code type} byte selects
 * which {@link CommandVisitor} callback {@link #visit(CommandVisitor)} invokes.
 *
 * @openwire:marshaller code="7"
 */
public class TransactionInfo extends BaseCommand {

    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.TRANSACTION_INFO;

    // Values accepted for the type field.
    public static final byte BEGIN = 0;
    public static final byte PREPARE = 1;
    public static final byte COMMIT_ONE_PHASE = 2;
    public static final byte COMMIT_TWO_PHASE = 3;
    public static final byte ROLLBACK = 4;
    public static final byte RECOVER = 5;
    public static final byte FORGET = 6;
    public static final byte END = 7;

    protected byte type;
    protected ConnectionId connectionId;
    protected TransactionId transactionId;

    /**
     * Creates a command with type {@code 0} ({@link #BEGIN}) and no ids.
     */
    public TransactionInfo() {
    }

    /**
     * @param connectionId the connection id to carry.
     * @param transactionId the transaction id to carry.
     * @param type one of the type constants of this class.
     */
    public TransactionInfo(ConnectionId connectionId, TransactionId transactionId, byte type) {
        this.connectionId = connectionId;
        this.transactionId = transactionId;
        this.type = type;
    }

    /**
     * @return the type code of this command, {@link #DATA_STRUCTURE_TYPE}.
     */
    @Override
    public byte getDataStructureType() {
        return DATA_STRUCTURE_TYPE;
    }

    /**
     * @return the connection id, possibly {@code null}.
     *
     * @openwire:property version=1 cache=true
     */
    public ConnectionId getConnectionId() {
        return connectionId;
    }

    /**
     * @param connectionId the connection id to carry.
     */
    public void setConnectionId(ConnectionId connectionId) {
        this.connectionId = connectionId;
    }

    /**
     * @return the transaction id, possibly {@code null}.
     *
     * @openwire:property version=1 cache=true
     */
    public TransactionId getTransactionId() {
        return transactionId;
    }

    /**
     * @param transactionId the transaction id to carry.
     */
    public void setTransactionId(TransactionId transactionId) {
        this.transactionId = transactionId;
    }

    /**
     * @return the operation type byte.
     *
     * @openwire:property version=1
     */
    public byte getType() {
        return type;
    }

    /**
     * @param type the operation type byte; not validated here.
     */
    public void setType(byte type) {
        this.type = type;
    }

    /**
     * Calls the visitor callback that matches the current type.
     *
     * @param visitor the visitor to dispatch to.
     * @return the result of the selected callback.
     * @throws IOException if the type is none of the constants of this class.
     * @throws Exception whatever the selected callback throws.
     */
    @Override
    public Response visit(CommandVisitor visitor) throws Exception {
        final Response outcome;
        switch (type) {
            case BEGIN:
                outcome = visitor.processBeginTransaction(this);
                break;
            case PREPARE:
                outcome = visitor.processPrepareTransaction(this);
                break;
            case COMMIT_ONE_PHASE:
                outcome = visitor.processCommitTransactionOnePhase(this);
                break;
            case COMMIT_TWO_PHASE:
                outcome = visitor.processCommitTransactionTwoPhase(this);
                break;
            case ROLLBACK:
                outcome = visitor.processRollbackTransaction(this);
                break;
            case RECOVER:
                outcome = visitor.processRecoverTransactions(this);
                break;
            case FORGET:
                outcome = visitor.processForgetTransaction(this);
                break;
            case END:
                outcome = visitor.processEndTransaction(this);
                break;
            default:
                throw new IOException("Transaction info type unknown: " + type);
        }
        return outcome;
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/WireFormatInfo.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/WireFormatInfo.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/WireFormatInfo.java new file mode 100644 index 0000000..f6ceca0 --- /dev/null +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/WireFormatInfo.java @@ -0,0 +1,395 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.activemq.openwire.commands;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.activemq.openwire.codec.OpenWireFormat;
import org.apache.activemq.openwire.utils.OpenWireMarshallingSupport;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.ByteArrayInputStream;
import org.fusesource.hawtbuf.ByteArrayOutputStream;
import org.fusesource.hawtbuf.UTF8Buffer;

/**
 * Command carrying a magic byte sequence, a version number and a set of named
 * properties. The properties exist in two forms: a decoded map
 * ({@code properties}) and an encoded buffer ({@code marshalledProperties}).
 * Decoding happens on first read; encoding happens in
 * {@link #beforeMarshall(OpenWireFormat)} when no encoded form is present.
 * <p>
 * No synchronization is performed by this class.
 *
 * @openwire:marshaller code="1"
 */
public class WireFormatInfo implements Command, MarshallAware {

    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.WIREFORMAT_INFO;
    private static final int MAX_PROPERTY_SIZE = 1024 * 4;
    private static final byte MAGIC[] = new byte[] { 'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q' };

    // NOTE(review): new instances share the MAGIC array itself, and getMagic()
    // hands it out; a caller that mutates the returned array changes MAGIC.
    protected byte magic[] = MAGIC;
    protected int version;
    protected Buffer marshalledProperties;

    protected transient Map<String, Object> properties;

    /**
     * @return the type code of this command, {@link #DATA_STRUCTURE_TYPE}.
     */
    @Override
    public byte getDataStructureType() {
        return DATA_STRUCTURE_TYPE;
    }

    /**
     * @return always {@code true}.
     */
    @Override
    public boolean isWireFormatInfo() {
        return true;
    }

    /**
     * @return always {@code true}.
     */
    @Override
    public boolean isMarshallAware() {
        return true;
    }

    /**
     * @return the magic bytes currently stored (not a copy).
     *
     * @openwire:property version=1 size=8 testSize=-1
     */
    public byte[] getMagic() {
        return magic;
    }

    /**
     * @param magic the magic bytes to store; stored as given, may be {@code null}.
     */
    public void setMagic(byte[] magic) {
        this.magic = magic;
    }

    /**
     * @return the stored version number.
     *
     * @openwire:property version=1
     */
    public int getVersion() {
        return version;
    }

    /**
     * @param version the version number to store.
     */
    public void setVersion(int version) {
        this.version = version;
    }

    /**
     * @return the encoded form of the properties, or {@code null} when there is none.
     *
     * @openwire:property version=1
     */
    public Buffer getMarshalledProperties() {
        return marshalledProperties;
    }

    /**
     * @param marshalledProperties the encoded properties to store.
     */
    public void setMarshalledProperties(Buffer marshalledProperties) {
        this.marshalledProperties = marshalledProperties;
    }

    // ////////////////////
    // Implementation Methods.
    // ////////////////////

    /**
     * Looks up one property, decoding the encoded form first if necessary.
     *
     * @param name the property name.
     * @return the value, or {@code null} when the property (or any property data) is absent.
     * @throws IOException if the encoded form cannot be decoded.
     */
    public Object getProperty(String name) throws IOException {
        if (properties == null) {
            if (marshalledProperties == null) {
                return null;
            }
            properties = unmarsallProperties(marshalledProperties);
        }
        return properties.get(name);
    }

    /**
     * @return an unmodifiable view of the decoded properties; an empty map when
     *         there is neither a decoded nor an encoded form.
     * @throws IOException if the encoded form cannot be decoded.
     */
    public Map<String, Object> getProperties() throws IOException {
        if (properties == null) {
            if (marshalledProperties == null) {
                return Collections.<String, Object>emptyMap();
            }
            properties = unmarsallProperties(marshalledProperties);
        }
        return Collections.unmodifiableMap(properties);
    }

    /**
     * Drops both the decoded and the encoded form of the properties.
     */
    public void clearProperties() {
        marshalledProperties = null;
        properties = null;
    }

    /**
     * Stores one property in the decoded map.
     *
     * @param name the property name.
     * @param value the value to store.
     * @throws IOException if an existing encoded form cannot be decoded.
     */
    public void setProperty(String name, Object value) throws IOException {
        lazyCreateProperties();
        properties.put(name, value);
        // A read may have decoded the map while keeping the encoded buffer.
        // That buffer no longer matches the map, and beforeMarshall() only
        // re-encodes when the buffer is null, so discard it here.
        marshalledProperties = null;
    }

    /**
     * Makes sure the decoded map exists, creating it empty or decoding it from
     * the encoded form (which is then dropped).
     *
     * @throws IOException if the encoded form cannot be decoded.
     */
    protected void lazyCreateProperties() throws IOException {
        if (properties == null) {
            if (marshalledProperties == null) {
                properties = new HashMap<String, Object>();
            } else {
                properties = unmarsallProperties(marshalledProperties);
                marshalledProperties = null;
            }
        }
    }

    /** Decodes a property buffer, passing MAX_PROPERTY_SIZE as the limit argument. */
    private Map<String, Object> unmarsallProperties(Buffer marshalledProperties) throws IOException {
        return OpenWireMarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)), MAX_PROPERTY_SIZE);
    }

    /**
     * Encodes the decoded map when no encoded form is present.
     */
    @Override
    public void beforeMarshall(OpenWireFormat wireFormat) throws IOException {
        // Need to marshal the properties.
        if (marshalledProperties == null && properties != null) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream os = new DataOutputStream(baos);
            OpenWireMarshallingSupport.marshalPrimitiveMap(properties, os);
            os.close();
            marshalledProperties = baos.toBuffer();
        }
    }

    @Override
    public void afterMarshall(OpenWireFormat wireFormat) throws IOException {
    }

    @Override
    public void beforeUnmarshall(OpenWireFormat wireFormat) throws IOException {
    }

    @Override
    public void afterUnmarshall(OpenWireFormat wireFormat) throws IOException {
    }

    /**
     * @return {@code true} when the stored magic is non-null and equals the expected bytes.
     */
    public boolean isValid() {
        return magic != null && Arrays.equals(magic, MAGIC);
    }

    /**
     * Ignored by this command.
     */
    @Override
    public void setResponseRequired(boolean responseRequired) {
    }

    /**
     * @return {@code true} only when the "CacheEnabled" property is a true {@link Boolean}.
     * @throws IOException if the properties cannot be decoded.
     */
    public boolean isCacheEnabled() throws IOException {
        return Boolean.TRUE.equals(getProperty("CacheEnabled"));
    }

    public void setCacheEnabled(boolean cacheEnabled) throws IOException {
        setProperty("CacheEnabled", Boolean.valueOf(cacheEnabled));
    }

    /**
     * @return {@code true} only when the "StackTraceEnabled" property is a true {@link Boolean}.
     * @throws IOException if the properties cannot be decoded.
     */
    public boolean isStackTraceEnabled() throws IOException {
        return Boolean.TRUE.equals(getProperty("StackTraceEnabled"));
    }

    public void setStackTraceEnabled(boolean stackTraceEnabled) throws IOException {
        setProperty("StackTraceEnabled", Boolean.valueOf(stackTraceEnabled));
    }

    /**
     * @return {@code true} only when the "TcpNoDelayEnabled" property is a true {@link Boolean}.
     * @throws IOException if the properties cannot be decoded.
     */
    public boolean isTcpNoDelayEnabled() throws IOException {
        return Boolean.TRUE.equals(getProperty("TcpNoDelayEnabled"));
    }

    public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) throws IOException {
        setProperty("TcpNoDelayEnabled", Boolean.valueOf(tcpNoDelayEnabled));
    }

    /**
     * @return {@code true} only when the "SizePrefixDisabled" property is a true {@link Boolean}.
     * @throws IOException if the properties cannot be decoded.
     */
    public boolean isSizePrefixDisabled() throws IOException {
        return Boolean.TRUE.equals(getProperty("SizePrefixDisabled"));
    }

    public void setSizePrefixDisabled(boolean prefixPacketSize) throws IOException {
        setProperty("SizePrefixDisabled", Boolean.valueOf(prefixPacketSize));
    }

    /**
     * @return {@code true} only when the "TightEncodingEnabled" property is a true {@link Boolean}.
     * @throws IOException if the properties cannot be decoded.
     */
    public boolean isTightEncodingEnabled() throws IOException {
        return Boolean.TRUE.equals(getProperty("TightEncodingEnabled"));
    }

    public void setTightEncodingEnabled(boolean tightEncodingEnabled) throws IOException {
        setProperty("TightEncodingEnabled", Boolean.valueOf(tightEncodingEnabled));
    }

    /**
     * @return the string form of the "Host" property, or {@code null} when absent.
     * @throws IOException if the properties cannot be decoded.
     */
    public String getHost() throws IOException {
        // setHost() stores a String; this getter previously cast to UTF8Buffer,
        // which fails for a value set on this same instance. toString() serves
        // both representations.
        Object host = getProperty("Host");
        return host == null ? null : host.toString();
    }

    public void setHost(String hostname) throws IOException {
        setProperty("Host", hostname);
    }

    /**
     * @return the "MaxInactivityDuration" property, or 0 when absent.
     * @throws IOException if the properties cannot be decoded.
     */
    public long getMaxInactivityDuration() throws IOException {
        Long l = (Long) getProperty("MaxInactivityDuration");
        return l == null ? 0 : l.longValue();
    }

    public void setMaxInactivityDuration(long maxInactivityDuration) throws IOException {
        setProperty("MaxInactivityDuration", Long.valueOf(maxInactivityDuration));
    }

    /**
     * @return the "MaxInactivityDurationInitalDelay" property, or 0 when absent.
     * @throws IOException if the properties cannot be decoded.
     */
    public long getMaxInactivityDurationInitalDelay() throws IOException {
        Long l = (Long) getProperty("MaxInactivityDurationInitalDelay");
        return l == null ? 0 : l.longValue();
    }

    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) throws IOException {
        setProperty("MaxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
    }

    /**
     * @return the "MaxFrameSize" property, or 0 when absent.
     * @throws IOException if the properties cannot be decoded.
     */
    public long getMaxFrameSize() throws IOException {
        Long l = (Long) getProperty("MaxFrameSize");
        return l == null ? 0 : l.longValue();
    }

    public void setMaxFrameSize(long maxFrameSize) throws IOException {
        setProperty("MaxFrameSize", Long.valueOf(maxFrameSize));
    }

    /**
     * @return the "CacheSize" property, or 0 when absent.
     * @throws IOException if the properties cannot be decoded.
     */
    public int getCacheSize() throws IOException {
        Integer i = (Integer) getProperty("CacheSize");
        return i == null ? 0 : i.intValue();
    }

    public void setCacheSize(int cacheSize) throws IOException {
        setProperty("CacheSize", Integer.valueOf(cacheSize));
    }

    /**
     * @return whatever {@code visitor.processWireFormat(this)} returns.
     */
    @Override
    public Response visit(CommandVisitor visitor) throws Exception {
        return visitor.processWireFormat(this);
    }

    /**
     * @return version, properties and magic in text form; the properties print
     *         as {@code null} when they cannot be decoded.
     */
    @Override
    public String toString() {
        Map<String, Object> p = null;
        try {
            p = getProperties();
        } catch (IOException ignored) {
            // Best effort: toString() must not fail because decoding failed.
        }
        return "WireFormatInfo { version=" + version + ", properties=" + p + ", magic=" + toString(magic) + "}";
    }

    /** Renders each byte as a char, comma separated in brackets; "null" for a null array. */
    private String toString(byte[] data) {
        if (data == null) {
            return "null";
        }
        StringBuilder sb = new StringBuilder();
        sb.append('[');
        for (int i = 0; i < data.length; i++) {
            if (i != 0) {
                sb.append(',');
            }
            sb.append((char) data[i]);
        }
        sb.append(']');
        return sb.toString();
    }

    // /////////////////////////////////////////////////////////////
    //
    // These are not implemented.
    //
    // /////////////////////////////////////////////////////////////

    @Override
    public void setCommandId(int value) {
    }

    @Override
    public int getCommandId() {
        return 0;
    }

    @Override
    public boolean isResponseRequired() {
        return false;
    }

    @Override
    public boolean isResponse() {
        return false;
    }

    @Override
    public boolean isBrokerInfo() {
        return false;
    }

    @Override
    public boolean isMessageDispatch() {
        return false;
    }

    @Override
    public boolean isMessage() {
        return false;
    }

    @Override
    public boolean isMessageAck() {
        return false;
    }

    @Override
    public boolean isMessageDispatchNotification() {
        return false;
    }

    @Override
    public boolean isShutdownInfo() {
        return false;
    }

    @Override
    public boolean isConnectionControl() {
        return false;
    }

    @Override
    public boolean isConnectionInfo() {
        return false;
    }

    @Override
    public boolean isSessionInfo() {
        return false;
    }

    @Override
    public boolean isProducerInfo() {
        return false;
    }

    @Override
    public boolean isConsumerInfo() {
        return false;
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/XATransactionId.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/XATransactionId.java b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/XATransactionId.java new file mode 100644 index 0000000..8781386 --- /dev/null +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/XATransactionId.java @@ -0,0 +1,257 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.activemq.openwire.commands;

import java.io.IOException;
import java.util.Arrays;

import javax.transaction.xa.Xid;

import org.fusesource.hawtbuf.DataByteArrayInputStream;
import org.fusesource.hawtbuf.DataByteArrayOutputStream;

/**
 * Transaction id that is also an {@link Xid}: a format id, a global
 * transaction id and a branch qualifier. Equality and hashing use those three
 * values; ordering compares the text returned by {@link #getTransactionKey()}.
 * <p>
 * The hash code, the transaction key and the encoded byte form are cached.
 * The setters discard the cached hash code and transaction key.
 *
 * @openwire:marshaller code="112"
 */
public class XATransactionId extends TransactionId implements Xid, Comparable<XATransactionId> {

    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.OPENWIRE_XA_TRANSACTION_ID;

    // Bytes that precede the global transaction id in the encoded form:
    // +|-,(long)lastAck,(byte)priority,(int)formatid,(short)globalLength....
    // i.e. 10 leading bytes that this class skips, then a 4-byte int and a 2-byte short.
    static final int XID_PREFIX_SIZE = 16;

    private int formatId;
    private byte[] branchQualifier;
    private byte[] globalTransactionId;
    private transient DataByteArrayOutputStream outputStream;
    private transient byte[] encodedXidBytes;

    private transient int hash;
    private transient String transactionKey;

    /**
     * Creates an id with format id 0 and no global id or branch qualifier.
     */
    public XATransactionId() {
    }

    /**
     * Copies the three components of another {@link Xid}. The byte arrays are
     * stored as returned by {@code xid}, not copied.
     *
     * @param xid the id to take the components from.
     */
    public XATransactionId(Xid xid) {
        this.formatId = xid.getFormatId();
        this.globalTransactionId = xid.getGlobalTransactionId();
        this.branchQualifier = xid.getBranchQualifier();
    }

    /**
     * Decodes the components from the encoded form and keeps {@code encodedBytes}
     * as the cached encoding.
     *
     * @param encodedBytes bytes in the layout produced by {@link #getEncodedXidBytes()}.
     * @throws IOException if reading the format id or the length fails.
     */
    public XATransactionId(byte[] encodedBytes) throws IOException {
        encodedXidBytes = encodedBytes;
        initFromEncodedBytes();
    }

    /**
     * @return the type code of this structure, {@link #DATA_STRUCTURE_TYPE}.
     */
    @Override
    public byte getDataStructureType() {
        return DATA_STRUCTURE_TYPE;
    }

    /**
     * Reads format id, global id and branch qualifier from encodedXidBytes:
     * skips 10 bytes, reads an int and a short length, then the global id of
     * that length; everything left over is the branch qualifier.
     */
    private void initFromEncodedBytes() throws IOException {
        DataByteArrayInputStream inputStream = new DataByteArrayInputStream(encodedXidBytes);
        inputStream.skipBytes(10);
        formatId = inputStream.readInt();
        int globalLength = inputStream.readShort();
        globalTransactionId = new byte[globalLength];
        try {
            inputStream.read(globalTransactionId);
            branchQualifier = new byte[inputStream.available()];
            inputStream.read(branchQualifier);
        } catch (IOException fatal) {
            throw new RuntimeException(this + ", failed to decode:", fatal);
        } finally {
            inputStream.close();
        }
    }

    /**
     * Returns the encoded form, building it on first use: 10 bytes left
     * unwritten, the format id as int, the global id length as short, the
     * global id and the branch qualifier.
     *
     * @return the cached encoded bytes (not a copy).
     * @throws RuntimeException wrapping any {@link IOException} raised while encoding.
     */
    public synchronized byte[] getEncodedXidBytes() {
        if (encodedXidBytes == null) {
            outputStream = new DataByteArrayOutputStream(XID_PREFIX_SIZE + globalTransactionId.length + branchQualifier.length);
            try {
                outputStream.position(10);
                outputStream.writeInt(formatId);
                outputStream.writeShort(globalTransactionId.length);
                outputStream.write(globalTransactionId);
                outputStream.write(branchQualifier);
            } catch (IOException fatal) {
                throw new RuntimeException(this + ", failed to encode:", fatal);
            }
            encodedXidBytes = outputStream.getData();
        }
        return encodedXidBytes;
    }

    /**
     * @return the stream used by {@link #getEncodedXidBytes()}, or {@code null}
     *         if that method has not built the encoding.
     */
    public DataByteArrayOutputStream internalOutputStream() {
        return outputStream;
    }

    /**
     * @return a text of the form {@code XID:[formatId,globalId=...,branchId=...]},
     *         cached until a setter is called.
     */
    @Override
    public synchronized String getTransactionKey() {
        if (transactionKey == null) {
            StringBuilder s = new StringBuilder();
            s.append("XID:[" + formatId + ",globalId=");
            s.append(stringForm(formatId, globalTransactionId));
            s.append(",branchId=");
            s.append(stringForm(formatId, branchQualifier));
            s.append("]");
            transactionKey = s.toString();
        }
        return transactionKey;
    }

    /** Renders one id component; format 131077 gets the structured form, all others the plain hex form. */
    private String stringForm(int format, byte[] uid) {
        StringBuffer s = new StringBuffer();
        switch (format) {
            case 131077: // arjuna
                stringFormArj(s, uid);
                break;
            default: // aries
                stringFormDefault(s, uid);
        }
        return s.toString();
    }

    /**
     * Appends Integer.toHexString of every byte. Bytes are not zero padded and
     * negative bytes are sign extended to eight hex digits; this is kept as is
     * so that existing key text does not change.
     */
    private void stringFormDefault(StringBuffer s, byte[] uid) {
        for (int i = 0; i < uid.length; i++) {
            s.append(Integer.toHexString(uid[i]));
        }
    }

    /**
     * Appends two longs and three ints read from uid, in base 16 and separated
     * by ':'. If reading fails, the plain hex form is appended after whatever
     * was already written (unchanged behaviour).
     */
    private void stringFormArj(StringBuffer s, byte[] uid) {
        DataByteArrayInputStream byteArrayInputStream = null;
        try {
            byteArrayInputStream = new DataByteArrayInputStream(uid);
            s.append(Long.toString(byteArrayInputStream.readLong(), 16));
            s.append(':');
            s.append(Long.toString(byteArrayInputStream.readLong(), 16));
            s.append(':');

            s.append(Integer.toString(byteArrayInputStream.readInt(), 16));
            s.append(':');
            s.append(Integer.toString(byteArrayInputStream.readInt(), 16));
            s.append(':');
            s.append(Integer.toString(byteArrayInputStream.readInt(), 16));
        } catch (Exception ignored) {
            stringFormDefault(s, uid);
        } finally {
            // The stream is still null when its construction failed.
            if (byteArrayInputStream != null) {
                try {
                    byteArrayInputStream.close();
                } catch (IOException ignored) {
                    // Nothing useful can be done about a failed close here.
                }
            }
        }
    }

    /**
     * @return the same text as {@link #getTransactionKey()}.
     */
    @Override
    public String toString() {
        return getTransactionKey();
    }

    /**
     * @return always {@code true}.
     */
    @Override
    public boolean isXATransaction() {
        return true;
    }

    /**
     * @return always {@code false}.
     */
    @Override
    public boolean isLocalTransaction() {
        return false;
    }

    /**
     * @return the format id.
     *
     * @openwire:property version=1
     */
    @Override
    public int getFormatId() {
        return formatId;
    }

    /**
     * @return the global transaction id bytes (not a copy), possibly {@code null}.
     *
     * @openwire:property version=1
     */
    @Override
    public byte[] getGlobalTransactionId() {
        return globalTransactionId;
    }

    /**
     * @return the branch qualifier bytes (not a copy), possibly {@code null}.
     *
     * @openwire:property version=1
     */
    @Override
    public byte[] getBranchQualifier() {
        return branchQualifier;
    }

    /**
     * @param branchQualifier the branch qualifier to store; cached hash and key are discarded.
     */
    public void setBranchQualifier(byte[] branchQualifier) {
        this.branchQualifier = branchQualifier;
        resetDerivedState();
    }

    /**
     * @param formatId the format id to store; cached hash and key are discarded.
     */
    public void setFormatId(int formatId) {
        this.formatId = formatId;
        resetDerivedState();
    }

    /**
     * @param globalTransactionId the global id to store; cached hash and key are discarded.
     */
    public void setGlobalTransactionId(byte[] globalTransactionId) {
        this.globalTransactionId = globalTransactionId;
        resetDerivedState();
    }

    /**
     * Forgets the cached hash and key text. The encoded byte form is left
     * alone, as before.
     * NOTE(review): the encoded form can therefore still be stale after a
     * setter call - confirm whether callers rely on that.
     */
    private void resetDerivedState() {
        this.hash = 0;
        this.transactionKey = null;
    }

    /**
     * Hash over the format id and both byte arrays, cached after the first
     * call. A computed value of 0 is replaced by {@code 0xaceace} so that 0 can
     * mean "not computed yet".
     */
    @Override
    public int hashCode() {
        if (hash == 0) {
            hash = formatId;
            hash = hash(globalTransactionId, hash);
            hash = hash(branchQualifier, hash);
            if (hash == 0) {
                hash = 0xaceace;
            }
        }
        return hash;
    }

    /** XORs every byte into the running hash, shifted by 0, 8, 16 or 24 bits by position; a null array leaves the hash unchanged. */
    private static int hash(byte[] bytes, int hash) {
        if (bytes == null) {
            return hash;
        }
        int size = bytes.length;
        for (int i = 0; i < size; i++) {
            hash ^= bytes[i] << ((i % 4) * 8);
        }
        return hash;
    }

    /**
     * Equal to another object of exactly this class with the same format id,
     * global id content and branch qualifier content.
     */
    @Override
    public boolean equals(Object o) {
        if (o == null || o.getClass() != XATransactionId.class) {
            return false;
        }

        XATransactionId xid = (XATransactionId) o;
        return xid.formatId == formatId &&
               Arrays.equals(xid.globalTransactionId, globalTransactionId) &&
               Arrays.equals(xid.branchQualifier, branchQualifier);
    }

    /**
     * Orders ids by their transaction key text.
     *
     * @param xid the id to compare with.
     * @return {@code -1} when {@code xid} is {@code null}, otherwise the result
     *         of comparing the two key strings.
     */
    @Override
    public int compareTo(XATransactionId xid) {
        if (xid == null) {
            return -1;
        }

        return getTransactionKey().compareTo(xid.getTransactionKey());
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/commands/package.html ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/commands/package.html b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/package.html new file mode 100644 index 0000000..236b95c --- /dev/null +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/commands/package.html @@ -0,0 +1,24 @@ + + + + +

+ Represents the Object Model for the OpenWire protocol. +

+ + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/utils/CronParser.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/utils/CronParser.java b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/CronParser.java new file mode 100644 index 0000000..491f26d --- /dev/null +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/CronParser.java @@ -0,0 +1,357 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.openwire.utils; + +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collections; +import java.util.List; +import java.util.StringTokenizer; + +import javax.jms.MessageFormatException; + +public class CronParser { + + private static final int NUMBER_TOKENS = 5; + private static final int MINUTES = 0; + private static final int HOURS = 1; + private static final int DAY_OF_MONTH = 2; + private static final int MONTH = 3; + private static final int DAY_OF_WEEK = 4; + + public static long getNextScheduledTime(final String cronEntry, long currentTime) throws MessageFormatException { + + long result = 0; + + if (cronEntry == null || cronEntry.length() == 0) { + return result; + } + + // Handle the once per minute case "* * * * *" + // starting the next event at the top of the minute. + if (cronEntry.equals("* * * * *")) { + result = currentTime + 60 * 1000; + result = result / 60000 * 60000; + return result; + } + + List list = tokenize(cronEntry); + List entries = buildCronEntries(list); + Calendar working = Calendar.getInstance(); + working.setTimeInMillis(currentTime); + working.set(Calendar.SECOND, 0); + + CronEntry minutes = entries.get(MINUTES); + CronEntry hours = entries.get(HOURS); + CronEntry dayOfMonth = entries.get(DAY_OF_MONTH); + CronEntry month = entries.get(MONTH); + CronEntry dayOfWeek = entries.get(DAY_OF_WEEK); + + // Start at the top of the next minute, cron is only guaranteed to be + // run on the minute. + int timeToNextMinute = 60 - working.get(Calendar.SECOND); + working.add(Calendar.SECOND, timeToNextMinute); + + // If its already to late in the day this will roll us over to tomorrow + // so we'll need to check again when done updating month and day. 
+ int currentMinutes = working.get(Calendar.MINUTE); + if (!isCurrent(minutes, currentMinutes)) { + int nextMinutes = getNext(minutes, currentMinutes); + working.add(Calendar.MINUTE, nextMinutes); + } + + int currentHours = working.get(Calendar.HOUR_OF_DAY); + if (!isCurrent(hours, currentHours)) { + int nextHour = getNext(hours, currentHours); + working.add(Calendar.HOUR_OF_DAY, nextHour); + } + + // We can roll into the next month here which might violate the cron setting + // rules so we check once then recheck again after applying the month settings. + doUpdateCurrentDay(working, dayOfMonth, dayOfWeek); + + // Start by checking if we are in the right month, if not then calculations + // need to start from the beginning of the month to ensure that we don't end + // up on the wrong day. (Can happen when DAY_OF_WEEK is set and current time + // is ahead of the day of the week to execute on). + doUpdateCurrentMonth(working, month); + + // Now Check day of week and day of month together since they can be specified + // together in one entry, if both "day of month" and "day of week" are restricted + // (not "*"), then either the "day of month" field (3) or the "day of week" field + // (5) must match the current day or the Calenday must be advanced. + doUpdateCurrentDay(working, dayOfMonth, dayOfWeek); + + // Now we can chose the correct hour and minute of the day in question. 
+ + currentHours = working.get(Calendar.HOUR_OF_DAY); + if (!isCurrent(hours, currentHours)) { + int nextHour = getNext(hours, currentHours); + working.add(Calendar.HOUR_OF_DAY, nextHour); + } + + currentMinutes = working.get(Calendar.MINUTE); + if (!isCurrent(minutes, currentMinutes)) { + int nextMinutes = getNext(minutes, currentMinutes); + working.add(Calendar.MINUTE, nextMinutes); + } + + result = working.getTimeInMillis(); + + if (result <= currentTime) { + throw new ArithmeticException("Unable to compute next scheduled exection time."); + } + + return result; + } + + protected static long doUpdateCurrentMonth(Calendar working, CronEntry month) throws MessageFormatException { + int currentMonth = working.get(Calendar.MONTH) + 1; + if (!isCurrent(month, currentMonth)) { + int nextMonth = getNext(month, currentMonth); + working.add(Calendar.MONTH, nextMonth); + + // Reset to start of month. + resetToStartOfDay(working, 1); + + return working.getTimeInMillis(); + } + + return 0L; + } + + protected static long doUpdateCurrentDay(Calendar working, CronEntry dayOfMonth, CronEntry dayOfWeek) throws MessageFormatException { + + int currentDayOfWeek = working.get(Calendar.DAY_OF_WEEK) - 1; + int currentDayOfMonth = working.get(Calendar.DAY_OF_MONTH); + + // Simplest case, both are unrestricted or both match today otherwise + // result must be the closer of the two if both are set, or the next + // match to the one that is. 
+ if (!isCurrent(dayOfWeek, currentDayOfWeek) || !isCurrent(dayOfMonth, currentDayOfMonth)) { + + int nextWeekDay = Integer.MAX_VALUE; + int nextCalendarDay = Integer.MAX_VALUE; + + if (!isCurrent(dayOfWeek, currentDayOfWeek)) { + nextWeekDay = getNext(dayOfWeek, currentDayOfWeek); + } + + if (!isCurrent(dayOfMonth, currentDayOfMonth)) { + nextCalendarDay = getNext(dayOfMonth, currentDayOfMonth); + } + + if (nextWeekDay < nextCalendarDay) { + working.add(Calendar.DAY_OF_WEEK, nextWeekDay); + } else { + working.add(Calendar.DAY_OF_MONTH, nextCalendarDay); + } + + // Since the day changed, we restart the clock at the start of the day + // so that the next time will either be at 12am + value of hours and + // minutes pattern. + resetToStartOfDay(working, working.get(Calendar.DAY_OF_MONTH)); + + return working.getTimeInMillis(); + } + + return 0L; + } + + public static void validate(final String cronEntry) throws MessageFormatException { + List list = tokenize(cronEntry); + List entries = buildCronEntries(list); + for (CronEntry e : entries) { + validate(e); + } + } + + static void validate(final CronEntry entry) throws MessageFormatException { + List list = entry.currentWhen; + if (list.isEmpty() || list.get(0).intValue() < entry.start || list.get(list.size() - 1).intValue() > entry.end) { + throw new MessageFormatException("Invalid token: " + entry); + } + } + + static int getNext(final CronEntry entry, final int current) throws MessageFormatException { + int result = 0; + + if (entry.currentWhen == null) { + entry.currentWhen = calculateValues(entry); + } + + List list = entry.currentWhen; + int next = -1; + for (Integer i : list) { + if (i.intValue() > current) { + next = i.intValue(); + break; + } + } + if (next != -1) { + result = next - current; + } else { + int first = list.get(0).intValue(); + result = entry.end + first - entry.start - current; + + // Account for difference of one vs zero based indices. 
+ if (entry.name.equals("DayOfWeek") || entry.name.equals("Month")) { + result++; + } + } + + return result; + } + + static boolean isCurrent(final CronEntry entry, final int current) throws MessageFormatException { + boolean result = entry.currentWhen.contains(new Integer(current)); + return result; + } + + protected static void resetToStartOfDay(Calendar target, int day) { + target.set(Calendar.DAY_OF_MONTH, day); + target.set(Calendar.HOUR_OF_DAY, 0); + target.set(Calendar.MINUTE, 0); + target.set(Calendar.SECOND, 0); + } + + static List tokenize(String cron) throws IllegalArgumentException { + StringTokenizer tokenize = new StringTokenizer(cron); + List result = new ArrayList(); + while (tokenize.hasMoreTokens()) { + result.add(tokenize.nextToken()); + } + if (result.size() != NUMBER_TOKENS) { + throw new IllegalArgumentException("Not a valid cron entry - wrong number of tokens(" + result.size() + "): " + cron); + } + return result; + } + + protected static List calculateValues(final CronEntry entry) { + List result = new ArrayList(); + if (isAll(entry.token)) { + for (int i = entry.start; i <= entry.end; i++) { + result.add(i); + } + } else if (isAStep(entry.token)) { + int denominator = getDenominator(entry.token); + String numerator = getNumerator(entry.token); + CronEntry ce = new CronEntry(entry.name, numerator, entry.start, entry.end); + List list = calculateValues(ce); + for (Integer i : list) { + if (i.intValue() % denominator == 0) { + result.add(i); + } + } + } else if (isAList(entry.token)) { + StringTokenizer tokenizer = new StringTokenizer(entry.token, ","); + while (tokenizer.hasMoreTokens()) { + String str = tokenizer.nextToken(); + CronEntry ce = new CronEntry(entry.name, str, entry.start, entry.end); + List list = calculateValues(ce); + result.addAll(list); + } + } else if (isARange(entry.token)) { + int index = entry.token.indexOf('-'); + int first = Integer.parseInt(entry.token.substring(0, index)); + int last = 
Integer.parseInt(entry.token.substring(index + 1)); + for (int i = first; i <= last; i++) { + result.add(i); + } + } else { + int value = Integer.parseInt(entry.token); + result.add(value); + } + Collections.sort(result); + return result; + } + + protected static boolean isARange(String token) { + return token != null && token.indexOf('-') >= 0; + } + + protected static boolean isAStep(String token) { + return token != null && token.indexOf('/') >= 0; + } + + protected static boolean isAList(String token) { + return token != null && token.indexOf(',') >= 0; + } + + protected static boolean isAll(String token) { + return token != null && token.length() == 1 && (token.charAt(0) == '*' || token.charAt(0) == '?'); + } + + protected static int getDenominator(final String token) { + int result = 0; + int index = token.indexOf('/'); + String str = token.substring(index + 1); + result = Integer.parseInt(str); + return result; + } + + protected static String getNumerator(final String token) { + int index = token.indexOf('/'); + String str = token.substring(0, index); + return str; + } + + static List buildCronEntries(List tokens) { + + List result = new ArrayList(); + + CronEntry minutes = new CronEntry("Minutes", tokens.get(MINUTES), 0, 60); + minutes.currentWhen = calculateValues(minutes); + result.add(minutes); + CronEntry hours = new CronEntry("Hours", tokens.get(HOURS), 0, 24); + hours.currentWhen = calculateValues(hours); + result.add(hours); + CronEntry dayOfMonth = new CronEntry("DayOfMonth", tokens.get(DAY_OF_MONTH), 1, 31); + dayOfMonth.currentWhen = calculateValues(dayOfMonth); + result.add(dayOfMonth); + CronEntry month = new CronEntry("Month", tokens.get(MONTH), 1, 12); + month.currentWhen = calculateValues(month); + result.add(month); + CronEntry dayOfWeek = new CronEntry("DayOfWeek", tokens.get(DAY_OF_WEEK), 0, 6); + dayOfWeek.currentWhen = calculateValues(dayOfWeek); + result.add(dayOfWeek); + + return result; + } + + static class CronEntry { + + final 
String name; + final String token; + final int start; + final int end; + + List currentWhen; + + CronEntry(String name, String token, int start, int end) { + this.name = name; + this.token = token; + this.start = start; + this.end = end; + } + + @Override + public String toString() { + return this.name + ":" + token; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-openwire/blob/7ae454fa/openwire-core/src/main/java/org/apache/activemq/openwire/utils/DefaultUnresolvedDestinationTransformer.java ---------------------------------------------------------------------- diff --git a/openwire-core/src/main/java/org/apache/activemq/openwire/utils/DefaultUnresolvedDestinationTransformer.java b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/DefaultUnresolvedDestinationTransformer.java new file mode 100644 index 0000000..e7c6bec --- /dev/null +++ b/openwire-core/src/main/java/org/apache/activemq/openwire/utils/DefaultUnresolvedDestinationTransformer.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.openwire.utils; + +import java.lang.reflect.Method; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.Topic; + +import org.apache.activemq.openwire.commands.OpenWireDestination; +import org.apache.activemq.openwire.commands.OpenWireQueue; +import org.apache.activemq.openwire.commands.OpenWireTopic; + +/** + * A default implementation of the resolver that attempts to find an isQueue or isTopic method + * on the foreign destination to determine the correct type. + */ +public class DefaultUnresolvedDestinationTransformer implements UnresolvedDestinationTransformer { + + @Override + public OpenWireDestination transform(Destination dest) throws JMSException { + String queueName = ((Queue) dest).getQueueName(); + String topicName = ((Topic) dest).getTopicName(); + + if (queueName == null && topicName == null) { + throw new JMSException("Unresolvable destination: Both queue and topic names are null: " + dest); + } + + try { + Method isQueueMethod = dest.getClass().getMethod("isQueue"); + Method isTopicMethod = dest.getClass().getMethod("isTopic"); + + if (isQueueMethod == null && isTopicMethod == null) { + throw new JMSException("Unresolvable destination: Neither isQueue nor isTopic methods present: " + dest); + } + + Boolean isQueue = (Boolean) isQueueMethod.invoke(dest); + Boolean isTopic = (Boolean) isTopicMethod.invoke(dest); + if (isQueue) { + return new OpenWireQueue(queueName); + } else if (isTopic) { + return new OpenWireTopic(topicName); + } else { + throw new JMSException("Unresolvable destination: Neither Queue nor Topic: " + dest); + } + } catch (Exception e) { + throw new JMSException("Unresolvable destination: " + e.getMessage() + ": " + dest); + } + } + + @Override + public OpenWireDestination transform(String dest) throws JMSException { + return new OpenWireQueue(dest); + } +}