incubator-cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kis...@apache.org
Subject [41/48] git commit: refs/heads/regions - Merging events framework branch into master. This commit will bring following changes
Date Fri, 01 Feb 2013 06:11:33 GMT
Merging events framework branch into master. This commit will bring the
following changes:

   - introduced notion of event bus with publish, subscribe, unsubscribe
     semantics

   - a plug-in can implement the EventBus abstraction to provide an event
     bus to CloudStack

   - a RabbitMQ-based plug-in that can interact with AMQP servers to
     provide a message-broker-based event bus

   - streamlines the publishing of action events, usage events, and alerts
     into convenience classes, which are also used to publish the
     corresponding events onto the event bus

   - introduced notion of state change event. On a state change in the
     state machine corresponding to the resource, a state change event is
     published on the event bus

   - associated a state machine with Snapshot and Network objects

   - Virtual Machine, Volume, Snapshot, and Network object state changes will
     result in a state change event


Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/e7a554fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/e7a554fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/e7a554fc

Branch: refs/heads/regions
Commit: e7a554fc6a23a49949c2d88d6ef680682c6f6bc4
Parents: 6a6d93c
Author: Murali Reddy <murali.reddy@citrix.com>
Authored: Fri Feb 1 01:30:49 2013 +0530
Committer: Murali Reddy <murali.reddy@citrix.com>
Committed: Fri Feb 1 01:37:24 2013 +0530

----------------------------------------------------------------------
 api/src/com/cloud/event/EventCategory.java         |   55 ++
 api/src/com/cloud/event/EventTypes.java            |  351 +++++++++-
 api/src/com/cloud/network/Network.java             |   56 +-
 api/src/com/cloud/storage/Snapshot.java            |   39 +-
 .../cloudstack/api/response/SnapshotResponse.java  |   12 +-
 client/pom.xml                                     |    5 +
 core/src/com/cloud/storage/SnapshotVO.java         |   26 +-
 framework/events/pom.xml                           |   47 ++
 .../apache/cloudstack/framework/events/Event.java  |   94 +++
 .../cloudstack/framework/events/EventBus.java      |   55 ++
 .../framework/events/EventBusException.java        |   26 +
 .../framework/events/EventSubscriber.java          |   30 +
 .../cloudstack/framework/events/EventTopic.java    |   57 ++
 framework/pom.xml                                  |   35 +
 plugins/event-bus/rabbitmq/pom.xml                 |   46 ++
 .../cloudstack/mom/rabbitmq/RabbitMQEventBus.java  |  555 +++++++++++++++
 .../cloud/network/guru/OvsGuestNetworkGuru.java    |    4 +-
 plugins/pom.xml                                    |    1 +
 pom.xml                                            |    1 +
 server/pom.xml                                     |    5 +
 server/src/com/cloud/alert/AlertManagerImpl.java   |   64 ++
 server/src/com/cloud/api/ApiDBUtils.java           |  207 +-----
 server/src/com/cloud/api/ApiResponseHelper.java    |  191 +-----
 server/src/com/cloud/api/ApiServer.java            |    5 +-
 .../cloud/baremetal/BareMetalTemplateAdapter.java  |   30 +-
 .../cloud/baremetal/BareMetalVmManagerImpl.java    |   63 +--
 .../configuration/DefaultInterceptorLibrary.java   |    8 +-
 .../src/com/cloud/event/ActionEventCallback.java   |  135 ----
 server/src/com/cloud/event/ActionEventUtils.java   |  288 ++++++++
 server/src/com/cloud/event/AlertGenerator.java     |   87 +++
 server/src/com/cloud/event/EventUtils.java         |  102 ---
 server/src/com/cloud/event/UsageEventUtils.java    |  119 +++
 .../src/com/cloud/network/NetworkManagerImpl.java  |  204 +++----
 .../src/com/cloud/network/NetworkServiceImpl.java  |   90 +--
 .../com/cloud/network/NetworkStateListener.java    |   90 +++
 server/src/com/cloud/network/NetworkVO.java        |    1 +
 server/src/com/cloud/network/dao/NetworkDao.java   |    4 +-
 .../src/com/cloud/network/dao/NetworkDaoImpl.java  |   32 +-
 .../network/firewall/FirewallManagerImpl.java      |   27 +-
 .../network/guru/ExternalGuestNetworkGuru.java     |    7 +-
 .../com/cloud/network/guru/GuestNetworkGuru.java   |   11 +-
 .../network/lb/LoadBalancingRulesManagerImpl.java  |   11 +-
 .../com/cloud/network/rules/RulesManagerImpl.java  |   12 +-
 .../network/security/SecurityGroupManagerImpl.java |   78 +--
 .../network/vpn/RemoteAccessVpnManagerImpl.java    |   61 +-
 .../src/com/cloud/server/ManagementServerImpl.java |    9 +-
 .../src/com/cloud/storage/StorageManagerImpl.java  |  187 ++----
 server/src/com/cloud/storage/dao/SnapshotDao.java  |   13 +-
 .../src/com/cloud/storage/dao/SnapshotDaoImpl.java |   49 +-
 .../storage/download/DownloadMonitorImpl.java      |   76 +--
 .../storage/listener/SnapshotStateListener.java    |   85 +++
 .../storage/listener/VolumeStateListener.java      |   85 +++
 .../storage/snapshot/SnapshotManagerImpl.java      |  195 +++---
 .../storage/snapshot/SnapshotSchedulerImpl.java    |    6 +-
 .../cloud/template/HyervisorTemplateAdapter.java   |   29 +-
 .../com/cloud/template/TemplateManagerImpl.java    |   95 +--
 server/src/com/cloud/user/AccountManagerImpl.java  |    8 +-
 server/src/com/cloud/vm/UserVmManagerImpl.java     |   59 +-
 server/src/com/cloud/vm/UserVmStateListener.java   |   88 ++-
 .../test/com/cloud/snapshot/SnapshotDaoTest.java   |    9 +-
 .../test/com/cloud/vpc/dao/MockNetworkDaoImpl.java |   17 +-
 tools/whisker/LICENSE                              |  500 +++++++++++++-
 tools/whisker/descriptor-for-packaging.xml         |   18 +
 63 files changed, 3400 insertions(+), 1555 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/api/src/com/cloud/event/EventCategory.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/event/EventCategory.java b/api/src/com/cloud/event/EventCategory.java
new file mode 100644
index 0000000..cee6529
--- /dev/null
+++ b/api/src/com/cloud/event/EventCategory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.cloud.event;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class EventCategory {
+    private static List<EventCategory> eventCategories = new ArrayList<EventCategory>();
+    private String eventCategoryName;
+
+    public  EventCategory(String categoryName) {
+        this.eventCategoryName = categoryName;
+        eventCategories.add(this);
+    }
+
+    public String getName() {
+        return eventCategoryName;
+    }
+
+    public static List<EventCategory> listAllEventCategories() {
+        return eventCategories;
+    }
+
+    public static EventCategory getEventCategory(String categoryName) {
+        for (EventCategory category : eventCategories) {
+            if (category.getName().equalsIgnoreCase(categoryName)) {
+                return category;
+            }
+        }
+        return null;
+    }
+
+    public static final EventCategory ACTION_EVENT = new EventCategory("ActionEvent");
+    public static final EventCategory ALERT_EVENT  = new EventCategory("AlertEvent");
+    public static final EventCategory USAGE_EVENT  = new EventCategory("UsageEvent");
+    public static final EventCategory RESOURCE_STATE_CHANGE_EVENT = new EventCategory("ResourceStateEvent");
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/api/src/com/cloud/event/EventTypes.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/event/EventTypes.java b/api/src/com/cloud/event/EventTypes.java
index d666c1e..0dd97cb 100755
--- a/api/src/com/cloud/event/EventTypes.java
+++ b/api/src/com/cloud/event/EventTypes.java
@@ -16,7 +16,41 @@
 // under the License.
 package com.cloud.event;
 
+import com.cloud.configuration.Configuration;
+import com.cloud.dc.DataCenter;
+import com.cloud.dc.Pod;
+import com.cloud.dc.StorageNetworkIpRange;
+import com.cloud.dc.Vlan;
+import com.cloud.domain.Domain;
+import com.cloud.host.Host;
+import com.cloud.network.*;
+import com.cloud.network.as.*;
+import com.cloud.network.router.VirtualRouter;
+import com.cloud.network.rules.LoadBalancer;
+import com.cloud.network.rules.StaticNat;
+import com.cloud.network.security.SecurityGroup;
+import com.cloud.network.vpc.PrivateGateway;
+import com.cloud.network.vpc.StaticRoute;
+import com.cloud.network.vpc.Vpc;
+import com.cloud.offering.DiskOffering;
+import com.cloud.offering.NetworkOffering;
+import com.cloud.offering.ServiceOffering;
+import com.cloud.projects.Project;
+import com.cloud.storage.Snapshot;
+import com.cloud.storage.Volume;
+import com.cloud.template.VirtualMachineTemplate;
+import com.cloud.user.Account;
+import com.cloud.user.User;
+import com.cloud.vm.VirtualMachine;
+
+import java.util.HashMap;
+import java.util.Map;
+
 public class EventTypes {
+
+    //map of Event and corresponding entity for which Event is applicable
+    private static Map<String, String> entityEventDetails = null;
+
     // VM Events
     public static final String EVENT_VM_CREATE = "VM.CREATE";
     public static final String EVENT_VM_DESTROY = "VM.DESTROY";
@@ -319,10 +353,323 @@ public class EventTypes {
     public static final String EVENT_AUTOSCALEVMGROUP_UPDATE = "AUTOSCALEVMGROUP.UPDATE";
     public static final String EVENT_AUTOSCALEVMGROUP_ENABLE = "AUTOSCALEVMGROUP.ENABLE";
     public static final String EVENT_AUTOSCALEVMGROUP_DISABLE = "AUTOSCALEVMGROUP.DISABLE";
-    
+
+
     public static final String EVENT_BAREMETAL_DHCP_SERVER_ADD = "PHYSICAL.DHCP.ADD";
     public static final String EVENT_BAREMETAL_DHCP_SERVER_DELETE = "PHYSICAL.DHCP.DELETE";
-    
     public static final String EVENT_BAREMETAL_PXE_SERVER_ADD = "PHYSICAL.PXE.ADD";
     public static final String EVENT_BAREMETAL_PXE_SERVER_DELETE = "PHYSICAL.PXE.DELETE";
+
+    static {
+
+        // TODO: need a way to force authors adding event types to declare the entity details as well, without breaking
+        // current ActionEvent annotation semantics
+
+        entityEventDetails = new HashMap<String, String>();
+
+        entityEventDetails.put(EVENT_VM_CREATE, VirtualMachine.class.getName());
+        entityEventDetails.put(EVENT_VM_DESTROY, VirtualMachine.class.getName());
+        entityEventDetails.put(EVENT_VM_START, VirtualMachine.class.getName());
+        entityEventDetails.put(EVENT_VM_STOP, VirtualMachine.class.getName());
+        entityEventDetails.put(EVENT_VM_REBOOT, VirtualMachine.class.getName());
+        entityEventDetails.put(EVENT_VM_UPDATE, VirtualMachine.class.getName());
+        entityEventDetails.put(EVENT_VM_UPGRADE, VirtualMachine.class.getName());
+        entityEventDetails.put(EVENT_VM_RESETPASSWORD, VirtualMachine.class.getName());
+        entityEventDetails.put(EVENT_VM_MIGRATE, VirtualMachine.class.getName());
+        entityEventDetails.put(EVENT_VM_MOVE, VirtualMachine.class.getName());
+        entityEventDetails.put(EVENT_VM_RESTORE, VirtualMachine.class.getName());
+
+        entityEventDetails.put(EVENT_ROUTER_CREATE, VirtualRouter.class.getName());
+        entityEventDetails.put(EVENT_ROUTER_DESTROY, VirtualRouter.class.getName());
+        entityEventDetails.put(EVENT_ROUTER_START, VirtualRouter.class.getName());
+        entityEventDetails.put(EVENT_ROUTER_STOP, VirtualRouter.class.getName());
+        entityEventDetails.put(EVENT_ROUTER_REBOOT, VirtualRouter.class.getName());
+        entityEventDetails.put(EVENT_ROUTER_HA, VirtualRouter.class.getName());
+        entityEventDetails.put(EVENT_ROUTER_UPGRADE, VirtualRouter.class.getName());
+
+        entityEventDetails.put(EVENT_PROXY_CREATE, "ConsoleProxy");
+        entityEventDetails.put(EVENT_PROXY_DESTROY, "ConsoleProxy");
+        entityEventDetails.put(EVENT_PROXY_START, "ConsoleProxy");
+        entityEventDetails.put(EVENT_PROXY_STOP, "ConsoleProxy");
+        entityEventDetails.put(EVENT_PROXY_REBOOT, "ConsoleProxy");
+        entityEventDetails.put(EVENT_ROUTER_HA, "ConsoleProxy");
+        entityEventDetails.put(EVENT_PROXY_HA, "ConsoleProxy");
+
+        entityEventDetails.put(EVENT_VNC_CONNECT, "VNC");
+        entityEventDetails.put(EVENT_VNC_DISCONNECT, "VNC");
+
+        // Network Events
+        entityEventDetails.put(EVENT_NETWORK_CREATE, Network.class.getName());
+        entityEventDetails.put(EVENT_NETWORK_DELETE, Network.class.getName());
+        entityEventDetails.put(EVENT_NETWORK_UPDATE, Network.class.getName());
+        entityEventDetails.put(EVENT_NETWORK_RESTART, Network.class.getName());
+        entityEventDetails.put(EVENT_NET_IP_ASSIGN, PublicIpAddress.class.getName());
+        entityEventDetails.put(EVENT_NET_IP_RELEASE, PublicIpAddress.class.getName());
+        entityEventDetails.put(EVENT_NET_RULE_ADD, Network.class.getName());
+        entityEventDetails.put(EVENT_NET_RULE_DELETE, Network.class.getName());
+        entityEventDetails.put(EVENT_NET_RULE_MODIFY, Network.class.getName());
+        entityEventDetails.put(EVENT_FIREWALL_OPEN, Network.class.getName());
+        entityEventDetails.put(EVENT_FIREWALL_CLOSE, Network.class.getName());
+
+        // Load Balancers
+        entityEventDetails.put(EVENT_ASSIGN_TO_LOAD_BALANCER_RULE, LoadBalancer.class.getName());
+        entityEventDetails.put(EVENT_REMOVE_FROM_LOAD_BALANCER_RULE, LoadBalancer.class.getName());
+        entityEventDetails.put(EVENT_LOAD_BALANCER_CREATE, LoadBalancer.class.getName());
+        entityEventDetails.put(EVENT_LOAD_BALANCER_DELETE, LoadBalancer.class.getName());
+        entityEventDetails.put(EVENT_LB_STICKINESSPOLICY_CREATE, LoadBalancer.class.getName());
+        entityEventDetails.put(EVENT_LB_STICKINESSPOLICY_DELETE, LoadBalancer.class.getName());
+        entityEventDetails.put(EVENT_LOAD_BALANCER_UPDATE, LoadBalancer.class.getName());
+
+        // Account events
+        entityEventDetails.put(EVENT_ACCOUNT_DISABLE, Account.class.getName());
+        entityEventDetails.put(EVENT_ACCOUNT_CREATE, Account.class.getName());
+        entityEventDetails.put(EVENT_ACCOUNT_DELETE, Account.class.getName());
+        entityEventDetails.put(EVENT_ACCOUNT_MARK_DEFAULT_ZONE, Account.class.getName());
+
+        // UserVO Events
+        entityEventDetails.put(EVENT_USER_LOGIN, User.class.getName());
+        entityEventDetails.put(EVENT_USER_LOGOUT, User.class.getName());
+        entityEventDetails.put(EVENT_USER_CREATE, User.class.getName());
+        entityEventDetails.put(EVENT_USER_DELETE, User.class.getName());
+        entityEventDetails.put(EVENT_USER_DISABLE, User.class.getName());
+        entityEventDetails.put(EVENT_USER_UPDATE, User.class.getName());
+        entityEventDetails.put(EVENT_USER_ENABLE, User.class.getName());
+        entityEventDetails.put(EVENT_USER_LOCK, User.class.getName());
+
+        // Template Events
+        entityEventDetails.put(EVENT_TEMPLATE_CREATE, VirtualMachineTemplate.class.getName());
+        entityEventDetails.put(EVENT_TEMPLATE_DELETE, VirtualMachineTemplate.class.getName());
+        entityEventDetails.put(EVENT_TEMPLATE_UPDATE, VirtualMachineTemplate.class.getName());
+        entityEventDetails.put(EVENT_TEMPLATE_DOWNLOAD_START, VirtualMachineTemplate.class.getName());
+        entityEventDetails.put(EVENT_TEMPLATE_DOWNLOAD_SUCCESS, VirtualMachineTemplate.class.getName());
+        entityEventDetails.put(EVENT_TEMPLATE_DOWNLOAD_FAILED, VirtualMachineTemplate.class.getName());
+        entityEventDetails.put(EVENT_TEMPLATE_COPY, VirtualMachineTemplate.class.getName());
+        entityEventDetails.put(EVENT_TEMPLATE_EXTRACT, VirtualMachineTemplate.class.getName());
+        entityEventDetails.put(EVENT_TEMPLATE_UPLOAD, VirtualMachineTemplate.class.getName());
+        entityEventDetails.put(EVENT_TEMPLATE_CLEANUP, VirtualMachineTemplate.class.getName());
+
+        // Volume Events
+        entityEventDetails.put(EVENT_VOLUME_CREATE, Volume.class.getName());
+        entityEventDetails.put(EVENT_VOLUME_DELETE, Volume.class.getName());
+        entityEventDetails.put(EVENT_VOLUME_ATTACH, Volume.class.getName());
+        entityEventDetails.put(EVENT_VOLUME_DETACH, Volume.class.getName());
+        entityEventDetails.put(EVENT_VOLUME_EXTRACT, Volume.class.getName());
+        entityEventDetails.put(EVENT_VOLUME_UPLOAD, Volume.class.getName());
+        entityEventDetails.put(EVENT_VOLUME_MIGRATE, Volume.class.getName());
+        entityEventDetails.put(EVENT_VOLUME_RESIZE, Volume.class.getName());
+
+        // Domains
+        entityEventDetails.put(EVENT_DOMAIN_CREATE, Domain.class.getName());
+        entityEventDetails.put(EVENT_DOMAIN_DELETE, Domain.class.getName());
+        entityEventDetails.put(EVENT_DOMAIN_UPDATE, Domain.class.getName());
+
+        // Snapshots
+        entityEventDetails.put(EVENT_SNAPSHOT_CREATE, Snapshot.class.getName());
+        entityEventDetails.put(EVENT_SNAPSHOT_DELETE, Snapshot.class.getName());
+        entityEventDetails.put(EVENT_SNAPSHOT_POLICY_CREATE, Snapshot.class.getName());
+        entityEventDetails.put(EVENT_SNAPSHOT_POLICY_UPDATE, Snapshot.class.getName());
+        entityEventDetails.put(EVENT_SNAPSHOT_POLICY_DELETE, Snapshot.class.getName());
+
+        // ISO
+        entityEventDetails.put(EVENT_ISO_CREATE, "Iso");
+        entityEventDetails.put(EVENT_ISO_DELETE, "Iso");
+        entityEventDetails.put(EVENT_ISO_COPY, "Iso");
+        entityEventDetails.put(EVENT_ISO_ATTACH, "Iso");
+        entityEventDetails.put(EVENT_ISO_DETACH, "Iso");
+        entityEventDetails.put(EVENT_ISO_EXTRACT, "Iso");
+        entityEventDetails.put(EVENT_ISO_UPLOAD, "Iso");
+
+        // SSVM
+        entityEventDetails.put(EVENT_SSVM_CREATE, "SecondaryStorageVm");
+        entityEventDetails.put(EVENT_SSVM_DESTROY, "SecondaryStorageVm");
+        entityEventDetails.put(EVENT_SSVM_START, "SecondaryStorageVm");
+        entityEventDetails.put(EVENT_SSVM_STOP, "SecondaryStorageVm");
+        entityEventDetails.put(EVENT_SSVM_REBOOT, "SecondaryStorageVm");
+        entityEventDetails.put(EVENT_SSVM_HA, "SecondaryStorageVm");
+
+        // Service Offerings
+        entityEventDetails.put(EVENT_SERVICE_OFFERING_CREATE, ServiceOffering.class.getName());
+        entityEventDetails.put(EVENT_SERVICE_OFFERING_EDIT, ServiceOffering.class.getName());
+        entityEventDetails.put(EVENT_SERVICE_OFFERING_DELETE, ServiceOffering.class.getName());
+
+        // Disk Offerings
+        entityEventDetails.put(EVENT_DISK_OFFERING_CREATE, DiskOffering.class.getName());
+        entityEventDetails.put(EVENT_DISK_OFFERING_EDIT, DiskOffering.class.getName());
+        entityEventDetails.put(EVENT_DISK_OFFERING_DELETE, DiskOffering.class.getName());
+
+        // Network offerings
+        entityEventDetails.put(EVENT_NETWORK_OFFERING_CREATE, NetworkOffering.class.getName());
+        entityEventDetails.put(EVENT_NETWORK_OFFERING_ASSIGN, NetworkOffering.class.getName());
+        entityEventDetails.put(EVENT_NETWORK_OFFERING_EDIT, NetworkOffering.class.getName());
+        entityEventDetails.put(EVENT_NETWORK_OFFERING_REMOVE, NetworkOffering.class.getName());
+        entityEventDetails.put(EVENT_NETWORK_OFFERING_DELETE, NetworkOffering.class.getName());
+
+        // Pods
+        entityEventDetails.put(EVENT_POD_CREATE, Pod.class.getName());
+        entityEventDetails.put(EVENT_POD_EDIT, Pod.class.getName());
+        entityEventDetails.put(EVENT_POD_DELETE, Pod.class.getName());
+
+        // Zones
+        entityEventDetails.put(EVENT_ZONE_CREATE, DataCenter.class.getName());
+        entityEventDetails.put(EVENT_ZONE_EDIT, DataCenter.class.getName());
+        entityEventDetails.put(EVENT_ZONE_DELETE, DataCenter.class.getName());
+
+        // VLANs/IP ranges
+        entityEventDetails.put(EVENT_VLAN_IP_RANGE_CREATE, Vlan.class.getName());
+        entityEventDetails.put(EVENT_VLAN_IP_RANGE_DELETE,Vlan.class.getName());
+
+        entityEventDetails.put(EVENT_STORAGE_IP_RANGE_CREATE, StorageNetworkIpRange.class.getName());
+        entityEventDetails.put(EVENT_STORAGE_IP_RANGE_DELETE, StorageNetworkIpRange.class.getName());
+        entityEventDetails.put(EVENT_STORAGE_IP_RANGE_UPDATE, StorageNetworkIpRange.class.getName());
+
+        // Configuration Table
+        entityEventDetails.put(EVENT_CONFIGURATION_VALUE_EDIT, Configuration.class.getName());
+
+        // Security Groups
+        entityEventDetails.put(EVENT_SECURITY_GROUP_AUTHORIZE_INGRESS, SecurityGroup.class.getName());
+        entityEventDetails.put(EVENT_SECURITY_GROUP_REVOKE_INGRESS, SecurityGroup.class.getName());
+        entityEventDetails.put(EVENT_SECURITY_GROUP_AUTHORIZE_EGRESS, SecurityGroup.class.getName());
+        entityEventDetails.put(EVENT_SECURITY_GROUP_REVOKE_EGRESS, SecurityGroup.class.getName());
+        entityEventDetails.put(EVENT_SECURITY_GROUP_CREATE, SecurityGroup.class.getName());
+        entityEventDetails.put(EVENT_SECURITY_GROUP_DELETE, SecurityGroup.class.getName());
+        entityEventDetails.put(EVENT_SECURITY_GROUP_ASSIGN, SecurityGroup.class.getName());
+        entityEventDetails.put(EVENT_SECURITY_GROUP_REMOVE, SecurityGroup.class.getName());
+
+        // Host
+        entityEventDetails.put(EVENT_HOST_RECONNECT,  Host.class.getName());
+
+        // Maintenance
+        entityEventDetails.put(EVENT_MAINTENANCE_CANCEL,  Host.class.getName());
+        entityEventDetails.put(EVENT_MAINTENANCE_CANCEL_PRIMARY_STORAGE,  Host.class.getName());
+        entityEventDetails.put(EVENT_MAINTENANCE_PREPARE,  Host.class.getName());
+        entityEventDetails.put(EVENT_MAINTENANCE_PREPARE_PRIMARY_STORAGE,  Host.class.getName());
+
+        // VPN
+        entityEventDetails.put(EVENT_REMOTE_ACCESS_VPN_CREATE, RemoteAccessVpn.class.getName());
+        entityEventDetails.put(EVENT_REMOTE_ACCESS_VPN_DESTROY, RemoteAccessVpn.class.getName());
+        entityEventDetails.put(EVENT_VPN_USER_ADD, RemoteAccessVpn.class.getName());
+        entityEventDetails.put(EVENT_VPN_USER_REMOVE, RemoteAccessVpn.class.getName());
+        entityEventDetails.put(EVENT_S2S_VPN_GATEWAY_CREATE, RemoteAccessVpn.class.getName());
+        entityEventDetails.put(EVENT_S2S_VPN_GATEWAY_DELETE, RemoteAccessVpn.class.getName());
+        entityEventDetails.put(EVENT_S2S_VPN_CUSTOMER_GATEWAY_CREATE, RemoteAccessVpn.class.getName());
+        entityEventDetails.put(EVENT_S2S_VPN_CUSTOMER_GATEWAY_DELETE, RemoteAccessVpn.class.getName());
+        entityEventDetails.put(EVENT_S2S_VPN_CUSTOMER_GATEWAY_UPDATE, RemoteAccessVpn.class.getName());
+        entityEventDetails.put(EVENT_S2S_VPN_CONNECTION_CREATE, RemoteAccessVpn.class.getName());
+        entityEventDetails.put(EVENT_S2S_VPN_CONNECTION_DELETE, RemoteAccessVpn.class.getName());
+        entityEventDetails.put(EVENT_S2S_VPN_CONNECTION_RESET, RemoteAccessVpn.class.getName());
+
+        // Custom certificates
+        entityEventDetails.put(EVENT_UPLOAD_CUSTOM_CERTIFICATE, "Certificate");
+
+        // OneToOnenat
+        entityEventDetails.put(EVENT_ENABLE_STATIC_NAT, StaticNat.class.getName());
+        entityEventDetails.put(EVENT_DISABLE_STATIC_NAT, StaticNat.class.getName());
+
+        entityEventDetails.put(EVENT_ZONE_VLAN_ASSIGN,Vlan.class.getName());
+        entityEventDetails.put(EVENT_ZONE_VLAN_RELEASE,Vlan.class.getName());
+
+        // Projects
+        entityEventDetails.put(EVENT_PROJECT_CREATE, Project.class.getName());
+        entityEventDetails.put(EVENT_PROJECT_UPDATE, Project.class.getName());
+        entityEventDetails.put(EVENT_PROJECT_DELETE, Project.class.getName());
+        entityEventDetails.put(EVENT_PROJECT_ACTIVATE, Project.class.getName());
+        entityEventDetails.put(EVENT_PROJECT_SUSPEND, Project.class.getName());
+        entityEventDetails.put(EVENT_PROJECT_ACCOUNT_ADD, Project.class.getName());
+        entityEventDetails.put(EVENT_PROJECT_INVITATION_UPDATE, Project.class.getName());
+        entityEventDetails.put(EVENT_PROJECT_INVITATION_REMOVE, Project.class.getName());
+        entityEventDetails.put(EVENT_PROJECT_ACCOUNT_REMOVE, Project.class.getName());
+
+        // Network as a Service
+        entityEventDetails.put(EVENT_NETWORK_ELEMENT_CONFIGURE,Network.class.getName());
+
+        // Physical Network Events
+        entityEventDetails.put(EVENT_PHYSICAL_NETWORK_CREATE, PhysicalNetwork.class.getName());
+        entityEventDetails.put(EVENT_PHYSICAL_NETWORK_DELETE, PhysicalNetwork.class.getName());
+        entityEventDetails.put(EVENT_PHYSICAL_NETWORK_UPDATE, PhysicalNetwork.class.getName());
+
+        // Physical Network Service Provider Events
+        entityEventDetails.put(EVENT_SERVICE_PROVIDER_CREATE, PhysicalNetworkServiceProvider.class.getName());
+        entityEventDetails.put(EVENT_SERVICE_PROVIDER_DELETE, PhysicalNetworkServiceProvider.class.getName());
+        entityEventDetails.put(EVENT_SERVICE_PROVIDER_UPDATE, PhysicalNetworkServiceProvider.class.getName());
+
+        // Physical Network TrafficType Events
+        entityEventDetails.put(EVENT_TRAFFIC_TYPE_CREATE, PhysicalNetworkTrafficType.class.getName());
+        entityEventDetails.put(EVENT_TRAFFIC_TYPE_DELETE, PhysicalNetworkTrafficType.class.getName());
+        entityEventDetails.put(EVENT_TRAFFIC_TYPE_UPDATE, PhysicalNetworkTrafficType.class.getName());
+
+        // external network device events
+        entityEventDetails.put(EVENT_EXTERNAL_LB_DEVICE_ADD, PhysicalNetwork.class.getName());
+        entityEventDetails.put(EVENT_EXTERNAL_LB_DEVICE_DELETE, PhysicalNetwork.class.getName());
+        entityEventDetails.put(EVENT_EXTERNAL_LB_DEVICE_CONFIGURE, PhysicalNetwork.class.getName());
+
+        // external switch management device events (e.g. Cisco Nexus 1000v Virtual Supervisor Module)
+        entityEventDetails.put(EVENT_EXTERNAL_SWITCH_MGMT_DEVICE_ADD, "Nexus1000v");
+        entityEventDetails.put(EVENT_EXTERNAL_SWITCH_MGMT_DEVICE_DELETE, "Nexus1000v");
+        entityEventDetails.put(EVENT_EXTERNAL_SWITCH_MGMT_DEVICE_CONFIGURE, "Nexus1000v");
+        entityEventDetails.put(EVENT_EXTERNAL_SWITCH_MGMT_DEVICE_ENABLE, "Nexus1000v");
+        entityEventDetails.put(EVENT_EXTERNAL_SWITCH_MGMT_DEVICE_DISABLE, "Nexus1000v");
+
+
+        entityEventDetails.put(EVENT_EXTERNAL_FIREWALL_DEVICE_ADD, PhysicalNetwork.class.getName());
+        entityEventDetails.put(EVENT_EXTERNAL_FIREWALL_DEVICE_DELETE, PhysicalNetwork.class.getName());
+        entityEventDetails.put(EVENT_EXTERNAL_FIREWALL_DEVICE_CONFIGURE, PhysicalNetwork.class.getName());
+
+        // VPC
+        entityEventDetails.put(EVENT_VPC_CREATE, Vpc.class.getName());
+        entityEventDetails.put(EVENT_VPC_UPDATE, Vpc.class.getName());
+        entityEventDetails.put(EVENT_VPC_DELETE, Vpc.class.getName());
+        entityEventDetails.put(EVENT_VPC_RESTART, Vpc.class.getName());
+
+        // VPC offerings
+        entityEventDetails.put(EVENT_VPC_OFFERING_CREATE, Vpc.class.getName());
+        entityEventDetails.put(EVENT_VPC_OFFERING_UPDATE, Vpc.class.getName());
+        entityEventDetails.put(EVENT_VPC_OFFERING_DELETE, Vpc.class.getName());
+
+        // Private gateway
+        entityEventDetails.put(EVENT_PRIVATE_GATEWAY_CREATE, PrivateGateway.class.getName());
+        entityEventDetails.put(EVENT_PRIVATE_GATEWAY_DELETE, PrivateGateway.class.getName());
+
+        // Static routes
+        entityEventDetails.put(EVENT_STATIC_ROUTE_CREATE, StaticRoute.class.getName());
+        entityEventDetails.put(EVENT_STATIC_ROUTE_DELETE, StaticRoute.class.getName());
+
+        // tag related events
+        entityEventDetails.put(EVENT_TAGS_CREATE, "Tag");
+        entityEventDetails.put(EVENT_TAGS_DELETE, "tag");
+
+        // external network device events
+        entityEventDetails.put(EVENT_EXTERNAL_NVP_CONTROLLER_ADD,  "NvpController");
+        entityEventDetails.put(EVENT_EXTERNAL_NVP_CONTROLLER_DELETE,  "NvpController");
+        entityEventDetails.put(EVENT_EXTERNAL_NVP_CONTROLLER_CONFIGURE, "NvpController");
+
+        // AutoScale
+        entityEventDetails.put(EVENT_COUNTER_CREATE, AutoScaleCounter.class.getName());
+        entityEventDetails.put(EVENT_COUNTER_DELETE, AutoScaleCounter.class.getName());
+        entityEventDetails.put(EVENT_CONDITION_CREATE, Condition.class.getName());
+        entityEventDetails.put(EVENT_CONDITION_DELETE, Condition.class.getName());
+        entityEventDetails.put(EVENT_AUTOSCALEPOLICY_CREATE, AutoScalePolicy.class.getName());
+        entityEventDetails.put(EVENT_AUTOSCALEPOLICY_UPDATE, AutoScalePolicy.class.getName());
+        entityEventDetails.put(EVENT_AUTOSCALEPOLICY_DELETE, AutoScalePolicy.class.getName());
+        entityEventDetails.put(EVENT_AUTOSCALEVMPROFILE_CREATE, AutoScaleVmProfile.class.getName());
+        entityEventDetails.put(EVENT_AUTOSCALEVMPROFILE_DELETE, AutoScaleVmProfile.class.getName());
+        entityEventDetails.put(EVENT_AUTOSCALEVMPROFILE_UPDATE, AutoScaleVmProfile.class.getName());
+        entityEventDetails.put(EVENT_AUTOSCALEVMGROUP_CREATE, AutoScaleVmGroup.class.getName());
+        entityEventDetails.put(EVENT_AUTOSCALEVMGROUP_DELETE, AutoScaleVmGroup.class.getName());
+        entityEventDetails.put(EVENT_AUTOSCALEVMGROUP_UPDATE, AutoScaleVmGroup.class.getName());
+        entityEventDetails.put(EVENT_AUTOSCALEVMGROUP_ENABLE, AutoScaleVmGroup.class.getName());
+        entityEventDetails.put(EVENT_AUTOSCALEVMGROUP_DISABLE, AutoScaleVmGroup.class.getName());
+    }
+
+    public static String getEntityForEvent (String eventName) {
+        String entityClassName = entityEventDetails.get(eventName);
+        if (entityClassName == null || entityClassName.isEmpty()) {
+            return null;
+        }
+        int index = entityClassName.lastIndexOf(".");
+        String entityName = entityClassName;
+        if (index != -1) {
+            entityName = entityClassName.substring(index+1);
+        }
+        return entityName;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/api/src/com/cloud/network/Network.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/network/Network.java b/api/src/com/cloud/network/Network.java
index 413b6d9..1dbb327 100644
--- a/api/src/com/cloud/network/Network.java
+++ b/api/src/com/cloud/network/Network.java
@@ -16,26 +16,25 @@
 // under the License.
 package com.cloud.network;
 
-import org.apache.cloudstack.acl.ControlledEntity;
 import com.cloud.network.Networks.BroadcastDomainType;
 import com.cloud.network.Networks.Mode;
 import com.cloud.network.Networks.TrafficType;
-import com.cloud.utils.fsm.FiniteState;
-import com.cloud.utils.fsm.StateMachine;
+import com.cloud.utils.fsm.StateMachine2;
+import com.cloud.utils.fsm.StateObject;
+import org.apache.cloudstack.acl.ControlledEntity;
 import org.apache.cloudstack.api.Identity;
 import org.apache.cloudstack.api.InternalIdentity;
 
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 
 /**
  * owned by an account.
  */
-public interface Network extends ControlledEntity, InternalIdentity, Identity {
+public interface Network extends ControlledEntity, StateObject<Network.State>, InternalIdentity, Identity {
 
-    public enum GuestType {
+  public enum GuestType {
         Shared,
         Isolated
     }
@@ -204,7 +203,8 @@ public interface Network extends ControlledEntity, InternalIdentity, Identity {
         OperationFailed;
     }
 
-    enum State implements FiniteState<State, Event> {
+    public enum State {
+
         Allocated("Indicates the network configuration is in allocated but not setup"),
         Setup("Indicates the network configuration is setup"),
         Implementing("Indicates the network configuration is being implemented"),
@@ -212,39 +212,8 @@ public interface Network extends ControlledEntity, InternalIdentity, Identity {
         Shutdown("Indicates the network configuration is being destroyed"),
         Destroy("Indicates that the network is destroyed");
 
+        protected static final StateMachine2<State, Network.Event, Network> s_fsm = new StateMachine2<State, Network.Event, Network>();
 
-        @Override
-        public StateMachine<State, Event> getStateMachine() {
-            return s_fsm;
-        }
-
-        @Override
-        public State getNextState(Event event) {
-            return s_fsm.getNextState(this, event);
-        }
-
-        @Override
-        public List<State> getFromStates(Event event) {
-            return s_fsm.getFromStates(this, event);
-        }
-
-        @Override
-        public Set<Event> getPossibleEvents() {
-            return s_fsm.getPossibleEvents(this);
-        }
-
-        String _description;
-
-        @Override
-        public String getDescription() {
-            return _description;
-        }
-
-        private State(String description) {
-            _description = description;
-        }
-
-        private static StateMachine<State, Event> s_fsm = new StateMachine<State, Event>();
         static {
             s_fsm.addTransition(State.Allocated, Event.ImplementNetwork, State.Implementing);
             s_fsm.addTransition(State.Implementing, Event.OperationSucceeded, State.Implemented);
@@ -253,6 +222,15 @@ public interface Network extends ControlledEntity, InternalIdentity, Identity {
             s_fsm.addTransition(State.Shutdown, Event.OperationSucceeded, State.Allocated);
             s_fsm.addTransition(State.Shutdown, Event.OperationFailed, State.Implemented);
         }
+
+        public static StateMachine2<State, Network.Event, Network> getStateMachine() {
+            return s_fsm;
+        }
+
+        String _description;
+        private State(String description) {
+            _description = description;
+        }
     }
 
     String getName();

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/api/src/com/cloud/storage/Snapshot.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/storage/Snapshot.java b/api/src/com/cloud/storage/Snapshot.java
index 99bdee6..2e2965a 100644
--- a/api/src/com/cloud/storage/Snapshot.java
+++ b/api/src/com/cloud/storage/Snapshot.java
@@ -16,14 +16,16 @@
 // under the License.
 package com.cloud.storage;
 
-import java.util.Date;
-
-import org.apache.cloudstack.acl.ControlledEntity;
 import com.cloud.hypervisor.Hypervisor.HypervisorType;
+import com.cloud.utils.fsm.StateMachine2;
+import com.cloud.utils.fsm.StateObject;
+import org.apache.cloudstack.acl.ControlledEntity;
 import org.apache.cloudstack.api.Identity;
 import org.apache.cloudstack.api.InternalIdentity;
 
-public interface Snapshot extends ControlledEntity, Identity, InternalIdentity {
+import java.util.Date;
+
+public interface Snapshot extends ControlledEntity, Identity, InternalIdentity, StateObject<Snapshot.State> {
     public enum Type {
         MANUAL,
         RECURRING,
@@ -51,13 +53,29 @@ public interface Snapshot extends ControlledEntity, Identity, InternalIdentity {
         }
     }
 
-    public enum Status {
+    public enum State {
         Creating,
         CreatedOnPrimary,
         BackingUp,
         BackedUp,
         Error;
 
+        private final static StateMachine2<State, Event, Snapshot> s_fsm = new StateMachine2<State, Event, Snapshot>();
+
+        public static StateMachine2<State, Event, Snapshot> getStateMachine() {
+            return s_fsm;
+        }
+
+        static {
+            s_fsm.addTransition(null, Event.CreateRequested, Creating);
+            s_fsm.addTransition(Creating, Event.OperationSucceeded, CreatedOnPrimary);
+            s_fsm.addTransition(Creating, Event.OperationNotPerformed, BackedUp);
+            s_fsm.addTransition(Creating, Event.OperationFailed, Error);
+            s_fsm.addTransition(CreatedOnPrimary, Event.BackupToSecondary, BackingUp);
+            s_fsm.addTransition(BackingUp, Event.OperationSucceeded, BackedUp);
+            s_fsm.addTransition(BackingUp, Event.OperationFailed, Error);
+        }
+
         public String toString() {
             return this.name();
         }
@@ -67,6 +85,15 @@ public interface Snapshot extends ControlledEntity, Identity, InternalIdentity {
         }
     }
 
+    enum Event {
+        CreateRequested,
+        OperationNotPerformed,
+        BackupToSecondary,
+        BackedupToSecondary,
+        OperationSucceeded,
+        OperationFailed
+    }
+
     public static final long MANUAL_POLICY_ID = 0L;
 
     long getAccountId();
@@ -81,7 +108,7 @@ public interface Snapshot extends ControlledEntity, Identity, InternalIdentity {
 
     Type getType();
 
-    Status getStatus();
+    State getState();
 
     HypervisorType getHypervisorType();
 

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/api/src/org/apache/cloudstack/api/response/SnapshotResponse.java
----------------------------------------------------------------------
diff --git a/api/src/org/apache/cloudstack/api/response/SnapshotResponse.java b/api/src/org/apache/cloudstack/api/response/SnapshotResponse.java
index 8ea0d7f..58b7cf1 100644
--- a/api/src/org/apache/cloudstack/api/response/SnapshotResponse.java
+++ b/api/src/org/apache/cloudstack/api/response/SnapshotResponse.java
@@ -16,16 +16,16 @@
 // under the License.
 package org.apache.cloudstack.api.response;
 
-import java.util.Date;
-import java.util.List;
-
-import org.apache.cloudstack.api.ApiConstants;
 import com.cloud.serializer.Param;
 import com.cloud.storage.Snapshot;
 import com.google.gson.annotations.SerializedName;
+import org.apache.cloudstack.api.ApiConstants;
 import org.apache.cloudstack.api.BaseResponse;
 import org.apache.cloudstack.api.EntityReference;
 
+import java.util.Date;
+import java.util.List;
+
 @EntityReference(value=Snapshot.class)
 @SuppressWarnings("unused")
 public class SnapshotResponse extends BaseResponse implements ControlledEntityResponse {
@@ -81,7 +81,7 @@ public class SnapshotResponse extends BaseResponse implements ControlledEntityRe
 
     @SerializedName(ApiConstants.STATE)
     @Param(description = "the state of the snapshot. BackedUp means that snapshot is ready to be used; Creating - the snapshot is being allocated on the primary storage; BackingUp - the snapshot is being backed up on secondary storage")
-    private Snapshot.Status state;
+    private Snapshot.State state;
 
     @SerializedName(ApiConstants.TAGS)  @Param(description="the list of resource tags associated with snapshot", responseObject = ResourceTagResponse.class)
     private List<ResourceTagResponse> tags;
@@ -149,7 +149,7 @@ public class SnapshotResponse extends BaseResponse implements ControlledEntityRe
         this.intervalType = intervalType;
     }
 
-    public void setState(Snapshot.Status state) {
+    public void setState(Snapshot.State state) {
         this.state = state;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index 7ebe50c..63ec2ef 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -117,6 +117,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.cloudstack</groupId>
+      <artifactId>cloud-mom-rabbitmq</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>mysql</groupId>
       <artifactId>mysql-connector-java</artifactId>
       <version>${cs.mysql.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/core/src/com/cloud/storage/SnapshotVO.java
----------------------------------------------------------------------
diff --git a/core/src/com/cloud/storage/SnapshotVO.java b/core/src/com/cloud/storage/SnapshotVO.java
index e5e3650..c1c5f21 100644
--- a/core/src/com/cloud/storage/SnapshotVO.java
+++ b/core/src/com/cloud/storage/SnapshotVO.java
@@ -16,23 +16,13 @@
 // under the License.
 package com.cloud.storage;
 
-import java.util.Date;
-import java.util.UUID;
-
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.EnumType;
-import javax.persistence.Enumerated;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-import javax.persistence.Table;
-
-import org.apache.cloudstack.api.Identity;
 import com.cloud.hypervisor.Hypervisor.HypervisorType;
 import com.cloud.utils.db.GenericDao;
 import com.google.gson.annotations.Expose;
-import org.apache.cloudstack.api.InternalIdentity;
+
+import javax.persistence.*;
+import java.util.Date;
+import java.util.UUID;
 
 @Entity
 @Table(name="snapshots")
@@ -69,7 +59,7 @@ public class SnapshotVO implements Snapshot {
     @Expose
     @Column(name="status", updatable = true, nullable=false)
     @Enumerated(value=EnumType.STRING)
-    private Status status;
+    private State status;
 
     @Column(name="snapshot_type")
     short snapshotType;
@@ -127,7 +117,7 @@ public class SnapshotVO implements Snapshot {
         this.snapshotType = snapshotType;
         this.typeDescription = typeDescription;
         this.size = size;
-        this.status = Status.Creating;
+        this.status = State.Creating;
         this.prevSnapshotId = 0;
         this.hypervisorType = hypervisorType;
         this.version = "2.2";
@@ -252,11 +242,11 @@ public class SnapshotVO implements Snapshot {
     }
 
 	@Override
-    public Status getStatus() {
+    public State getState() {
 		return status;
 	}
 
-	public void setStatus(Status status) {
+	public void setStatus(State status) {
 		this.status = status;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/framework/events/pom.xml
----------------------------------------------------------------------
diff --git a/framework/events/pom.xml b/framework/events/pom.xml
new file mode 100644
index 0000000..ef812e5
--- /dev/null
+++ b/framework/events/pom.xml
@@ -0,0 +1,47 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>cloud-framework-events</artifactId>
+  <name>Apache CloudStack Event Notification Framework</name>
+  <parent>
+    <groupId>org.apache.cloudstack</groupId>
+    <artifactId>cloudstack-framework</artifactId>
+    <version>4.1.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.cloudstack</groupId>
+        <artifactId>cloud-utils</artifactId>
+        <version>${project.version}</version>
+     </dependency>
+    <dependency>
+        <groupId>com.google.code.gson</groupId>
+        <artifactId>gson</artifactId>
+        <version>${cs.gson.version}</version>
+    </dependency>
+  </dependencies>
+  <build>
+    <defaultGoal>install</defaultGoal>
+    <sourceDirectory>src</sourceDirectory>
+    <testSourceDirectory>test</testSourceDirectory>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/framework/events/src/org/apache/cloudstack/framework/events/Event.java
----------------------------------------------------------------------
diff --git a/framework/events/src/org/apache/cloudstack/framework/events/Event.java b/framework/events/src/org/apache/cloudstack/framework/events/Event.java
new file mode 100644
index 0000000..eb6f48d
--- /dev/null
+++ b/framework/events/src/org/apache/cloudstack/framework/events/Event.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.events;
+
+import com.google.gson.Gson;
+
+public class Event {
+
+    String eventCategory;
+    String eventType;
+    String eventSource;
+    String resourceType;
+    String resourceUUID;
+    String description;
+
+    public Event(String eventSource, String eventCategory, String eventType, String resourceType,
+                 String resourceUUID) {
+        this.eventCategory = eventCategory;
+        this.eventType = eventType;
+        this.eventSource = eventSource;
+        this.resourceType = resourceType;
+        this.resourceUUID = resourceUUID;
+    }
+
+    public String getEventCategory() {
+        return eventCategory;
+    }
+
+    public void setEventCategory(String category) {
+        eventCategory = category;
+    }
+
+    public String getEventType() {
+        return eventType;
+    }
+
+    public void setEventType(String type) {
+        eventType = type;
+    }
+
+    public String getEventSource() {
+        return eventSource;
+    }
+
+    void setEventSource(String source) {
+        eventSource = source;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public void setDescription (Object message) {
+        Gson gson = new Gson();
+        this.description = gson.toJson(message).toString();
+    }
+
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+    public String getResourceType() {
+        return resourceType;
+    }
+
+    public void setResourceType(String resourceType) {
+        this.resourceType = resourceType;
+    }
+
+    public void setResourceUUID(String uuid) {
+        this.resourceUUID = uuid;
+    }
+
+    public String getResourceUUID () {
+        return resourceUUID;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java
----------------------------------------------------------------------
diff --git a/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java b/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java
new file mode 100644
index 0000000..c16ee6f
--- /dev/null
+++ b/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.events;
+
+import com.cloud.utils.component.Adapter;
+
+import java.util.UUID;
+
+/**
+ * Interface to publish and subscribe to CloudStack events
+ *
+ */
+public interface EventBus extends Adapter{
+
+    /**
+     * publish an event on to the event bus
+     *
+     * @param event event that needs to be published on the event bus
+     */
+    void publish(Event event) throws EventBusException;
+
+    /**
+     * subscribe to events that match the specified event topic
+     *
+     * @param topic defines category and type of the events being subscribed to
+     * @param subscriber subscriber that intends to receive event notification
+     * @return UUID returns the subscription ID
+     */
+     UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException;
+
+    /**
+     * unsubscribe from events of a category and a type
+     *
+     * @param subscriber subscriber that intends to unsubscribe from the event notification
+     */
+    void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/framework/events/src/org/apache/cloudstack/framework/events/EventBusException.java
----------------------------------------------------------------------
diff --git a/framework/events/src/org/apache/cloudstack/framework/events/EventBusException.java b/framework/events/src/org/apache/cloudstack/framework/events/EventBusException.java
new file mode 100644
index 0000000..5654ba0
--- /dev/null
+++ b/framework/events/src/org/apache/cloudstack/framework/events/EventBusException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.events;
+
+public class EventBusException extends Exception{
+    public EventBusException (String msg) {
+      super(msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/framework/events/src/org/apache/cloudstack/framework/events/EventSubscriber.java
----------------------------------------------------------------------
diff --git a/framework/events/src/org/apache/cloudstack/framework/events/EventSubscriber.java b/framework/events/src/org/apache/cloudstack/framework/events/EventSubscriber.java
new file mode 100644
index 0000000..b1c30c2
--- /dev/null
+++ b/framework/events/src/org/apache/cloudstack/framework/events/EventSubscriber.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.events;
+
+public interface EventSubscriber {
+
+    /**
+     * Callback method. EventBus calls this method on occurrence of subscribed event
+     *
+     * @param event details of the event
+     */
+    void onEvent(Event event);
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/framework/events/src/org/apache/cloudstack/framework/events/EventTopic.java
----------------------------------------------------------------------
diff --git a/framework/events/src/org/apache/cloudstack/framework/events/EventTopic.java b/framework/events/src/org/apache/cloudstack/framework/events/EventTopic.java
new file mode 100644
index 0000000..19b727d
--- /dev/null
+++ b/framework/events/src/org/apache/cloudstack/framework/events/EventTopic.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.framework.events;
+
+public class EventTopic {
+
+    String eventCategory;
+    String eventType;
+    String resourceType;
+    String resourceUUID;
+    String eventSource;
+
+    public EventTopic(String eventCategory, String eventType, String resourceType, String resourceUUID, String eventSource) {
+        this.eventCategory = eventCategory;
+        this.eventType = eventType;
+        this.resourceType = resourceType;
+        this.resourceUUID = resourceUUID;
+        this.eventSource = eventSource;
+    }
+
+    public String getEventCategory() {
+        return eventCategory;
+    }
+
+    public String getEventType() {
+        return eventType;
+    }
+
+    public String getResourceType() {
+        return resourceType;
+    }
+
+    public String getEventSource() {
+        return eventSource;
+    }
+
+    public String getResourceUUID() {
+        return resourceUUID;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/framework/pom.xml
----------------------------------------------------------------------
diff --git a/framework/pom.xml b/framework/pom.xml
new file mode 100644
index 0000000..81e0916
--- /dev/null
+++ b/framework/pom.xml
@@ -0,0 +1,35 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>cloudstack-framework</artifactId>
+  <name>Apache CloudStack framework POM</name>
+  <packaging>pom</packaging>
+  <parent>
+    <groupId>org.apache.cloudstack</groupId>
+    <artifactId>cloudstack</artifactId>
+    <version>4.1.0-SNAPSHOT</version>
+  </parent>
+  <build>
+    <defaultGoal>install</defaultGoal>
+  </build>
+  <modules>
+    <module>events</module>
+  </modules>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/plugins/event-bus/rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/plugins/event-bus/rabbitmq/pom.xml b/plugins/event-bus/rabbitmq/pom.xml
new file mode 100644
index 0000000..6a47983
--- /dev/null
+++ b/plugins/event-bus/rabbitmq/pom.xml
@@ -0,0 +1,46 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>cloud-mom-rabbitmq</artifactId>
+  <name>Apache CloudStack Plugin - RabbitMQ Event Bus</name>
+  <parent>
+    <groupId>org.apache.cloudstack</groupId>
+    <artifactId>cloudstack-plugins</artifactId>
+    <version>4.1.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+  <dependencies>
+    <dependency>
+    <groupId>com.rabbitmq</groupId>
+      <artifactId>amqp-client</artifactId>
+        <version>2.8.7</version>
+    </dependency>
+    <dependency>
+    <groupId>org.apache.cloudstack</groupId>
+      <artifactId>cloud-framework-events</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+  <build>
+    <defaultGoal>install</defaultGoal>
+    <sourceDirectory>src</sourceDirectory>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
----------------------------------------------------------------------
diff --git a/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java b/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
new file mode 100644
index 0000000..3a06c42
--- /dev/null
+++ b/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java
@@ -0,0 +1,555 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cloudstack.mom.rabbitmq;
+
+import com.rabbitmq.client.*;
+import org.apache.cloudstack.framework.events.*;
+import org.apache.log4j.Logger;
+
+import com.cloud.utils.Ternary;
+
+import javax.ejb.Local;
+import javax.naming.ConfigurationException;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+@Local(value=EventBus.class)
+public class RabbitMQEventBus implements EventBus {
+
+    // details of AMQP server; filled in by configure() from the "server", "port",
+    // "username" and "password" parameters. These are static, i.e. shared by all instances.
+    private static String _amqpHost;
+    private static Integer _port;
+    private static String _username;
+    private static String _password;
+
+    // AMQP exchange name where all CloudStack events will be published
+    // (taken from the "exchangename" parameter)
+    private static String _amqpExchangeName;
+
+    // registry of subscriptions: key is the subscription UUID as string (also used as queue name),
+    // value is (binding key, channel serving the subscription, subscriber call back).
+    // The channel is null while the subscription is not attached to the AMQP server.
+    private static ConcurrentHashMap<String, Ternary<String, Channel, EventSubscriber>> _subscribers;
+
+    // connection to AMQP server; null until a connection has been created and again after
+    // closeConnection()/abortConnection()
+    private static Connection _connection=null;
+
+    // AMQP server should consider messages acknowledged once delivered if _autoAck is true
+    private static boolean _autoAck = true;
+
+    // runs the ReconnectionTask instances submitted by start() and by the disconnect handler
+    private ExecutorService executorService;
+    // name handed to configure(), returned by getName()
+    private String _name;
+    // shutdown listener registered on every connection created by createConnection()
+    private static DisconnectHandler disconnectHandler;
+    // time slept between two reconnection attempts, in milliseconds ("retryinterval" parameter)
+    private static Integer _retryInterval;
+    private static final Logger s_logger = Logger.getLogger(RabbitMQEventBus.class);
+
+    /**
+     * Reads the AMQP settings from the supplied parameters and initialises the subscriber
+     * registry, the executor and the disconnect handler. No connection is opened here.
+     *
+     * Mandatory keys: "server", "username", "password", "exchangename" and "port".
+     * Optional key: "retryinterval", which falls back to 10000 when absent or empty.
+     *
+     * @param name name reported later on by getName()
+     * @param params configuration values, looked up by the keys listed above
+     * @return true once all values have been accepted
+     * @throws ConfigurationException if a mandatory value is missing or empty, or if the port or
+     *         the retry interval cannot be parsed as an integer
+     */
+    @Override
+    public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
+
+        _amqpHost = (String) params.get("server");
+        if (isMissing(_amqpHost)) {
+            throw new ConfigurationException("Unable to get the AMQP server details");
+        }
+
+        _username = (String) params.get("username");
+        if (isMissing(_username)) {
+            throw new ConfigurationException("Unable to get the username details");
+        }
+
+        _password = (String) params.get("password");
+        if (isMissing(_password)) {
+            throw new ConfigurationException("Unable to get the password details");
+        }
+
+        _amqpExchangeName = (String) params.get("exchangename");
+        if (isMissing(_amqpExchangeName)) {
+            throw new ConfigurationException("Unable to get the _exchange details on the AMQP server");
+        }
+
+        String portValue = (String) params.get("port");
+        if (isMissing(portValue)) {
+            throw new ConfigurationException("Unable to get the port details of AMQP server");
+        }
+
+        try {
+            _port = Integer.parseInt(portValue);
+
+            // wait 10s between reconnection attempts unless configured otherwise
+            String retryIntervalValue = (String) params.get("retryinterval");
+            _retryInterval = Integer.parseInt(isMissing(retryIntervalValue) ? "10000" : retryIntervalValue);
+        } catch (NumberFormatException e) {
+            throw new ConfigurationException("Invalid port number/retry interval");
+        }
+
+        _name = name;
+        _subscribers = new ConcurrentHashMap<String, Ternary<String, Channel, EventSubscriber>>();
+        disconnectHandler = new DisconnectHandler();
+        executorService = Executors.newCachedThreadPool();
+        return true;
+    }
+
+    /** Tells whether a configuration value is absent, i.e. null or the empty string. */
+    private static boolean isMissing(String value) {
+        return value == null || value.isEmpty();
+    }
+
+    /** Call to subscribe to interested set of events
+     *
+     * The subscription is recorded before the AMQP server is contacted. If the server cannot be
+     * reached (connection closed or refused) the subscription is kept without a channel, a
+     * warning is logged and the UUID is still returned. Any other failure discards the
+     * subscription and is reported to the caller.
+     *
+     * @param topic defines category and type of the events being subscribed to
+     * @param subscriber subscriber that intends to receive event notification
+     * @return UUID that represents the subscription with event bus
+     * @throws EventBusException if an argument is null or the subscription could not be set up
+     */
+    @Override
+    public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException {
+
+        if (subscriber == null || topic == null) {
+            throw new EventBusException("Invalid EventSubscriber/EventTopic object passed.");
+        }
+
+        // create a UUID that identifies the subscription; its string form is used both as the
+        // name of the subscriber's queue on the AMQP broker and as the consumer tag
+        UUID queueId = UUID.randomUUID();
+        String queueName = queueId.toString();
+        Channel channel = null;
+
+        try {
+            String bindingKey = createBindingKey(topic);
+
+            // store the subscriber details before creating channel
+            _subscribers.put(queueName,
+                    new Ternary<String, Channel, EventSubscriber>(bindingKey, null, subscriber));
+
+            // create a channel dedicated for this subscription
+            channel = createChannel(getConnection());
+
+            // create a queue and bind it to the exchange with binding key formed from event topic
+            createExchange(channel, _amqpExchangeName);
+            channel.queueDeclare(queueName, false, false, false, null);
+            channel.queueBind(queueName, _amqpExchangeName, bindingKey);
+
+            // register a callback handler to receive the events that a subscriber subscribed to
+            channel.basicConsume(queueName, _autoAck, queueName,
+                    new DefaultConsumer(channel) {
+                        @Override
+                        public void handleDelivery(String consumerTag,
+                                                   Envelope envelope,
+                                                   AMQP.BasicProperties properties,
+                                                   byte[] body)
+                            throws IOException {
+                            // the consumer tag was set to the queue name in basicConsume above,
+                            // so it can be used to look up the subscription
+                            Ternary<String, Channel, EventSubscriber> queueDetails = _subscribers.get(consumerTag);
+                            if (queueDetails != null) {
+                                EventSubscriber target = queueDetails.third();
+                                String routingKey =  envelope.getRoutingKey();
+                                String eventSource = getEventSourceFromRoutingKey(routingKey);
+                                String eventCategory = getEventCategoryFromRoutingKey(routingKey);
+                                String eventType = getEventTypeFromRoutingKey(routingKey);
+                                String resourceType = getResourceTypeFromRoutingKey(routingKey);
+                                String resourceUUID = getResourceUUIDFromRoutingKey(routingKey);
+                                Event event = new Event(eventSource, eventCategory, eventType,
+                                        resourceType, resourceUUID);
+                                event.setDescription(new String(body));
+
+                                // deliver the event to call back object provided by subscriber
+                                target.onEvent(event);
+                            }
+                        }
+                    }
+            );
+
+            // update the channel details for the subscription
+            Ternary<String, Channel, EventSubscriber> queueDetails = _subscribers.get(queueName);
+            if (queueDetails != null) {
+                queueDetails.second(channel);
+            }
+
+        } catch (AlreadyClosedException closedException) {
+            s_logger.warn("Connection to AMQP service is lost. Subscription:" + queueName +
+                    " will be active after reconnection");
+        } catch (ConnectException connectException) {
+            s_logger.warn("Connection to AMQP service is lost. Subscription:" + queueName +
+                    " will be active after reconnection");
+        } catch (Exception e) {
+            // the caller never learns the UUID of a failed subscription, so drop the registry
+            // entry (otherwise a later reconnection would activate it) and release the channel
+            _subscribers.remove(queueName);
+            if (channel != null) {
+                try {
+                    channel.abort();
+                } catch (Exception abortException) {
+                    s_logger.warn("Failed to release channel of subscription:" + queueName +
+                            " due to " + abortException.getMessage());
+                }
+            }
+            throw new EventBusException("Failed to subscribe to event due to " + e.getMessage());
+        }
+
+        return queueId;
+    }
+
+    /** Call to cancel a subscription created by subscribe()
+     *
+     * The subscription is looked up by the UUID returned from subscribe(), whose string form
+     * is the key of the subscriber registry, the queue name and the consumer tag. If the
+     * subscription currently has a channel, the consumer is cancelled, the subscriber's queue
+     * is deleted (as stop() does) and the dedicated channel is closed.
+     *
+     * @param subscriberId UUID returned by subscribe()
+     * @param subscriber subscriber being removed; not needed to locate the subscription
+     * @throws EventBusException if the id is null or unknown, or the AMQP server rejects the request
+     */
+    @Override
+    public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException {
+        if (subscriberId == null) {
+            throw new EventBusException("Failed to unsubscribe from event bus due to invalid subscription id");
+        }
+
+        // subscriptions are registered under the UUID handed out by subscribe()
+        String queueName = subscriberId.toString();
+        Ternary<String, Channel, EventSubscriber> queueDetails = _subscribers.remove(queueName);
+        if (queueDetails == null) {
+            throw new EventBusException("Failed to unsubscribe from event bus due to unknown subscription " + queueName);
+        }
+
+        Channel channel = queueDetails.second();
+        if (channel == null) {
+            // subscription is not attached to the AMQP server at the moment; removing the
+            // registry entry is enough to keep it from being re-established
+            return;
+        }
+
+        try {
+            channel.basicCancel(queueName);
+            channel.queueDelete(queueName);
+            channel.close();
+        } catch (Exception e) {
+            throw new EventBusException("Failed to unsubscribe from event bus due to " + e.getMessage());
+        }
+    }
+
+    /** Publishes an event on to the exchange created on AMQP server
+     *
+     * A short-lived channel is opened for every call and is released again whether or not
+     * publishing succeeded. A failure to release the channel is only logged.
+     *
+     * @param event event to publish; its description becomes the message body
+     * @throws EventBusException if the event is null or the event could not be published
+     */
+    @Override
+    public void publish(Event event) throws EventBusException {
+
+        if (event == null) {
+            throw new EventBusException("Failed to publish event to message broker due to invalid event object");
+        }
+
+        String routingKey = createRoutingKey(event);
+        String eventDescription = event.getDescription();
+        Channel channel = null;
+
+        try {
+            channel = createChannel(getConnection());
+            createExchange(channel, _amqpExchangeName);
+            publishEventToExchange(channel, _amqpExchangeName, routingKey, eventDescription);
+        } catch (AlreadyClosedException e) {
+            closeConnection();
+            throw new EventBusException("Failed to publish event to message broker as connection to AMQP broker is lost");
+        } catch (Exception e) {
+            throw new EventBusException("Failed to publish event to message broker due to " + e.getMessage());
+        } finally {
+            // release the channel on every path so that failed publishes do not leak channels
+            if (channel != null && channel.isOpen()) {
+                try {
+                    channel.close();
+                } catch (Exception e) {
+                    s_logger.warn("Failed to close channel used for publishing due to " + e.getMessage());
+                }
+            }
+        }
+    }
+
+    /** Builds the routing key used while publishing the event to the exchange on AMQP server.
+     *  The key has the format eventSource.eventCategory.eventType.resourceType.resourceUuid;
+     *  a null or empty part is written as "*" and every "." inside a part is turned into "-".
+     */
+    private String createRoutingKey(Event event) {
+
+        String[] keyParts = {
+            event.getEventSource(),
+            event.getEventCategory(),
+            event.getEventType(),
+            event.getResourceType(),
+            event.getResourceUUID()
+        };
+
+        StringBuilder key = new StringBuilder();
+        for (int i = 0; i < keyParts.length; i++) {
+            if (i > 0) {
+                key.append(".");
+            }
+            key.append(replaceNullWithWildcard(keyParts[i]).replace(".", "-"));
+        }
+        return key.toString();
+    }
+
+    /** Builds the binding key used to bind a subscriber's queue to the exchange on AMQP server.
+     *  The key has the format eventSource.eventCategory.eventType.resourceType.resourceUuid;
+     *  a null or empty part of the topic is written as "*" and every "." inside a part is
+     *  turned into "-".
+     */
+    private String createBindingKey(EventTopic topic) {
+
+        String[] keyParts = {
+            topic.getEventSource(),
+            topic.getEventCategory(),
+            topic.getEventType(),
+            topic.getResourceType(),
+            topic.getResourceUUID()
+        };
+
+        StringBuilder key = new StringBuilder();
+        for (int i = 0; i < keyParts.length; i++) {
+            if (i > 0) {
+                key.append(".");
+            }
+            key.append(replaceNullWithWildcard(keyParts[i]).replace(".", "-"));
+        }
+        return key.toString();
+    }
+
+    /** Returns the current connection to the AMQP server, creating one when none is held.
+     *  A failure to create the connection is logged and passed on to the caller.
+     */
+    private synchronized Connection getConnection() throws Exception {
+        if (_connection != null) {
+            return _connection;
+        }
+
+        try {
+            return createConnection();
+        } catch (Exception e) {
+            s_logger.error("Failed to create a connection to AMQP server due to " + e.getMessage());
+            throw e;
+        }
+    }
+
+    /** Opens a new connection to the configured AMQP server (virtual host "/"), registers the
+     *  disconnect handler on it and remembers it as the current connection. The current
+     *  connection is only replaced once the new one is fully set up.
+     */
+    private synchronized Connection createConnection() throws Exception {
+        ConnectionFactory connectionFactory = new ConnectionFactory();
+        connectionFactory.setUsername(_username);
+        connectionFactory.setPassword(_password);
+        connectionFactory.setVirtualHost("/");
+        connectionFactory.setHost(_amqpHost);
+        connectionFactory.setPort(_port);
+
+        Connection created = connectionFactory.newConnection();
+        created.addShutdownListener(disconnectHandler);
+
+        _connection = created;
+        return _connection;
+    }
+
+    /** Closes the current connection, if any, and forgets it. A failure to close is only logged. */
+    private synchronized void closeConnection() {
+        if (_connection != null) {
+            try {
+                _connection.close();
+            } catch (Exception e) {
+                s_logger.warn("Failed to close connection to AMQP server due to " + e.getMessage());
+            }
+        }
+        _connection = null;
+    }
+
+    /** Aborts the current connection, if any, and forgets it. A failure to abort is only logged. */
+    private synchronized void abortConnection () {
+        if (_connection != null) {
+            try {
+                _connection.abort();
+            } catch (Exception e) {
+                s_logger.warn("Failed to abort connection due to " + e.getMessage());
+            }
+            _connection = null;
+        }
+    }
+
+    /** Returns "*" for a null or empty key part, otherwise the key part itself. */
+    private String replaceNullWithWildcard(String key) {
+        return (key == null || key.isEmpty()) ? "*" : key;
+    }
+
+    /** Opens a channel on the given connection; an I/O failure is logged and passed on. */
+    private Channel createChannel(Connection connection) throws Exception {
+        final Channel channel;
+        try {
+            channel = connection.createChannel();
+        } catch (IOException ioe) {
+            s_logger.warn("Failed to create a channel due to " + ioe.getMessage());
+            throw ioe;
+        }
+        return channel;
+    }
+
+    /** Declares the exchange of type "topic" with the given name on the AMQP server
+     *  (third exchangeDeclare argument set to true). An I/O failure is logged and passed on.
+     */
+    private void createExchange(Channel channel, String exchangeName) throws Exception {
+        try {
+            channel.exchangeDeclare(exchangeName, "topic", true);
+        } catch (java.io.IOException exception) {
+            // keep a blank before the exchange name so that the name is readable in the log
+            s_logger.error("Failed to create exchange " + exchangeName + " on RabbitMQ server");
+            throw exception;
+        }
+    }
+
+    /** Publishes the event description as message body on the given exchange, using the given
+     *  routing key. An event without description is published with an empty body instead of
+     *  failing. Any failure is logged and passed on.
+     */
+    private void publishEventToExchange(Channel channel, String exchangeName,
+                                        String routingKey, String eventDescription) throws Exception {
+        try {
+            // the routing key already identifies the event, so a missing description must not
+            // prevent the event from being published
+            byte[] messageBodyBytes = (eventDescription == null) ? new byte[0] : eventDescription.getBytes();
+            channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
+        } catch (Exception e) {
+            s_logger.error("Failed to publish event " + routingKey + " on exchange " + exchangeName +
+                    "  of message broker due to " + e.getMessage());
+            throw e;
+        }
+    }
+
+    /** Returns the second "."-separated part of the routing key, i.e. the event category. */
+    private String getEventCategoryFromRoutingKey(String routingKey) {
+        return routingKey.split("\\.")[1];
+    }
+
+    /** Returns the third "."-separated part of the routing key, i.e. the event type. */
+    private String getEventTypeFromRoutingKey(String routingKey) {
+        return routingKey.split("\\.")[2];
+    }
+
+    /** Returns the first "."-separated part of the routing key, i.e. the event source. */
+    private String getEventSourceFromRoutingKey(String routingKey) {
+        return routingKey.split("\\.")[0];
+    }
+
+    /** Returns the fourth "."-separated part of the routing key, i.e. the resource type. */
+    private String getResourceTypeFromRoutingKey(String routingKey) {
+        return routingKey.split("\\.")[3];
+    }
+
+    /** Returns the fifth "."-separated part of the routing key, i.e. the resource UUID. */
+    private String getResourceUUIDFromRoutingKey(String routingKey) {
+        return routingKey.split("\\.")[4];
+    }
+
+    /** Returns the name this event bus was configured with. */
+    @Override
+    public String getName() {
+        return this._name;
+    }
+
+    /** Starts the event bus by handing a reconnection task, which initiates the connection to
+     *  the AMQP server, to the executor. Returns true without waiting for the connection.
+     */
+    @Override
+    public boolean start() {
+        executorService.submit(new ReconnectionTask());
+        return true;
+    }
+
+    /** Stops the event bus: while the connection is open, the queue of every subscription that
+     *  currently has a channel is deleted and its channel aborted; then the connection is closed.
+     *  Safe to call when no connection was ever established and when subscriptions are still
+     *  waiting for a reconnection (those have no channel and are skipped).
+     *
+     * @return always true
+     */
+    @Override
+    public boolean stop() {
+
+        // _connection is null until the first successful connect and after an abort
+        if (_connection != null && _connection.isOpen()) {
+            for (String subscriberId : _subscribers.keySet()) {
+                Ternary<String, Channel, EventSubscriber> subscriberDetails = _subscribers.get(subscriberId);
+                if (subscriberDetails == null) {
+                    continue; // removed in the meantime
+                }
+                Channel channel =  subscriberDetails.second();
+                if (channel == null) {
+                    continue; // subscription is not attached to the AMQP server, nothing to clean up
+                }
+                String queueName = subscriberId;
+                try {
+                    channel.queueDelete(queueName);
+                    channel.abort();
+                } catch (IOException ioe) {
+                    s_logger.warn("Failed to delete queue: " + queueName + " on AMQP server due to " + ioe.getMessage() );
+                }
+            }
+        }
+
+        closeConnection();
+        return true;
+    }
+
+    /** Shutdown listener that deals with loss of connection to AMQP server: for a shutdown not
+     *  initiated by this application it clears the channel of every subscription, aborts the
+     *  connection and schedules a reconnection task.
+     */
+    private class DisconnectHandler implements ShutdownListener {
+
+        @Override
+        public void shutdownCompleted(ShutdownSignalException shutdownSignalException) {
+            // a shutdown requested by the application itself needs no recovery
+            if (shutdownSignalException.isInitiatedByApplication()) {
+                return;
+            }
+
+            // the channels died with the connection, so detach them from the subscriptions
+            for (String id : _subscribers.keySet()) {
+                Ternary<String, Channel, EventSubscriber> details = _subscribers.get(id);
+                details.second(null);
+                _subscribers.put(id, details);
+            }
+
+            // disconnected to AMQP server, so abort the connection and channels
+            abortConnection();
+            s_logger.warn("Connection has been shutdown by AMQP server. Attempting to reconnect.");
+
+            // initiate re-connect process
+            executorService.submit(new ReconnectionTask());
+        }
+    }
+
+    // retry logic to connect back to AMQP server after loss of connection.
+    // Each round sleeps for _retryInterval, tries to create a connection and, once connected,
+    // re-creates queue, binding and consumer for every registered subscriber.
+    private class ReconnectionTask implements Runnable {
+
+        // set to true as soon as a connection has been created; ends the retry loop
+        boolean connected = false;
+        // connection created by this task, used to open one channel per subscriber
+        Connection connection = null;
+
+        public void run() {
+
+            while (!connected) {
+                // wait before every attempt, including the very first one
+                try {
+                    Thread.sleep(_retryInterval);
+                } catch (InterruptedException ie) {
+                    // ignore timer interrupts
+                }
+
+                try {
+                    try {
+                        connection = createConnection();
+                        connected = true;
+                    } catch (IOException ie) {
+                        continue; // can't establish connection to AMQP server yet, so continue
+                    }
+
+                    // prepare consumer on AMQP server for each of subscriber
+                    for (String subscriberId : _subscribers.keySet()) {
+                        Ternary<String, Channel, EventSubscriber> subscriberDetails = _subscribers.get(subscriberId);
+                        String bindingKey = subscriberDetails.first();
+                        EventSubscriber subscriber = subscriberDetails.third();
+
+                        /** create a queue with subscriber ID as queue name and bind it to the exchange
+                         *  with binding key formed from event topic
+                         */
+                        Channel channel = createChannel(connection);
+                        createExchange(channel, _amqpExchangeName);
+                        channel.queueDeclare(subscriberId, false, false, false, null);
+                        channel.queueBind(subscriberId, _amqpExchangeName, bindingKey);
+
+                        // register a callback handler to receive the events that a subscriber subscribed to;
+                        // the subscriber ID is passed as queue name and again as third argument
+                        channel.basicConsume(subscriberId, _autoAck, subscriberId,
+                                new DefaultConsumer(channel) {
+                                    @Override
+                                    public void handleDelivery(String queueName,
+                                                               Envelope envelope,
+                                                               AMQP.BasicProperties properties,
+                                                               byte[] body)
+                                            throws IOException {
+
+                                        // NOTE(review): per the RabbitMQ Consumer API the first argument is
+                                        // the consumer tag; it equals the subscriber ID only because
+                                        // basicConsume above passes the subscriber ID as tag - confirm
+                                        Ternary<String, Channel, EventSubscriber> subscriberDetails
+                                                = _subscribers.get(queueName); // queue name == subscriber ID
+
+                                        // deliveries for subscriptions no longer registered are dropped
+                                        if (subscriberDetails != null) {
+                                            EventSubscriber subscriber = subscriberDetails.third();
+                                            // the event attributes are the five "."-separated parts of the routing key
+                                            String routingKey =  envelope.getRoutingKey();
+                                            String eventSource = getEventSourceFromRoutingKey(routingKey);
+                                            String eventCategory = getEventCategoryFromRoutingKey(routingKey);
+                                            String eventType = getEventTypeFromRoutingKey(routingKey);
+                                            String resourceType = getResourceTypeFromRoutingKey(routingKey);
+                                            String resourceUUID = getResourceUUIDFromRoutingKey(routingKey);
+
+                                            // create event object from the message details obtained from AMQP server
+                                            Event event = new Event(eventSource, eventCategory, eventType,
+                                                    resourceType, resourceUUID);
+                                            event.setDescription(new String(body));
+
+                                            // deliver the event to call back object provided by subscriber
+                                            subscriber.onEvent(event);
+                                        }
+                                    }
+                                }
+                        );
+
+                        // update the channel details for the subscription
+                        subscriberDetails.second(channel);
+                        _subscribers.put(subscriberId, subscriberDetails);
+                    }
+                } catch (Exception e) {
+                    // NOTE(review): if this is reached after connected was set to true, the loop ends
+                    // and the subscribers not yet processed stay without a channel - confirm intended.
+                    // If createConnection() failed with a non-IOException, connected is still false
+                    // and the loop retries.
+                    s_logger.warn("Failed to recreate queues and binding for the subscribers due to " + e.getMessage());
+                }
+            }
+            return;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/plugins/network-elements/ovs/src/com/cloud/network/guru/OvsGuestNetworkGuru.java
----------------------------------------------------------------------
diff --git a/plugins/network-elements/ovs/src/com/cloud/network/guru/OvsGuestNetworkGuru.java b/plugins/network-elements/ovs/src/com/cloud/network/guru/OvsGuestNetworkGuru.java
index 30a1129..2104322 100644
--- a/plugins/network-elements/ovs/src/com/cloud/network/guru/OvsGuestNetworkGuru.java
+++ b/plugins/network-elements/ovs/src/com/cloud/network/guru/OvsGuestNetworkGuru.java
@@ -18,6 +18,7 @@ package com.cloud.network.guru;
 
 import javax.ejb.Local;
 
+import com.cloud.event.ActionEventUtils;
 import org.apache.log4j.Logger;
 
 import com.cloud.dc.DataCenter;
@@ -25,7 +26,6 @@ import com.cloud.dc.DataCenter.NetworkType;
 import com.cloud.deploy.DeployDestination;
 import com.cloud.deploy.DeploymentPlan;
 import com.cloud.event.EventTypes;
-import com.cloud.event.EventUtils;
 import com.cloud.event.EventVO;
 import com.cloud.exception.InsufficientVirtualNetworkCapcityException;
 import com.cloud.network.Network;
@@ -95,7 +95,7 @@ public class OvsGuestNetworkGuru extends GuestNetworkGuru {
                 throw new InsufficientVirtualNetworkCapcityException("Unable to allocate vnet as a part of network " + network + " implement ", DataCenter.class, dcId);
             }
             implemented.setBroadcastUri(BroadcastDomainType.Vswitch.toUri(vnet));
-            EventUtils.saveEvent(UserContext.current().getCallerUserId(), network.getAccountId(), EventVO.LEVEL_INFO, EventTypes.EVENT_ZONE_VLAN_ASSIGN, "Assigned Zone Vlan: "+vnet+ " Network Id: "+network.getId(), 0);
+            ActionEventUtils.onCompletedActionEvent(UserContext.current().getCallerUserId(), network.getAccountId(), EventVO.LEVEL_INFO, EventTypes.EVENT_ZONE_VLAN_ASSIGN, "Assigned Zone Vlan: " + vnet + " Network Id: " + network.getId(), 0);
         } else {
             implemented.setBroadcastUri(network.getBroadcastUri());
         }

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/plugins/pom.xml
----------------------------------------------------------------------
diff --git a/plugins/pom.xml b/plugins/pom.xml
index c5b6e58..f91c6ee 100644
--- a/plugins/pom.xml
+++ b/plugins/pom.xml
@@ -41,6 +41,7 @@
     <module>hypervisors/ovm</module>
     <module>hypervisors/xen</module>
     <module>hypervisors/kvm</module>
+    <module>event-bus/rabbitmq</module>
     <module>hypervisors/simulator</module>
     <module>hypervisors/baremetal</module>
     <module>network-elements/elastic-loadbalancer</module>

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 35d6520..59feef5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -163,6 +163,7 @@
     <module>patches</module>
     <module>client</module>
     <module>test</module>
+    <module>framework</module>
   </modules>
 
   <dependencyManagement>

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e7a554fc/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index 4c3ba6f..ef1b68a 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -80,6 +80,11 @@
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.cloudstack</groupId>
+      <artifactId>cloud-framework-events</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
   <build>
     <defaultGoal>install</defaultGoal>


Mime
View raw message