cloudstack-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kelv...@apache.org
Subject git commit: updated refs/heads/4.4-forward to be2b591
Date Tue, 06 May 2014 23:48:48 GMT
Repository: cloudstack
Updated Branches:
  refs/heads/4.4-forward 9db53e501 -> be2b5918e


CLOUDSTACK-6586: Move the EventBus hookup out of the job framework and into ApiServer, to decouple
the job framework from business-logic-related hookups. The decoupling is done through the internal
messaging facility provided inside the management server.


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/be2b5918
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/be2b5918
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/be2b5918

Branch: refs/heads/4.4-forward
Commit: be2b5918e6b09a488d75966c49d34bdeb803fdd8
Parents: 9db53e5
Author: Kelven Yang <kelveny@gmail.com>
Authored: Mon May 5 15:21:59 2014 -0700
Committer: Kelven Yang <kelveny@gmail.com>
Committed: Tue May 6 16:48:35 2014 -0700

----------------------------------------------------------------------
 .../cloudstack/framework/jobs/AsyncJob.java     |  1 +
 .../jobs/impl/AsyncJobManagerImpl.java          |  5 ++
 server/src/com/cloud/api/ApiServer.java         | 88 ++++++++++++++++++++
 3 files changed, 94 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/be2b5918/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
index e624821..0ad49e1 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java
@@ -30,6 +30,7 @@ public interface AsyncJob extends JobInfo {
     public static interface Topics {
         public static final String JOB_HEARTBEAT = "job.heartbeat";
         public static final String JOB_STATE = "job.state";
+        public static final String JOB_EVENT_PUBLISH = "job.eventpublish";
     }
 
     public static interface Constants {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/be2b5918/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index 42148be..24abcbe 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -60,6 +60,7 @@ import org.apache.cloudstack.utils.identity.ManagementServerNode;
 import com.cloud.cluster.ClusterManagerListener;
 import com.cloud.cluster.ManagementServerHost;
 import com.cloud.utils.DateUtil;
+import com.cloud.utils.Pair;
 import com.cloud.utils.Predicate;
 import com.cloud.utils.component.ManagerBase;
 import com.cloud.utils.concurrency.NamedThreadFactory;
@@ -1009,4 +1010,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
 
     }
 
+    private void publishOnEventBus(AsyncJob job, String jobEvent) {
+        _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
+            new Pair<AsyncJob, String>(job, jobEvent));
+    }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/be2b5918/server/src/com/cloud/api/ApiServer.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/api/ApiServer.java b/server/src/com/cloud/api/ApiServer.java
index 50b6869..a99d683 100755
--- a/server/src/com/cloud/api/ApiServer.java
+++ b/server/src/com/cloud/api/ApiServer.java
@@ -82,6 +82,7 @@ import org.apache.http.protocol.ResponseContent;
 import org.apache.http.protocol.ResponseDate;
 import org.apache.http.protocol.ResponseServer;
 import org.apache.log4j.Logger;
+import org.springframework.beans.factory.NoSuchBeanDefinitionException;
 import org.springframework.stereotype.Component;
 
 import org.apache.cloudstack.acl.APIChecker;
@@ -119,9 +120,14 @@ import org.apache.cloudstack.api.response.ListResponse;
 import org.apache.cloudstack.context.CallContext;
 import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
 import org.apache.cloudstack.framework.config.impl.ConfigurationVO;
+import org.apache.cloudstack.framework.events.EventBus;
+import org.apache.cloudstack.framework.events.EventBusException;
 import org.apache.cloudstack.framework.jobs.AsyncJob;
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
 import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
+import org.apache.cloudstack.framework.messagebus.MessageBus;
+import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
+import org.apache.cloudstack.framework.messagebus.MessageHandler;
 import org.apache.cloudstack.managed.context.ManagedContextRunnable;
 
 import com.cloud.api.dispatch.DispatchChainFactory;
@@ -130,8 +136,10 @@ import com.cloud.api.response.ApiResponseSerializer;
 import com.cloud.configuration.Config;
 import com.cloud.domain.Domain;
 import com.cloud.domain.DomainVO;
+import com.cloud.domain.dao.DomainDao;
 import com.cloud.event.ActionEventUtils;
 import com.cloud.event.EventTypes;
+import com.cloud.event.EventCategory;
 import com.cloud.exception.AccountLimitException;
 import com.cloud.exception.CloudAuthenticationException;
 import com.cloud.exception.InsufficientCapacityException;
@@ -182,6 +190,9 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
     @Inject
     private DomainManager _domainMgr;
     @Inject
+    private DomainDao _domainDao;
+
+    @Inject
     private AsyncJobManager _asyncMgr;
     @Inject
     private ConfigurationDao _configDao;
@@ -200,15 +211,92 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
 
     private static ExecutorService s_executor = new ThreadPoolExecutor(10, 150, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(
         "ApiServer"));
+    @Inject
+    MessageBus _messageBus;
 
     public ApiServer() {
     }
 
     @Override
     public boolean configure(final String name, final Map<String, Object> params) throws
ConfigurationException {
+        _messageBus.subscribe(AsyncJob.Topics.JOB_EVENT_PUBLISH, MessageDispatcher.getDispatcher(this));
         return true;
     }
 
+    @MessageHandler(topic = AsyncJob.Topics.JOB_EVENT_PUBLISH)
+    private void handleAsyncJobPublishEvent(String subject, String senderAddress, Object
args) {
+        assert (args != null);
+
+        @SuppressWarnings("unchecked")
+        Pair<AsyncJob, String> eventInfo = (Pair<AsyncJob, String>)args;
+        AsyncJob job = eventInfo.first();
+        String jobEvent = eventInfo.second();
+
+        if (s_logger.isTraceEnabled())
+            s_logger.trace("Handle asyjob publish event " + jobEvent);
+
+        EventBus eventBus = null;
+        try {
+            eventBus = ComponentContext.getComponent(EventBus.class);
+        } catch (NoSuchBeanDefinitionException nbe) {
+            return; // no provider is configured to provide events bus, so just return
+        }
+
+        if (!job.getDispatcher().equalsIgnoreCase("ApiAsyncJobDispatcher")) {
+            return;
+        }
+
+        User userJobOwner = _accountMgr.getUserIncludingRemoved(job.getUserId());
+        Account jobOwner = _accountMgr.getAccount(userJobOwner.getAccountId());
+
+        // Get the event type from the cmdInfo json string
+        String info = job.getCmdInfo();
+        String cmdEventType;
+        if (info == null) {
+            cmdEventType = "unknown";
+        } else {
+            String marker = "\"cmdEventType\"";
+            int begin = info.indexOf(marker);
+            cmdEventType = info.substring(begin + marker.length() + 2, info.indexOf(",",
begin) - 1);
+        }
+
+        // For some reason, the instanceType / instanceId are not abstract, which means we may get null values.
+        org.apache.cloudstack.framework.events.Event event = new org.apache.cloudstack.framework.events.Event(
+                "management-server",
+                EventCategory.ASYNC_JOB_CHANGE_EVENT.getName(),
+                jobEvent,
+                (job.getInstanceType() != null ? job.getInstanceType().toString() : "unknown"),
null);
+
+        Map<String, String> eventDescription = new HashMap<String, String>();
+        eventDescription.put("command", job.getCmd());
+        eventDescription.put("user", userJobOwner.getUuid());
+        eventDescription.put("account", jobOwner.getUuid());
+        eventDescription.put("processStatus", "" + job.getProcessStatus());
+        eventDescription.put("resultCode", "" + job.getResultCode());
+        eventDescription.put("instanceUuid", ApiDBUtils.findJobInstanceUuid(job));
+        eventDescription.put("instanceType", (job.getInstanceType() != null ? job.getInstanceType().toString()
: "unknown"));
+        eventDescription.put("commandEventType", cmdEventType);
+        eventDescription.put("jobId", job.getUuid());
+        // If the event.accountinfo boolean value is set, get the human readable value for the username / domainname
+        Map<String, String> configs = _configDao.getConfiguration("management-server",
new HashMap<String, String>());
+        if (Boolean.valueOf(configs.get("event.accountinfo"))) {
+            DomainVO domain = _domainDao.findById(jobOwner.getDomainId());
+            eventDescription.put("username", userJobOwner.getUsername());
+            eventDescription.put("domainname", domain.getName());
+        }
+        event.setDescription(eventDescription);
+
+        try {
+            eventBus.publish(event);
+        } catch (EventBusException evx) {
+            String errMsg = "F" +
+                    "" +
+                    "ailed to publish async job event on the the event bus.";
+            s_logger.warn(errMsg, evx);
+            throw new CloudRuntimeException(errMsg);
+        }
+    }
+
     @Override
     public boolean start() {
         Integer apiPort = null; // api port, null by default


Mime
View raw message