activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1158416 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/ope...
Date Tue, 16 Aug 2011 18:39:26 GMT
Author: chirino
Date: Tue Aug 16 18:39:25 2011
New Revision: 1158416

URL: http://svn.apache.org/viewvc?rev=1158416&view=rev
Log:
Starting to implement temp destination support.
Removed unneeded classes.
Moved all the OpenWire-to-Apollo destination conversions into one place.

Added:
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala
Removed:
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenWireMessageDelivery.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenwireMessageEvaluationContext.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/openwire/OpenwireProtocolHandler.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/Destination.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/broker/region/MessageReference.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitorAdapter.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConnectionState.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConnectionStateTracker.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ConsumerState.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/ProducerState.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/SessionState.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/Tracked.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/TransactionState.java
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1158416&r1=1158415&r2=1158416&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
Tue Aug 16 18:39:25 2011
@@ -92,9 +92,6 @@ object LocalRouter extends Log {
   val QUEUE_DOMAIN = "queue"
   val DSUB_DOMAIN = "dsub"
 
-  val TEMP_TOPIC_DOMAIN = "temp-topic"
-  val TEMP_QUEUE_DOMAIN = "temp-queue"
-
   val QUEUE_KIND = "queue"
   val DEFAULT_QUEUE_PATH = "default"
 

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala?rev=1158416&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala
(added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala
Tue Aug 16 18:39:25 2011
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire
+
+import command._
+import org.apache.activemq.apollo.util.path.Path
+import java.util.regex.{Matcher, Pattern}
+import org.apache.activemq.apollo.dto.{TopicDestinationDTO, QueueDestinationDTO, DestinationDTO}
+import org.apache.activemq.apollo.broker.{DestinationParser, LocalRouter}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object DestinationConverter {
+
+  val OPENWIRE_PARSER = new DestinationParser();
+  OPENWIRE_PARSER.path_separator = "."
+  OPENWIRE_PARSER.any_child_wildcard = "*"
+  OPENWIRE_PARSER.any_descendant_wildcard = ">"
+
+  def to_destination_dto(domain: String, parts:Array[String]): DestinationDTO = domain match {
+    case LocalRouter.QUEUE_DOMAIN => new QueueDestinationDTO(parts)
+    case LocalRouter.TOPIC_DOMAIN => new TopicDestinationDTO(parts)
+    case _ => throw new Exception("Uknown destination domain: " + domain);
+  }
+
+  def to_destination_dto(domain: String, path: Path): Array[DestinationDTO] = {
+      Array(to_destination_dto(domain, OPENWIRE_PARSER.path_parts(path)))
+  }
+
+  def to_destination_dto(dest: ActiveMQDestination): Array[DestinationDTO] = {
+    if( !dest.isComposite ) {
+      import ActiveMQDestination._
+      val physicalName = dest.getPhysicalName.replaceAll(Pattern.quote(":"), Matcher.quoteReplacement("%58"))
+
+      var path = OPENWIRE_PARSER.decode_path(physicalName)
+      dest.getDestinationType match {
+        case QUEUE_TYPE =>
+          to_destination_dto(LocalRouter.QUEUE_DOMAIN, path)
+        case TOPIC_TYPE =>
+          to_destination_dto(LocalRouter.TOPIC_DOMAIN, path)
+        case TEMP_QUEUE_TYPE =>
+          to_destination_dto(LocalRouter.QUEUE_DOMAIN, Path("ActiveMQ", "Temp") + path)
+        case TEMP_TOPIC_TYPE =>
+          to_destination_dto(LocalRouter.TOPIC_DOMAIN, Path("ActiveMQ", "Temp") + path)
+      }
+    } else {
+      dest.getCompositeDestinations.map { c =>
+        to_destination_dto(c)(0)
+      }
+    }
+  }
+
+  def to_activemq_destination(dest:Array[DestinationDTO]):ActiveMQDestination = {
+    import collection.JavaConversions._
+
+    val rc = dest.map { dest =>
+      var temp = dest.temp_owner != null
+      val name = OPENWIRE_PARSER.encode_path(asScalaBuffer(dest.path).toList match {
+        case "ActiveMQ" :: "Temp" :: rest =>
+          temp = true
+          rest
+        case rest =>
+          rest
+      }).replaceAll(Pattern.quote("%58"), Matcher.quoteReplacement(":"))
+
+      dest match {
+        case dest:QueueDestinationDTO =>
+          if( temp ) {
+            new ActiveMQTempQueue(name)
+          } else {
+            new ActiveMQQueue(name)
+          }
+        case dest:TopicDestinationDTO =>
+          if( temp ) {
+            new ActiveMQTempTopic(name)
+          } else {
+            new ActiveMQTopic(name)
+          }
+      }
+    }
+
+    if( rc.length == 1) {
+      rc(0)
+    } else {
+      val c = new ActiveMQQueue()
+      c.setCompositeDestinations(rc)
+      c
+    }
+
+  }
+}
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala?rev=1158416&r1=1158415&r2=1158416&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
Tue Aug 16 18:39:25 2011
@@ -51,8 +51,6 @@ class OpenwireMessage(val message:Active
 
   def expiration = message.getExpiration
 
-  def destination = message.getDestination.toDestination
-
   def getBodyAs[T](toType : Class[T]) = {
     (message match {
       case x:ActiveMQTextMessage =>

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1158416&r1=1158415&r2=1158416&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Tue Aug 16 18:39:25 2011
@@ -36,13 +36,16 @@ import java.util.Map.Entry
 import protocol._
 import scala.util.continuations._
 import security.SecurityContext
+import support.advisory.AdvisorySupport
 import tcp.TcpTransport
 import codec.OpenWireFormat
 import command._
 import org.apache.activemq.apollo.openwire.dto.{OpenwireConnectionStatusDTO,OpenwireDTO}
 import org.apache.activemq.apollo.dto.{AcceptingConnectorDTO, TopicDestinationDTO, DurableSubscriptionDestinationDTO, DestinationDTO}
+import org.apache.activemq.apollo.openwire.DestinationConverter._
 
 object OpenwireProtocolHandler extends Log {
+  def unit:Unit = {}
 
   val DEFAULT_DIE_DELAY = 5 * 1000L
   var die_delay = DEFAULT_DIE_DELAY
@@ -175,6 +178,7 @@ class OpenwireProtocolHandler extends Pr
   }
 
   override def on_transport_connected():Unit = {
+    security_context.connection_id = Some(connection.id)
     security_context.local_address = connection.transport.getLocalAddress
     security_context.remote_address = connection.transport.getRemoteAddress
 
@@ -259,19 +263,19 @@ class OpenwireProtocolHandler extends Pr
           case info:KeepAliveInfo=> ack(info)
           case info:ShutdownInfo=> ack(info); connection.stop
           case info:FlushCommand=> ack(info)
+          case info:DestinationInfo=> on_destination_info(info)
 
           // case info:ConnectionControl=>
           // case info:ConnectionError=>
           // case info:ConsumerControl=>
-          // case info:DestinationInfo=>
           // case info:RemoveSubscriptionInfo=>
           // case info:ControlCommand=>
 
           ///////////////////////////////////////////////////////////////////
           // Methods for cluster operations
           // These commands are sent to the broker when it's acting like a
-          //client to another broker.
-          // /////////////////////////////////////////////////////////////////
+          // client to another broker.
+          ///////////////////////////////////////////////////////////////////
           // case info:BrokerInfo=>
           // case info:MessageDispatch=>
           // case info:MessageDispatchNotification=>
@@ -476,6 +480,27 @@ class OpenwireProtocolHandler extends Pr
     }
   }
 
+  def on_destination_info(info:DestinationInfo) = {
+    val destinations = to_destination_dto(info.getDestination)
+    if( info.getDestination.isTemporary ) {
+      destinations.foreach(_.temp_owner = connection.id)
+    }
+    reset{
+      val rc = info.getOperationType match {
+        case DestinationInfo.ADD_OPERATION_TYPE=>
+          host.router.create(destinations, security_context)
+        case DestinationInfo.REMOVE_OPERATION_TYPE=>
+          host.router.delete(destinations, security_context)
+      }
+      rc match {
+        case None =>
+          ack(info)
+        case Some(error)=>
+          ack(info)
+      }
+    }
+  }
+
   def on_remove_info(info: RemoveInfo) = {
     info.getObjectId match {
       case id: ConnectionId => all_connections.get(id).foreach(_.dettach)
@@ -544,7 +569,7 @@ class OpenwireProtocolHandler extends Pr
 
   def perform_send(msg:ActiveMQMessage, uow:StoreUOW=null): Unit = {
 
-    val destiantion = msg.getDestination.toDestination
+    val destiantion = to_destination_dto(msg.getDestination)
     val key = destiantion.toList
     producerRoutes.get(key) match {
       case null =>
@@ -728,8 +753,14 @@ class OpenwireProtocolHandler extends Pr
     def attach = {
 
       if( info.getDestination == null ) fail("destination was not set")
+      destination = to_destination_dto(info.getDestination)
+
+      // if they are temp dests.. attach our owner id so that we don't
+      // get rejected.
+      if( info.getDestination.isTemporary ) {
+        destination.foreach(_.temp_owner = connection.get.id)
+      }
 
-      destination = info.getDestination.toDestination
       parent.consumers.put(info.getConsumerId, this)
       all_consumers.put(info.getConsumerId, this)
       var is_durable_sub = info.getSubscriptionName!=null
@@ -843,52 +874,20 @@ class OpenwireProtocolHandler extends Pr
           }
         }
       }
-//      def close = {
-//
-//        assert(producer.dispatch_queue.isExecuting)
-//        if( !closed ) {
-//          closed = true
-//          if( browser ) {
-//            // Then send the end of browse message.
-//            var dispatch = new MessageDispatch
-//            dispatch.setConsumerId(this.consumer.info.getConsumerId)
-//            dispatch.setMessage(null)
-//            dispatch.setDestination(null)
-//
-//            if( downstream.full ) {
-//              // session is full so use an overflow sink so to hold the message,
-//              // and then trigger closing the session once it empties out.
-//              val sink = new OverflowSink(downstream)
-//              sink.refiller = ^{
-//                outbound_sessions.close(downstream)
-//                release
-//              }
-//              sink.offer(dispatch)
-//            } else {
-//              downstream.offer(dispatch)
-//              outbound_sessions.close(downstream)
-//              release
-//            }
-//          } else {
-//            outbound_sessions.close(downstream)
-//            release
-//          }
-//        }
-//      }
 
       def dispose = {
         session_manager.close(downstream)
-//        if( auto_delete ) {
-//          reset {
-//            val rc = host.router.delete(destination, security_context)
-//            rc match {
-//              case Some(error) =>
-//                async_die(error)
-//              case None =>
-//                unit
-//            }
-//          }
-//        }
+        if( info.getDestination.isTemporary ) {
+          reset {
+            val rc = host.router.delete(destination, security_context)
+            rc match {
+              case Some(error) =>
+                async_die(error)
+              case None =>
+                unit
+            }
+          }
+        }
         release
       }
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java?rev=1158416&r1=1158415&r2=1158416&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java
Tue Aug 16 18:39:25 2011
@@ -16,13 +16,7 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
-import java.util.ArrayList;
-
-import org.apache.activemq.apollo.broker.DestinationParser;
-import org.apache.activemq.apollo.broker.LocalRouter;
-import org.apache.activemq.apollo.dto.DestinationDTO;
-import org.apache.activemq.apollo.util.path.Path;
-import org.fusesource.hawtbuf.AsciiBuffer;
+import java.util.*;
 
 /**
  * @openwire:marshaller
@@ -30,204 +24,256 @@ import org.fusesource.hawtbuf.AsciiBuffe
  */
 abstract public class ActiveMQDestination implements DataStructure, Comparable {
 
-    public static final String PATH_SEPERATOR = ".";
-    public static final char COMPOSITE_SEPERATOR = ',';
-
+  public static final String PATH_SEPERATOR = ".";
+  public static final char COMPOSITE_SEPERATOR = ',';
 
-    public static final DestinationParser PARSER = DestinationParser.OPENWIRE_PARSER();
+  public static final byte QUEUE_TYPE = 0x01;
+  public static final byte TOPIC_TYPE = 0x02;
+  public static final byte TEMP_MASK = 0x04;
+  public static final byte TEMP_TOPIC_TYPE = TOPIC_TYPE | TEMP_MASK;
+  public static final byte TEMP_QUEUE_TYPE = QUEUE_TYPE | TEMP_MASK;
+
+  public static final String QUEUE_QUALIFIED_PREFIX = "queue://";
+  public static final String TOPIC_QUALIFIED_PREFIX = "topic://";
+  public static final String TEMP_QUEUE_QUALIFED_PREFIX = "temp-queue://";
+  public static final String TEMP_TOPIC_QUALIFED_PREFIX = "temp-topic://";
+
+  public static final String TEMP_DESTINATION_NAME_PREFIX = "ID:";
+
+  private static final long serialVersionUID = -3885260014960795889L;
+
+  protected String physicalName;
+
+  protected transient ActiveMQDestination[] compositeDestinations;
+  protected transient String[] destinationPaths;
+
+  public ActiveMQDestination() {
+  }
+
+  protected ActiveMQDestination(String name) {
+      setPhysicalName(name);
+  }
+
+  public ActiveMQDestination(ActiveMQDestination composites[]) {
+      setCompositeDestinations(composites);
+  }
+
+
+  // static helper methods for working with destinations
+  // -------------------------------------------------------------------------
+  public static ActiveMQDestination createDestination(String name, byte defaultType) {
+
+      if (name.startsWith(QUEUE_QUALIFIED_PREFIX)) {
+          return new ActiveMQQueue(name.substring(QUEUE_QUALIFIED_PREFIX.length()));
+      } else if (name.startsWith(TOPIC_QUALIFIED_PREFIX)) {
+          return new ActiveMQTopic(name.substring(TOPIC_QUALIFIED_PREFIX.length()));
+      } else if (name.startsWith(TEMP_QUEUE_QUALIFED_PREFIX)) {
+          return new ActiveMQTempQueue(name.substring(TEMP_QUEUE_QUALIFED_PREFIX.length()));
+      } else if (name.startsWith(TEMP_TOPIC_QUALIFED_PREFIX)) {
+          return new ActiveMQTempTopic(name.substring(TEMP_TOPIC_QUALIFED_PREFIX.length()));
+      }
+
+      switch (defaultType) {
+      case QUEUE_TYPE:
+          return new ActiveMQQueue(name);
+      case TOPIC_TYPE:
+          return new ActiveMQTopic(name);
+      case TEMP_QUEUE_TYPE:
+          return new ActiveMQTempQueue(name);
+      case TEMP_TOPIC_TYPE:
+          return new ActiveMQTempTopic(name);
+      default:
+          throw new IllegalArgumentException("Invalid default destination type: " + defaultType);
+      }
+  }
+
+  public static int compare(ActiveMQDestination destination, ActiveMQDestination destination2) {
+      if (destination == destination2) {
+          return 0;
+      }
+      if (destination == null) {
+          return -1;
+      } else if (destination2 == null) {
+          return 1;
+      } else {
+          if (destination.isQueue() == destination2.isQueue()) {
+              return destination.getPhysicalName().compareTo(destination2.getPhysicalName());
+          } else {
+              return destination.isQueue() ? -1 : 1;
+          }
+      }
+  }
+
+  public int compareTo(Object that) {
+      if (that instanceof ActiveMQDestination) {
+          return compare(this, (ActiveMQDestination)that);
+      }
+      if (that == null) {
+          return 1;
+      } else {
+          return getClass().getName().compareTo(that.getClass().getName());
+      }
+  }
+
+  public boolean isComposite() {
+      return compositeDestinations != null;
+  }
+
+  public ActiveMQDestination[] getCompositeDestinations() {
+      return compositeDestinations;
+  }
+
+  public void setCompositeDestinations(ActiveMQDestination[] destinations) {
+      this.compositeDestinations = destinations;
+      this.destinationPaths = null;
+
+      StringBuffer sb = new StringBuffer();
+      for (int i = 0; i < destinations.length; i++) {
+          if (i != 0) {
+              sb.append(COMPOSITE_SEPERATOR);
+          }
+          if (getDestinationType() == destinations[i].getDestinationType()) {
+              sb.append(destinations[i].getPhysicalName());
+          } else {
+              sb.append(destinations[i].getQualifiedName());
+          }
+      }
+      physicalName = sb.toString();
+  }
+
+  public String getQualifiedName() {
+      if (isComposite()) {
+          return physicalName;
+      }
+      return getQualifiedPrefix() + physicalName;
+  }
+
+  protected abstract String getQualifiedPrefix();
+
+  /**
+   * @openwire:property version=1
+   */
+  public String getPhysicalName() {
+      return physicalName;
+  }
+
+  public void setPhysicalName(String physicalName) {
+      final int len = physicalName.length();
+      // options offset
+      int p = -1;
+      boolean composite = false;
+      for (int i = 0; i < len; i++) {
+          char c = physicalName.charAt(i);
+          if (c == '?') {
+              p = i;
+              break;
+          }
+          if (c == COMPOSITE_SEPERATOR) {
+              // won't be wild card
+              composite = true;
+          }
+      }
+      // Strip off any options
+      if (p >= 0) {
+          String optstring = physicalName.substring(p + 1);
+          physicalName = physicalName.substring(0, p);
+      }
+      this.physicalName = physicalName;
+      this.destinationPaths = null;
+      if (composite) {
+          // Check to see if it is a composite.
+          Set<String> l = new HashSet<String>();
+          StringTokenizer iter = new StringTokenizer(physicalName, "" + COMPOSITE_SEPERATOR);
+          while (iter.hasMoreTokens()) {
+              String name = iter.nextToken().trim();
+              if (name.length() == 0) {
+                  continue;
+              }
+              l.add(name);
+          }
+          compositeDestinations = new ActiveMQDestination[l.size()];
+          int counter = 0;
+          for (String dest : l) {
+              compositeDestinations[counter++] = createDestination(dest);
+          }
+      }
+  }
+
+  public ActiveMQDestination createDestination(String name) {
+      return createDestination(name, getDestinationType());
+  }
+
+  public String[] getDestinationPaths() {
+
+      if (destinationPaths != null) {
+          return destinationPaths;
+      }
+
+      List<String> l = new ArrayList<String>();
+      StringTokenizer iter = new StringTokenizer(physicalName, PATH_SEPERATOR);
+      while (iter.hasMoreTokens()) {
+          String name = iter.nextToken().trim();
+          if (name.length() == 0) {
+              continue;
+          }
+          l.add(name);
+      }
+
+      destinationPaths = new String[l.size()];
+      l.toArray(destinationPaths);
+      return destinationPaths;
+  }
+
+  public abstract byte getDestinationType();
+
+  public boolean isQueue() {
+      return false;
+  }
+
+  public boolean isTopic() {
+      return false;
+  }
+
+  public boolean isTemporary() {
+      return false;
+  }
+
+  public boolean equals(Object o) {
+      if (this == o) {
+          return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+          return false;
+      }
+      ActiveMQDestination d = (ActiveMQDestination)o;
+      return physicalName.equals(d.physicalName);
+  }
+
+  public int hashCode() {
+      return physicalName.hashCode();
+  }
+
+  public String toString() {
+      return getQualifiedName();
+  }
+
+  public String getDestinationTypeAsString() {
+      switch (getDestinationType()) {
+      case QUEUE_TYPE:
+          return "Queue";
+      case TOPIC_TYPE:
+          return "Topic";
+      case TEMP_QUEUE_TYPE:
+          return "TempQueue";
+      case TEMP_TOPIC_TYPE:
+          return "TempTopic";
+      default:
+          throw new IllegalArgumentException("Invalid destination type: " + getDestinationType());
+      }
+  }
+
+  public boolean isMarshallAware() {
+      return false;
+  }
 
-    public static final byte QUEUE_TYPE = 0x01;
-    public static final byte TOPIC_TYPE = 0x02;
-    public static final byte TEMP_MASK = 0x04;
-    public static final byte TEMP_TOPIC_TYPE = TOPIC_TYPE | TEMP_MASK;
-    public static final byte TEMP_QUEUE_TYPE = QUEUE_TYPE | TEMP_MASK;
-
-    public static final String QUEUE_QUALIFIED_PREFIX = "queue://";
-    public static final String TOPIC_QUALIFIED_PREFIX = "topic://";
-    public static final String TEMP_QUEUE_QUALIFED_PREFIX = "temp-queue://";
-    public static final String TEMP_TOPIC_QUALIFED_PREFIX = "temp-topic://";
-
-    public static final String TEMP_DESTINATION_NAME_PREFIX = "ID:";
-
-    private static final long serialVersionUID = -3885260014960795889L;
-
-    protected String physicalName;
-
-    protected transient DestinationDTO[] destination;
-
-    public ActiveMQDestination() {
-    }
-
-    protected ActiveMQDestination(String name) {
-        setPhysicalName(name);
-    }
-
-
-    public DestinationDTO[] toDestination() {
-        return destination;
-    }
-
-    // static helper methods for working with destinations
-    // -------------------------------------------------------------------------
-    public static ActiveMQDestination createDestination(String name, byte defaultType) {
-
-        if (name.startsWith(QUEUE_QUALIFIED_PREFIX)) {
-            return new ActiveMQQueue(name.substring(QUEUE_QUALIFIED_PREFIX.length()));
-        } else if (name.startsWith(TOPIC_QUALIFIED_PREFIX)) {
-            return new ActiveMQTopic(name.substring(TOPIC_QUALIFIED_PREFIX.length()));
-        } else if (name.startsWith(TEMP_QUEUE_QUALIFED_PREFIX)) {
-            return new ActiveMQTempQueue(name.substring(TEMP_QUEUE_QUALIFED_PREFIX.length()));
-        } else if (name.startsWith(TEMP_TOPIC_QUALIFED_PREFIX)) {
-            return new ActiveMQTempTopic(name.substring(TEMP_TOPIC_QUALIFED_PREFIX.length()));
-        }
-
-        switch (defaultType) {
-        case QUEUE_TYPE:
-            return new ActiveMQQueue(name);
-        case TOPIC_TYPE:
-            return new ActiveMQTopic(name);
-        case TEMP_QUEUE_TYPE:
-            return new ActiveMQTempQueue(name);
-        case TEMP_TOPIC_TYPE:
-            return new ActiveMQTempTopic(name);
-        default:
-            throw new IllegalArgumentException("Invalid default destination type: " + defaultType);
-        }
-    }
-
-    public static int compare(ActiveMQDestination destination, ActiveMQDestination destination2) {
-        if (destination == destination2) {
-            return 0;
-        }
-        if (destination == null) {
-            return -1;
-        } else if (destination2 == null) {
-            return 1;
-        } else {
-            if (destination.isQueue() == destination2.isQueue()) {
-                return destination.getPhysicalName().compareTo(destination2.getPhysicalName());
-            } else {
-                return destination.isQueue() ? -1 : 1;
-            }
-        }
-    }
-
-    public int compareTo(Object that) {
-        if (that instanceof ActiveMQDestination) {
-            return compare(this, (ActiveMQDestination)that);
-        }
-        if (that == null) {
-            return 1;
-        } else {
-            return getClass().getName().compareTo(that.getClass().getName());
-        }
-    }
-
-
-    protected abstract String getQualifiedPrefix();
-
-    /**
-     * @openwire:property version=1
-     */
-    public String getPhysicalName() {
-        return physicalName;
-    }
-
-    DestinationDTO[] create_destination(String domain, Path path) {
-        return new DestinationDTO[] { PARSER.create_destination(domain, PARSER.path_parts(path)) };
-    }
-
-    public void setPhysicalName(String value) {
-        physicalName = value;
-        String[] composites = value.split(",");
-        if(composites.length == 1) {
-            Path path = PARSER.decode_path(composites[0]);
-            switch(getDestinationType()) {
-                case QUEUE_TYPE:
-                    destination = create_destination(LocalRouter.QUEUE_DOMAIN(), path);
-                    break;
-                case TOPIC_TYPE:
-                    destination = create_destination(LocalRouter.TOPIC_DOMAIN(), path);
-                    break;
-                case TEMP_QUEUE_TYPE:
-                    destination = create_destination(LocalRouter.TEMP_QUEUE_DOMAIN(), path);
-                    break;
-                case TEMP_TOPIC_TYPE:
-                    destination = create_destination(LocalRouter.TEMP_TOPIC_DOMAIN(), path);
-                    break;
-            }
-        } else {
-            ArrayList<DestinationDTO> l = new ArrayList<DestinationDTO>();
-            for( String c:composites ) {
-                l.add(createDestination(c).destination[0]);
-            }
-            destination = l.toArray(new DestinationDTO[l.size()]);
-        }
-
-    }
-
-    public ActiveMQDestination createDestination(String name) {
-        return createDestination(name, getDestinationType());
-    }
-
-    public abstract byte getDestinationType();
-
-    public boolean isQueue() {
-        return false;
-    }
-
-    public boolean isTopic() {
-        return false;
-    }
-
-    public boolean isTemporary() {
-        return false;
-    }
-
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        ActiveMQDestination d = (ActiveMQDestination)o;
-        return destination.equals(d.destination);
-    }
-
-    public int hashCode() {
-        return destination.hashCode();
-    }
-
-    public String toString() {
-        return destination.toString();
-    }
-
-    public String getDestinationTypeAsString() {
-        switch (getDestinationType()) {
-        case QUEUE_TYPE:
-            return "Queue";
-        case TOPIC_TYPE:
-            return "Topic";
-        case TEMP_QUEUE_TYPE:
-            return "TempQueue";
-        case TEMP_TOPIC_TYPE:
-            return "TempTopic";
-        default:
-            throw new IllegalArgumentException("Invalid destination type: " + getDestinationType());
-        }
-    }
-
-    public boolean isMarshallAware() {
-        return false;
-    }
-
-    public boolean isComposite() {
-        throw new UnsupportedOperationException();
-    }
-
-    public ActiveMQDestination[] getCompositeDestinations() {
-        throw new UnsupportedOperationException();
-    }
 }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java?rev=1158416&r1=1158415&r2=1158416&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java
Tue Aug 16 18:39:25 2011
@@ -28,7 +28,6 @@ import org.apache.activemq.apollo.openwi
 import org.apache.activemq.apollo.openwire.support.advisory.AdvisorySupport;
 import org.apache.activemq.apollo.filter.FilterException;
 import org.apache.activemq.apollo.filter.Filterable;
-import org.apache.activemq.apollo.openwire.support.broker.region.MessageReference;
 import org.apache.activemq.apollo.openwire.codec.OpenWireFormat;
 import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtbuf.ByteArrayInputStream;
@@ -39,7 +38,7 @@ import org.fusesource.hawtbuf.ByteArrayO
  * 
  * @openwire:marshaller
  */
-public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
+public abstract class Message extends BaseCommand implements MarshallAware {
 
     /**
      * The default minimum amount of memory a message is assumed to use

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.scala?rev=1158416&r1=1158415&r2=1158416&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/Path.scala
Tue Aug 16 18:39:25 2011
@@ -16,6 +16,10 @@
   */
 package org.apache.activemq.apollo.util.path
 
+object Path {
+  def apply(value:String*):Path = Path(value.toList.map(LiteralPart(_)))
+}
+
 /**
   * <p>
   * </p>
@@ -32,4 +36,6 @@ case class Path(parts: List[Part]) {
     return pp.encode_path(this)
   }
 
+  def +(other:Path) = Path(parts ::: other.parts)
+
 }
\ No newline at end of file



Mime
View raw message