activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [20/51] [partial] activemq-6 git commit: ACTIVEMQ6-2 Update to HQ master
Date Tue, 11 Nov 2014 11:00:50 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/GroupHandlingAbstract.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/GroupHandlingAbstract.java
b/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/GroupHandlingAbstract.java
index eeb5a49..83d72cb 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/GroupHandlingAbstract.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/GroupHandlingAbstract.java
@@ -19,8 +19,8 @@ import java.util.WeakHashMap;
 import java.util.concurrent.Executor;
 
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.management.CoreNotificationType;
 import org.hornetq.api.core.management.ManagementHelper;
-import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.postoffice.BindingType;
 import org.hornetq.core.server.HornetQServerLogger;
 import org.hornetq.core.server.group.GroupingHandler;
@@ -110,7 +110,7 @@ public abstract class GroupHandlingAbstract implements GroupingHandler
       props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
       props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, address);
       props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
-      Notification notification = new Notification(null, NotificationType.UNPROPOSAL, props);
+      Notification notification = new Notification(null, CoreNotificationType.UNPROPOSAL,
props);
       try
       {
          managementService.sendNotification(notification);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java
b/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java
index 5f4ce80..8851259 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java
@@ -14,6 +14,7 @@ package org.hornetq.core.server.group.impl;
 
 import java.io.Serializable;
 
+import org.hornetq.api.config.HornetQDefaultConfiguration;
 import org.hornetq.api.core.SimpleString;
 
 /**
@@ -25,75 +26,24 @@ public final class GroupingHandlerConfiguration implements Serializable
 {
    public static final long serialVersionUID = -4600283023652477326L;
 
-   public static final int DEFAULT_TIMEOUT = 5000;
-
-   public static final long DEFAULT_GROUP_TIMEOUT = -1;
-
-   public static final long DEFAULT_REAPER_PERIOD = 30000;
-
    public static final String GROUP_TIMEOUT_PROP_NAME = "org.hornetq.GroupingHandlerConfiguration.groupTimeout";
 
    public static final String REAPER_PERIOD_PROP_NAME = "org.hornetq.GroupingHandlerConfiguration.reaperPeriod";
 
-   private final SimpleString name;
-
-   private final TYPE type;
-
-   private final SimpleString address;
-
-   private final long timeout;
+   private SimpleString name = null;
 
-   private long groupTimeout;
+   private TYPE type = null;
 
-   private final long reaperPeriod;
+   private SimpleString address = null;
 
+   private long timeout = HornetQDefaultConfiguration.getDefaultGroupingHandlerTimeout();
 
-   public GroupingHandlerConfiguration(final SimpleString name, final TYPE type, final SimpleString
address)
-   {
-      this(name, type, address,
-            GroupingHandlerConfiguration.DEFAULT_TIMEOUT,
-            GroupingHandlerConfiguration.DEFAULT_GROUP_TIMEOUT,
-            GroupingHandlerConfiguration.DEFAULT_REAPER_PERIOD);
-   }
+   private long groupTimeout = HornetQDefaultConfiguration.getDefaultGroupingHandlerGroupTimeout();
 
-   public GroupingHandlerConfiguration(final SimpleString name,
-                                       final TYPE type,
-                                       final SimpleString address,
-                                       final int timeout)
-   {
-      this(name, type, address, timeout,
-            GroupingHandlerConfiguration.DEFAULT_GROUP_TIMEOUT,
-            GroupingHandlerConfiguration.DEFAULT_REAPER_PERIOD);
-   }
+   private long reaperPeriod = HornetQDefaultConfiguration.getDefaultGroupingHandlerReaperPeriod();
 
-   public GroupingHandlerConfiguration(final SimpleString name,
-                                       final TYPE type,
-                                       final SimpleString address,
-                                       final int timeout,
-                                       final long groupTimeout,
-                                       final long reaperPeriod)
+   public GroupingHandlerConfiguration()
    {
-      this.type = type;
-      this.name = name;
-      this.address = address;
-      this.timeout = timeout;
-      if (System.getProperty(GROUP_TIMEOUT_PROP_NAME) != null)
-      {
-         this.groupTimeout = Long.parseLong(System.getProperty(GROUP_TIMEOUT_PROP_NAME));
-      }
-      else
-      {
-         this.groupTimeout = groupTimeout;
-      }
-
-      if (System.getProperty(REAPER_PERIOD_PROP_NAME) != null)
-      {
-         this.reaperPeriod = Long.parseLong(System.getProperty(REAPER_PERIOD_PROP_NAME));
-      }
-      else
-      {
-         this.reaperPeriod = reaperPeriod;
-      }
    }
 
    public SimpleString getName()
@@ -126,6 +76,42 @@ public final class GroupingHandlerConfiguration implements Serializable
       return reaperPeriod;
    }
 
+   public GroupingHandlerConfiguration setName(SimpleString name)
+   {
+      this.name = name;
+      return this;
+   }
+
+   public GroupingHandlerConfiguration setType(TYPE type)
+   {
+      this.type = type;
+      return this;
+   }
+
+   public GroupingHandlerConfiguration setAddress(SimpleString address)
+   {
+      this.address = address;
+      return this;
+   }
+
+   public GroupingHandlerConfiguration setTimeout(long timeout)
+   {
+      this.timeout = timeout;
+      return this;
+   }
+
+   public GroupingHandlerConfiguration setGroupTimeout(long groupTimeout)
+   {
+      this.groupTimeout = groupTimeout;
+      return this;
+   }
+
+   public GroupingHandlerConfiguration setReaperPeriod(long reaperPeriod)
+   {
+      this.reaperPeriod = reaperPeriod;
+      return this;
+   }
+
    public enum TYPE
    {
       LOCAL("LOCAL"), REMOTE("REMOTE");

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
b/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
index a817f70..7255c9f 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
@@ -25,8 +25,8 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.management.CoreNotificationType;
 import org.hornetq.api.core.management.ManagementHelper;
-import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.BindingType;
@@ -140,7 +140,7 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract
             {
                addRecord = true;
                groupBinding = new GroupBinding(proposal.getGroupId(), proposal.getClusterName());
-               groupBinding.setId(storageManager.generateUniqueID());
+               groupBinding.setId(storageManager.generateID());
                List<GroupBinding> newList = new ArrayList<GroupBinding>();
                List<GroupBinding> oldList = groupMap.putIfAbsent(groupBinding.getClusterName(),
newList);
                if (oldList != null)
@@ -193,7 +193,7 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract
       props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
       props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, address);
       props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
-      Notification notification = new Notification(null, NotificationType.PROPOSAL_RESPONSE,
props);
+      Notification notification = new Notification(null, CoreNotificationType.PROPOSAL_RESPONSE,
props);
       managementService.sendNotification(notification);
    }
 
@@ -243,7 +243,7 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract
          groupBindings.remove(groupBinding);
          try
          {
-            long tx = storageManager.generateUniqueID();
+            long tx = storageManager.generateID();
             storageManager.deleteGrouping(tx, groupBinding);
             storageManager.commitBindings(tx);
          }
@@ -265,7 +265,7 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract
          {
             waitingForBindings = true;
 
-            //make a copy of the bindings added so far from the cluster via onNotification().
+            //make a copy of the bindings added so far from the cluster via onNotification()
             List<SimpleString> bindingsAlreadyAdded;
             if (expectedBindings == null)
             {
@@ -307,13 +307,15 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract
 
    public void onNotification(final Notification notification)
    {
-      if (notification.getType() == NotificationType.BINDING_REMOVED)
+      if (!(notification.getType() instanceof CoreNotificationType)) return;
+
+      if (notification.getType() == CoreNotificationType.BINDING_REMOVED)
       {
          SimpleString clusterName = notification.getProperties()
             .getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
          removeGrouping(clusterName);
       }
-      else if (notification.getType() == NotificationType.BINDING_ADDED)
+      else if (notification.getType() == CoreNotificationType.BINDING_ADDED)
       {
          SimpleString clusterName = notification.getProperties()
             .getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
@@ -426,7 +428,7 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract
                      {
                         if (txID < 0)
                         {
-                           txID = storageManager.generateUniqueID();
+                           txID = storageManager.generateID();
                         }
                         storageManager.deleteGrouping(txID, val);
                      }
@@ -496,7 +498,7 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract
                   {
                      if (txID < 0)
                      {
-                        txID = storageManager.generateUniqueID();
+                        txID = storageManager.generateID();
                      }
                      storageManager.deleteGrouping(txID, groupBinding);
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
b/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
index 1187c33..ddbef27 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
@@ -23,8 +23,8 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.management.CoreNotificationType;
 import org.hornetq.api.core.management.ManagementHelper;
-import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.postoffice.BindingType;
 import org.hornetq.core.server.HornetQMessageBundle;
 import org.hornetq.core.server.HornetQServerLogger;
@@ -229,7 +229,7 @@ public final class RemoteGroupingHandler extends GroupHandlingAbstract
 
       props.putIntProperty(ManagementHelper.HDR_DISTANCE, 0);
 
-      return new Notification(null, NotificationType.PROPOSAL, props);
+      return new Notification(null, CoreNotificationType.PROPOSAL, props);
    }
 
    public Response getProposal(final SimpleString fullID, boolean touchTime)
@@ -294,7 +294,7 @@ public final class RemoteGroupingHandler extends GroupHandlingAbstract
       props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
       props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, address);
       props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
-      Notification notification = new Notification(null, NotificationType.PROPOSAL, props);
+      Notification notification = new Notification(null, CoreNotificationType.PROPOSAL, props);
       managementService.sendNotification(notification);
       return null;
    }
@@ -311,8 +311,9 @@ public final class RemoteGroupingHandler extends GroupHandlingAbstract
 
    public void onNotification(final Notification notification)
    {
+      if (!(notification.getType() instanceof CoreNotificationType)) return;
       // removing the groupid if the binding has been removed
-      if (notification.getType() == NotificationType.BINDING_REMOVED)
+      if (notification.getType() == CoreNotificationType.BINDING_REMOVED)
       {
          SimpleString clusterName = notification.getProperties()
             .getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/Activation.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/Activation.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/Activation.java
new file mode 100644
index 0000000..dc39d7a
--- /dev/null
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/Activation.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.impl;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.ChannelHandler;
+import org.hornetq.core.remoting.server.RemotingService;
+import org.hornetq.core.replication.ReplicationManager;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.QueueFactory;
+import org.hornetq.core.server.cluster.ha.HAManager;
+import org.hornetq.core.server.cluster.ha.StandaloneHAManager;
+import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.spi.core.remoting.Acceptor;
+
+/**
+* An activation controls the lifecycle of the server and any components specific to the Activation
itself.
+*/
+public abstract class Activation implements Runnable
+{
+   public abstract void close(boolean permanently, boolean restarting) throws Exception;
+
+   /*
+   * freeze the connection but allow the Activation to override this and decide if any connections
should be left open.
+   * */
+   public void freezeConnections(RemotingService remotingService)
+   {
+      if (remotingService != null)
+      {
+         remotingService.freeze(null, null);
+      }
+   }
+
+   /*
+   * allow the activation to override this if it needs to tidy up after freezing the connection.
it's a different method as
+   * it's called outside of the lock that the previous method is called under.
+   * */
+   public void postConnectionFreeze()
+   {
+   }
+
+   /*
+   * called before the server is closing the journals so the activation can tidy up stuff
+   * */
+   public void preStorageClose() throws Exception
+   {
+   }
+
+   /*
+   * called by the server to notify the Activation that the server is stopping
+   * */
+   public void sendLiveIsStopping()
+   {
+   }
+
+   /*
+   * called by the ha manager to notify the Activation that HA is now active
+   * */
+   public void haStarted()
+   {
+   }
+
+   /*
+   * allows the Activation to register a channel handler so it can handle any packets that
are unique to the Activation
+   * */
+   public ChannelHandler getActivationChannelHandler(Channel channel, Acceptor acceptorUsed)
+   {
+      return null;
+   }
+
+   /*
+   * returns the HA manager used for this Activation
+   * */
+   public HAManager getHAManager()
+   {
+      return new StandaloneHAManager();
+   }
+
+   /*
+   * create the Journal loader needed for this Activation.
+   * */
+   public JournalLoader createJournalLoader(PostOffice postOffice, PagingManager pagingManager,
StorageManager storageManager, QueueFactory queueFactory, NodeManager nodeManager, ManagementService
managementService, GroupingHandler groupingHandler, Configuration configuration, HornetQServer
parentServer) throws HornetQException
+   {
+      return new PostOfficeJournalLoader(postOffice,
+            pagingManager,
+            storageManager,
+            queueFactory,
+            nodeManager,
+            managementService,
+            groupingHandler,
+            configuration);
+   }
+
+   /*
+   * todo: remove this; it's only needed for JMSServerManagerImpl and should be sought elsewhere
+   * */
+   public ReplicationManager getReplicationManager()
+   {
+      return null;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/BackupRecoveryJournalLoader.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/BackupRecoveryJournalLoader.java
b/hornetq-server/src/main/java/org/hornetq/core/server/impl/BackupRecoveryJournalLoader.java
index 24a4816..021b1dc 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/BackupRecoveryJournalLoader.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/BackupRecoveryJournalLoader.java
@@ -16,7 +16,6 @@ import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.client.impl.ServerLocatorImpl;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.journal.Journal;
@@ -24,12 +23,12 @@ import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.persistence.GroupingInfo;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.protocol.ServerPacketDecoder;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.HornetQServerLogger;
 import org.hornetq.core.server.NodeManager;
 import org.hornetq.core.server.QueueFactory;
 import org.hornetq.core.server.cluster.ClusterController;
+import org.hornetq.core.server.cluster.HornetQServerSideProtocolManagerFactory;
 import org.hornetq.core.server.group.GroupingHandler;
 import org.hornetq.core.server.management.ManagementService;
 import org.hornetq.core.transaction.ResourceManager;
@@ -88,7 +87,8 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader
    public void postLoad(Journal messageJournal, ResourceManager resourceManager, Map<SimpleString,
List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
    {
       ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice,
nodeManager, clusterController);
-      ((ServerLocatorImpl)locator).setPacketDecoder(ServerPacketDecoder.INSTANCE);
+      locator.setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance());
+
       try (ClientSessionFactory sessionFactory = locator.createSessionFactory())
       {
          scaleDownHandler.scaleDown(sessionFactory, resourceManager, duplicateIDMap, parentServer.getConfiguration().getManagementAddress(),
parentServer.getNodeID());

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/ColocatedActivation.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ColocatedActivation.java
b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ColocatedActivation.java
new file mode 100644
index 0000000..92412a3
--- /dev/null
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ColocatedActivation.java
@@ -0,0 +1,331 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.impl;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.client.impl.Topology;
+import org.hornetq.core.client.impl.TopologyMemberImpl;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.ChannelHandler;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.core.protocol.core.impl.wireformat.BackupRequestMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.BackupResponseMessage;
+import org.hornetq.core.remoting.server.RemotingService;
+import org.hornetq.core.replication.ReplicationManager;
+import org.hornetq.core.server.cluster.ha.ColocatedHAManager;
+import org.hornetq.core.server.cluster.ha.ColocatedPolicy;
+import org.hornetq.core.server.cluster.ha.HAManager;
+import org.hornetq.core.server.cluster.qourum.QuorumVote;
+import org.hornetq.core.server.cluster.qourum.QuorumVoteHandler;
+import org.hornetq.core.server.cluster.qourum.Vote;
+import org.hornetq.spi.core.remoting.Acceptor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class ColocatedActivation extends LiveActivation
+{
+   private static final SimpleString REQUEST_BACKUP_QUORUM_VOTE = new SimpleString("RequestBackupQuorumVote");
+
+   private final ColocatedHAManager colocatedHAManager;
+
+   private final ColocatedPolicy colocatedPolicy;
+
+   LiveActivation liveActivation;
+
+   private final HornetQServerImpl server;
+
+   public ColocatedActivation(HornetQServerImpl hornetQServer, ColocatedPolicy colocatedPolicy,
LiveActivation liveActivation)
+   {
+      server = hornetQServer;
+      this.colocatedPolicy = colocatedPolicy;
+      this.liveActivation = liveActivation;
+      colocatedHAManager = new ColocatedHAManager(colocatedPolicy, server);
+   }
+
+
+   @Override
+   public void haStarted()
+   {
+      server.getClusterManager().getQuorumManager().registerQuorumHandler(new RequestBackupQuorumVoteHandler());
+      //vote for a backup if required
+      if (colocatedPolicy.isRequestBackup())
+      {
+         server.getClusterManager().getQuorumManager().vote(new RequestBackupQuorumVote());
+      }
+   }
+
+   @Override
+   public void freezeConnections(RemotingService remotingService)
+   {
+      liveActivation.freezeConnections(remotingService);
+   }
+
+   @Override
+   public void postConnectionFreeze()
+   {
+      liveActivation.postConnectionFreeze();
+   }
+
+   @Override
+   public void preStorageClose() throws Exception
+   {
+      liveActivation.preStorageClose();
+   }
+
+   @Override
+   public void sendLiveIsStopping()
+   {
+      liveActivation.sendLiveIsStopping();
+   }
+
+   @Override
+   public ReplicationManager getReplicationManager()
+   {
+      return liveActivation.getReplicationManager();
+   }
+
+   @Override
+   public HAManager getHAManager()
+   {
+      return colocatedHAManager;
+   }
+
+   @Override
+   public void run()
+   {
+      liveActivation.run();
+   }
+
+   @Override
+   public void close(boolean permanently, boolean restarting) throws Exception
+   {
+      liveActivation.close(permanently, restarting);
+   }
+
+   @Override
+   public ChannelHandler getActivationChannelHandler(final Channel channel, final Acceptor
acceptorUsed)
+   {
+      final ChannelHandler activationChannelHandler = liveActivation.getActivationChannelHandler(channel,
acceptorUsed);
+      return new ChannelHandler()
+      {
+         @Override
+         public void handlePacket(Packet packet)
+         {
+            if (packet.getType() == PacketImpl.BACKUP_REQUEST)
+            {
+               BackupRequestMessage backupRequestMessage = (BackupRequestMessage) packet;
+               boolean started = false;
+               try
+               {
+                  started = colocatedHAManager.activateBackup(backupRequestMessage.getBackupSize(),
+                        backupRequestMessage.getJournalDirectory(),
+                        backupRequestMessage.getBindingsDirectory(),
+                        backupRequestMessage.getLargeMessagesDirectory(),
+                        backupRequestMessage.getPagingDirectory(),
+                        backupRequestMessage.getNodeID());
+               }
+               catch (Exception e)
+               {
+                  e.printStackTrace();
+               }
+               channel.send(new BackupResponseMessage(started));
+            }
+            else if (activationChannelHandler != null)
+            {
+               activationChannelHandler.handlePacket(packet);
+            }
+         }
+      };
+   }
+
+   /**
+    * A vote handler for incoming backup request votes
+    */
+   private final class RequestBackupQuorumVoteHandler implements QuorumVoteHandler
+   {
+      @Override
+      public Vote vote(Vote vote)
+      {
+         int size = colocatedHAManager.getBackupServers().size();
+         return new RequestBackupVote(size, server.getNodeID().toString(), size < colocatedPolicy.getMaxBackups());
+      }
+
+      @Override
+      public SimpleString getQuorumName()
+      {
+         return REQUEST_BACKUP_QUORUM_VOTE;
+      }
+
+      @Override
+      public Vote decode(HornetQBuffer voteBuffer)
+      {
+         RequestBackupVote requestBackupVote = new RequestBackupVote();
+         requestBackupVote.decode(voteBuffer);
+         return requestBackupVote;
+      }
+   }
+   /**
+    * a quorum vote for backup requests
+    */
+   private final class RequestBackupQuorumVote extends QuorumVote<RequestBackupVote, Pair<String,
Integer>>
+   {
+      //the available nodes that we can request
+      private final List<Pair<String, Integer>> nodes = new ArrayList<>();
+
+      public RequestBackupQuorumVote()
+      {
+         super(REQUEST_BACKUP_QUORUM_VOTE);
+      }
+
+      @Override
+      public Vote connected()
+      {
+         return new RequestBackupVote();
+      }
+
+      @Override
+      public Vote notConnected()
+      {
+         return new RequestBackupVote();
+      }
+
+      @Override
+      public void vote(RequestBackupVote vote)
+      {
+         //if the returned vote is available add it to the nodes we can request
+         if (vote.backupAvailable)
+         {
+            nodes.add(vote.getVote());
+         }
+      }
+
+      @Override
+      public Pair<String, Integer> getDecision()
+      {
+         //sort the nodes by how many backups they have and choose the first
+         Collections.sort(nodes, new Comparator<Pair<String, Integer>>()
+         {
+            @Override
+            public int compare(Pair<String, Integer> o1, Pair<String, Integer>
o2)
+            {
+               return o1.getB().compareTo(o2.getB());
+            }
+         });
+         return nodes.get(0);
+      }
+
+      @Override
+      public void allVotesCast(Topology voteTopology)
+      {
+         //if we have any nodes that we can request then send a request
+         if (nodes.size() > 0)
+         {
+            Pair<String, Integer> decision = getDecision();
+            TopologyMemberImpl member = voteTopology.getMember(decision.getA());
+            try
+            {
+               boolean backupStarted = colocatedHAManager.requestBackup(member.getConnector(),
decision.getB().intValue(), !colocatedPolicy.isSharedStore());
+               if (!backupStarted)
+               {
+                  nodes.clear();
+                  server.getScheduledPool().schedule(new Runnable()
+                  {
+                     @Override
+                     public void run()
+                     {
+                        server.getClusterManager().getQuorumManager().vote(new RequestBackupQuorumVote());
+                     }
+                  }, colocatedPolicy.getBackupRequestRetryInterval(), TimeUnit.MILLISECONDS);
+               }
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+               //todo
+            }
+         }
+         else
+         {
+            nodes.clear();
+            server.getScheduledPool().schedule(new Runnable()
+            {
+               @Override
+               public void run()
+               {
+                  server.getClusterManager().getQuorumManager().vote(RequestBackupQuorumVote.this);
+               }
+            }, colocatedPolicy.getBackupRequestRetryInterval(), TimeUnit.MILLISECONDS);
+         }
+      }
+
+      @Override
+      public SimpleString getName()
+      {
+         return REQUEST_BACKUP_QUORUM_VOTE;
+      }
+   }
+
+   class RequestBackupVote extends Vote<Pair<String, Integer>>
+   {
+      private int backupsSize;
+      private String nodeID;
+      private boolean backupAvailable;
+
+      public RequestBackupVote()
+      {
+         backupsSize = -1;
+      }
+
+      public RequestBackupVote(int backupsSize, String nodeID, boolean backupAvailable)
+      {
+         this.backupsSize = backupsSize;
+         this.nodeID = nodeID;
+         this.backupAvailable = backupAvailable;
+      }
+
+      @Override
+      public void encode(HornetQBuffer buff)
+      {
+         buff.writeInt(backupsSize);
+         buff.writeNullableString(nodeID);
+         buff.writeBoolean(backupAvailable);
+      }
+
+      @Override
+      public void decode(HornetQBuffer buff)
+      {
+         backupsSize = buff.readInt();
+         nodeID = buff.readNullableString();
+         backupAvailable = buff.readBoolean();
+      }
+
+      @Override
+      public boolean isRequestServerVote()
+      {
+         return true;
+      }
+
+      @Override
+      public Pair<String, Integer> getVote()
+      {
+         return new Pair<>(nodeID, backupsSize);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/ConnectorsService.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ConnectorsService.java
b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ConnectorsService.java
index 094efc9..2cc6f2d 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ConnectorsService.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ConnectorsService.java
@@ -12,11 +12,13 @@
  */
 package org.hornetq.core.server.impl;
 
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.hornetq.api.core.Pair;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.ConnectorServiceConfiguration;
 import org.hornetq.core.persistence.StorageManager;
@@ -51,48 +53,36 @@ public final class ConnectorsService implements HornetQComponent
 
    private final Set<ConnectorService> connectors = new HashSet<ConnectorService>();
 
+   private final ServiceRegistry serviceRegistry;
+
    public ConnectorsService(final Configuration configuration,
                             final StorageManager storageManager,
                             final ScheduledExecutorService scheduledPool,
-                            final PostOffice postOffice)
+                            final PostOffice postOffice,
+                            final ServiceRegistry serviceRegistry)
    {
       this.configuration = configuration;
       this.storageManager = storageManager;
       this.scheduledPool = scheduledPool;
       this.postOffice = postOffice;
+      this.serviceRegistry = serviceRegistry;
    }
 
    public void start() throws Exception
    {
       List<ConnectorServiceConfiguration> configurationList = configuration.getConnectorServiceConfigurations();
 
-      for (ConnectorServiceConfiguration info : configurationList)
-      {
-         ConnectorServiceFactory factory = (ConnectorServiceFactory)ClassloadingUtil.newInstanceFromClassLoader(info.getFactoryClassName());
-
-         if (info.getParams() != null)
-         {
-            Set<String> invalid = ConfigurationHelper.checkKeys(factory.getAllowableProperties(), info.getParams()
-               .keySet());
-
-            if (!invalid.isEmpty())
-            {
-               HornetQServerLogger.LOGGER.connectorKeysInvalid(ConfigurationHelper.stringSetToCommaListString(invalid));
-
-               continue;
-            }
-         }
-         Set<String> invalid = ConfigurationHelper.checkKeysExist(factory.getRequiredProperties(), info.getParams()
-            .keySet());
+      Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> connectorServiceFactories = serviceRegistry.getConnectorServices();
 
-         if (!invalid.isEmpty())
-         {
-            HornetQServerLogger.LOGGER.connectorKeysMissing(ConfigurationHelper.stringSetToCommaListString(invalid));
+      for (Pair<ConnectorServiceFactory, ConnectorServiceConfiguration> pair : connectorServiceFactories)
+      {
+         createService(pair.getB(), pair.getA());
+      }
 
-            continue;
-         }
-         ConnectorService connectorService = factory.createConnectorService(info.getConnectorName(), info.getParams(), storageManager, postOffice, scheduledPool);
-         connectors.add(connectorService);
+      for (ConnectorServiceConfiguration info : configurationList)
+      {
+         ConnectorServiceFactory factory = (ConnectorServiceFactory) ClassloadingUtil.newInstanceFromClassLoader(info.getFactoryClassName());
+         createService(info, factory);
       }
 
       for (ConnectorService connector : connectors)
@@ -109,6 +99,28 @@ public final class ConnectorsService implements HornetQComponent
       isStarted = true;
    }
 
+   public void createService(ConnectorServiceConfiguration info, ConnectorServiceFactory factory)
+   {
+      if (info.getParams() != null)
+      {
+         Set<String> invalid = ConfigurationHelper.checkKeys(factory.getAllowableProperties(), info.getParams().keySet());
+         if (!invalid.isEmpty())
+         {
+            HornetQServerLogger.LOGGER.connectorKeysInvalid(ConfigurationHelper.stringSetToCommaListString(invalid));
+            return;
+         }
+      }
+
+      Set<String> invalid = ConfigurationHelper.checkKeysExist(factory.getRequiredProperties(), info.getParams().keySet());
+      if (!invalid.isEmpty())
+      {
+         HornetQServerLogger.LOGGER.connectorKeysMissing(ConfigurationHelper.stringSetToCommaListString(invalid));
+         return;
+      }
+      ConnectorService connectorService = factory.createConnectorService(info.getConnectorName(), info.getParams(), storageManager, postOffice, scheduledPool);
+      connectors.add(connectorService);
+   }
+
    public void stop() throws Exception
    {
       if (!isStarted)

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/DivertImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/DivertImpl.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/DivertImpl.java
index 627f1c1..4e63d14 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/DivertImpl.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/DivertImpl.java
@@ -85,7 +85,7 @@ public class DivertImpl implements Divert
          HornetQServerLogger.LOGGER.trace("Diverting message " + message + " into " + this);
       }
 
-      long id = storageManager.generateUniqueID();
+      long id = storageManager.generateID();
 
       ServerMessage copy = null;
 
@@ -100,6 +100,8 @@ public class DivertImpl implements Divert
 
          copy.setAddress(forwardAddress);
 
+         copy.setExpiration(message.getExpiration());
+
          if (transformer != null)
          {
             copy = transformer.transform(copy);


Mime
View raw message