syncope-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ilgro...@apache.org
Subject [2/2] syncope git commit: Handling notification and audit events in a Quartz job rather than in the same, possibly failing, transaction of the event itself
Date Mon, 12 Jun 2017 11:33:15 GMT
Handling notification and audit events in a Quartz job rather than in the same, possibly failing,
transaction of the event itself


Project: http://git-wip-us.apache.org/repos/asf/syncope/repo
Commit: http://git-wip-us.apache.org/repos/asf/syncope/commit/a90d50ec
Tree: http://git-wip-us.apache.org/repos/asf/syncope/tree/a90d50ec
Diff: http://git-wip-us.apache.org/repos/asf/syncope/diff/a90d50ec

Branch: refs/heads/master
Commit: a90d50ec5e29e528beacd3a4e2a032b6b169c2b5
Parents: 9ec5f83
Author: Francesco Chicchiriccò <ilgrosso@apache.org>
Authored: Mon Jun 12 13:32:55 2017 +0200
Committer: Francesco Chicchiriccò <ilgrosso@apache.org>
Committed: Mon Jun 12 13:33:03 2017 +0200

----------------------------------------------------------------------
 .../core/logic/LogicInvocationHandler.java      |  11 +-
 .../api/event/AfterHandlingEvent.java           |   9 +-
 .../provisioning/java/AuditManagerImpl.java     |   4 -
 .../java/job/AbstractInterruptableJob.java      |   9 +-
 .../provisioning/java/job/AfterHandlingJob.java | 105 +++++++++++++++++++
 .../notification/NotificationJobDelegate.java   |  20 +---
 .../notification/NotificationManagerImpl.java   |   4 -
 .../pushpull/AbstractPushResultHandler.java     |  11 +-
 .../pushpull/RealmPushResultHandlerImpl.java    |  11 +-
 .../java/pushpull/SchedulingPullActions.java    |   1 -
 .../syncope/fit/core/AbstractTaskITCase.java    |  34 +++---
 .../fit/core/NotificationTaskITCase.java        |  58 +++++-----
 .../apache/syncope/fit/core/PushTaskITCase.java |   2 +-
 13 files changed, 196 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/syncope/blob/a90d50ec/core/logic/src/main/java/org/apache/syncope/core/logic/LogicInvocationHandler.java
----------------------------------------------------------------------
diff --git a/core/logic/src/main/java/org/apache/syncope/core/logic/LogicInvocationHandler.java b/core/logic/src/main/java/org/apache/syncope/core/logic/LogicInvocationHandler.java
index 2d12682..a70f3c0 100644
--- a/core/logic/src/main/java/org/apache/syncope/core/logic/LogicInvocationHandler.java
+++ b/core/logic/src/main/java/org/apache/syncope/core/logic/LogicInvocationHandler.java
@@ -20,11 +20,14 @@ package org.apache.syncope.core.logic;
 
 import java.lang.reflect.Method;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.syncope.common.lib.types.AuditElements;
 import org.apache.syncope.core.provisioning.api.AuditManager;
 import org.apache.syncope.core.provisioning.api.notification.NotificationManager;
 import org.apache.syncope.core.provisioning.api.event.AfterHandlingEvent;
+import org.apache.syncope.core.provisioning.java.job.AfterHandlingJob;
 import org.aspectj.lang.ProceedingJoinPoint;
 import org.aspectj.lang.annotation.Around;
 import org.aspectj.lang.annotation.Aspect;
@@ -32,7 +35,7 @@ import org.aspectj.lang.reflect.MethodSignature;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.scheduling.quartz.SchedulerFactoryBean;
 
 @Aspect
 public class LogicInvocationHandler {
@@ -46,7 +49,7 @@ public class LogicInvocationHandler {
     private AuditManager auditManager;
 
     @Autowired
-    private ApplicationEventPublisher publisher;
+    private SchedulerFactoryBean scheduler;
 
     @Around("execution(* org.apache.syncope.core.logic.AbstractLogic+.*(..))")
     public Object around(final ProceedingJoinPoint joinPoint) throws Throwable {
@@ -94,7 +97,8 @@ public class LogicInvocationHandler {
             LOG.debug("After throwing {}.{}", clazz.getSimpleName(), event);
             throw t;
         } finally {
-            publisher.publishEvent(new AfterHandlingEvent(this,
+            Map<String, Object> jobMap = new HashMap<>();
+            jobMap.put(AfterHandlingEvent.JOBMAP_KEY, new AfterHandlingEvent(
                     notificationsAvailable,
                     auditRequested,
                     AuditElements.EventCategoryType.LOGIC,
@@ -105,6 +109,7 @@ public class LogicInvocationHandler {
                     before,
                     output,
                     input));
+            AfterHandlingJob.schedule(scheduler, jobMap);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/syncope/blob/a90d50ec/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/event/AfterHandlingEvent.java
----------------------------------------------------------------------
diff --git a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/event/AfterHandlingEvent.java b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/event/AfterHandlingEvent.java
index e732097..b207cb9 100644
--- a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/event/AfterHandlingEvent.java
+++ b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/event/AfterHandlingEvent.java
@@ -18,13 +18,15 @@
  */
 package org.apache.syncope.core.provisioning.api.event;
 
+import java.io.Serializable;
 import org.apache.syncope.common.lib.types.AuditElements;
-import org.springframework.context.ApplicationEvent;
 
-public class AfterHandlingEvent extends ApplicationEvent {
+public class AfterHandlingEvent implements Serializable {
 
     private static final long serialVersionUID = 5950986229089263378L;
 
+    public static final String JOBMAP_KEY = "AfterHandlingEvent";
+
     private final boolean notificationsAvailable;
 
     private final boolean auditRequested;
@@ -46,7 +48,6 @@ public class AfterHandlingEvent extends ApplicationEvent {
     private final Object[] input;
 
     public AfterHandlingEvent(
-            final Object source,
             final boolean notificationsAvailable,
             final boolean auditRequested,
             final AuditElements.EventCategoryType type,
@@ -58,8 +59,6 @@ public class AfterHandlingEvent extends ApplicationEvent {
             final Object output,
             final Object... input) {
 
-        super(source);
-
         this.notificationsAvailable = notificationsAvailable;
         this.auditRequested = auditRequested;
         this.type = type;

http://git-wip-us.apache.org/repos/asf/syncope/blob/a90d50ec/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/AuditManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/AuditManagerImpl.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/AuditManagerImpl.java
index d553762..359538b 100644
--- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/AuditManagerImpl.java
+++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/AuditManagerImpl.java
@@ -31,9 +31,7 @@ import org.apache.syncope.core.provisioning.api.event.AfterHandlingEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.event.EventListener;
 import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
 @Transactional(readOnly = true)
@@ -80,8 +78,6 @@ public class AuditManagerImpl implements AuditManager {
         return auditRequested;
     }
 
-    @EventListener
-    @Transactional(propagation = Propagation.REQUIRES_NEW)
     @Override
     public void audit(final AfterHandlingEvent event) {
         if (event.isAuditRequested()) {

http://git-wip-us.apache.org/repos/asf/syncope/blob/a90d50ec/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractInterruptableJob.java
----------------------------------------------------------------------
diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractInterruptableJob.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractInterruptableJob.java
index 3b22728..19bbf1e 100644
--- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractInterruptableJob.java
+++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractInterruptableJob.java
@@ -18,7 +18,6 @@
  */
 package org.apache.syncope.core.provisioning.java.job;
 
-
 import java.util.Date;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.syncope.core.provisioning.api.utils.FormatUtils;
@@ -35,7 +34,7 @@ import org.slf4j.LoggerFactory;
 public abstract class AbstractInterruptableJob implements InterruptableJob {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractInterruptableJob.class);
-    
+
     /**
      * The current running thread containing the task to be executed.
      */
@@ -46,7 +45,11 @@ public abstract class AbstractInterruptableJob implements InterruptableJob {
     @Override
     public void execute(final JobExecutionContext context) throws JobExecutionException {
         this.runningThread.set(Thread.currentThread());
-        this.interruptMaxRetries = context.getMergedJobDataMap().getLong(JobManager.INTERRUPT_MAX_RETRIES_KEY);
+        try {
+            this.interruptMaxRetries = context.getMergedJobDataMap().getLong(JobManager.INTERRUPT_MAX_RETRIES_KEY);
+        } catch (Exception e) {
+            LOG.debug("Could not set {}, defaults to {}", JobManager.INTERRUPT_MAX_RETRIES_KEY, interruptMaxRetries, e);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/syncope/blob/a90d50ec/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AfterHandlingJob.java
----------------------------------------------------------------------
diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AfterHandlingJob.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AfterHandlingJob.java
new file mode 100644
index 0000000..86a97ca
--- /dev/null
+++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AfterHandlingJob.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.syncope.core.provisioning.java.job;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.syncope.core.provisioning.api.AuditManager;
+import org.apache.syncope.core.provisioning.api.event.AfterHandlingEvent;
+import org.apache.syncope.core.provisioning.api.job.JobManager;
+import org.apache.syncope.core.provisioning.api.job.JobNamer;
+import org.apache.syncope.core.provisioning.api.notification.NotificationManager;
+import org.apache.syncope.core.spring.ApplicationContextProvider;
+import org.apache.syncope.core.spring.security.AuthContextUtils;
+import org.quartz.JobBuilder;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.support.AbstractBeanDefinition;
+import org.springframework.scheduling.quartz.SchedulerFactoryBean;
+
+/**
+ * Quartz job for asynchronous handling of notification / audit events.
+ * Instead of direct synchronous invocation - which occurs in the same transaction where the event is generated, the
+ * execution of the scheduled code happens in a new transaction.
+ */
+public class AfterHandlingJob extends AbstractInterruptableJob {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AfterHandlingJob.class);
+
+    public static void schedule(final SchedulerFactoryBean scheduler, final Map<String, Object> jobMap) {
+        @SuppressWarnings("unchecked")
+        AfterHandlingJob jobInstance = (AfterHandlingJob) ApplicationContextProvider.getBeanFactory().
+                createBean(AfterHandlingJob.class, AbstractBeanDefinition.AUTOWIRE_BY_TYPE, false);
+        String jobName = AfterHandlingJob.class.getName() + UUID.randomUUID();
+
+        jobMap.put(JobManager.DOMAIN_KEY, AuthContextUtils.getDomain());
+
+        ApplicationContextProvider.getBeanFactory().registerSingleton(jobName, jobInstance);
+
+        JobBuilder jobDetailBuilder = JobBuilder.newJob(AfterHandlingJob.class).
+                withIdentity(jobName).
+                usingJobData(new JobDataMap(jobMap));
+
+        TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger().
+                withIdentity(JobNamer.getTriggerName(jobName)).
+                startNow();
+
+        try {
+            scheduler.getScheduler().scheduleJob(jobDetailBuilder.build(), triggerBuilder.build());
+        } catch (SchedulerException e) {
+            LOG.error("Could not schedule, aborting", e);
+        }
+    }
+
+    @Autowired
+    private NotificationManager notificationManager;
+
+    @Autowired
+    private AuditManager auditManager;
+
+    @Override
+    public void execute(final JobExecutionContext context) throws JobExecutionException {
+        super.execute(context);
+
+        try {
+            AuthContextUtils.execWithAuthContext(context.getMergedJobDataMap().getString(JobManager.DOMAIN_KEY),
+                    new AuthContextUtils.Executable<Void>() {
+
+                @Override
+                public Void exec() {
+                    notificationManager.createTasks(
+                            (AfterHandlingEvent) context.getMergedJobDataMap().get(AfterHandlingEvent.JOBMAP_KEY));
+                    auditManager.audit(
+                            (AfterHandlingEvent) context.getMergedJobDataMap().get(AfterHandlingEvent.JOBMAP_KEY));
+                    return null;
+                }
+            });
+        } catch (RuntimeException e) {
+            throw new JobExecutionException("While handling notification / audit events", e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/syncope/blob/a90d50ec/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/notification/NotificationJobDelegate.java
----------------------------------------------------------------------
diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/notification/NotificationJobDelegate.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/notification/NotificationJobDelegate.java
index 0d37d86..3ef734b 100644
--- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/notification/NotificationJobDelegate.java
+++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/notification/NotificationJobDelegate.java
@@ -31,13 +31,11 @@ import org.apache.syncope.core.persistence.api.entity.EntityFactory;
 import org.apache.syncope.core.persistence.api.entity.task.NotificationTask;
 import org.apache.syncope.core.persistence.api.entity.task.TaskExec;
 import org.apache.syncope.core.provisioning.api.AuditManager;
-import org.apache.syncope.core.provisioning.api.event.AfterHandlingEvent;
 import org.apache.syncope.core.provisioning.api.notification.NotificationManager;
 import org.quartz.JobExecutionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationEventPublisher;
 import org.springframework.mail.javamail.JavaMailSender;
 import org.springframework.mail.javamail.JavaMailSenderImpl;
 import org.springframework.mail.javamail.MimeMessageHelper;
@@ -49,9 +47,6 @@ public class NotificationJobDelegate {
 
     private static final Logger LOG = LoggerFactory.getLogger(NotificationJobDelegate.class);
 
-    /**
-     * Task DAO.
-     */
     @Autowired
     private TaskDAO taskDAO;
 
@@ -67,9 +62,6 @@ public class NotificationJobDelegate {
     @Autowired
     private NotificationManager notificationManager;
 
-    @Autowired
-    private ApplicationEventPublisher publisher;
-
     private long maxRetries;
 
     private void init() {
@@ -156,9 +148,7 @@ public class NotificationJobDelegate {
                         execution.setMessage(report.toString());
                     }
 
-                    publisher.publishEvent(new AfterHandlingEvent(this,
-                            true,
-                            true,
+                    notificationManager.createTasks(
                             AuditElements.EventCategoryType.TASK,
                             "notification",
                             null,
@@ -167,7 +157,7 @@ public class NotificationJobDelegate {
                             null,
                             null,
                             task,
-                            "Successfully sent notification to " + to));
+                            "Successfully sent notification to " + to);
                 } catch (Exception e) {
                     LOG.error("Could not send e-mail", e);
 
@@ -176,9 +166,7 @@ public class NotificationJobDelegate {
                         execution.setMessage(ExceptionUtils2.getFullStackTrace(e));
                     }
 
-                    publisher.publishEvent(new AfterHandlingEvent(this,
-                            true,
-                            true,
+                    notificationManager.createTasks(
                             AuditElements.EventCategoryType.TASK,
                             "notification",
                             null,
@@ -187,7 +175,7 @@ public class NotificationJobDelegate {
                             null,
                             null,
                             task,
-                            "Could not send notification to " + to, e));
+                            "Could not send notification to " + to, e);
                 }
 
                 execution.setEnd(new Date());

http://git-wip-us.apache.org/repos/asf/syncope/blob/a90d50ec/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/notification/NotificationManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/notification/NotificationManagerImpl.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/notification/NotificationManagerImpl.java
index aecfe69..c438da3 100644
--- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/notification/NotificationManagerImpl.java
+++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/notification/NotificationManagerImpl.java
@@ -81,9 +81,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.support.AbstractBeanDefinition;
-import org.springframework.context.event.EventListener;
 import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
 @Component
@@ -279,8 +277,6 @@ public class NotificationManagerImpl implements NotificationManager {
         });
     }
 
-    @EventListener
-    @Transactional(propagation = Propagation.REQUIRES_NEW)
     @Override
     public void createTasks(final AfterHandlingEvent event) {
         if (event.isNotificationsAvailable()) {

http://git-wip-us.apache.org/repos/asf/syncope/blob/a90d50ec/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/AbstractPushResultHandler.java
----------------------------------------------------------------------
diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/AbstractPushResultHandler.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/AbstractPushResultHandler.java
index a17c789..bbc6eb0 100644
--- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/AbstractPushResultHandler.java
+++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/AbstractPushResultHandler.java
@@ -20,7 +20,9 @@ package org.apache.syncope.core.provisioning.java.pushpull;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.IteratorUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -50,13 +52,14 @@ import org.apache.syncope.core.provisioning.api.event.AfterHandlingEvent;
 import org.apache.syncope.core.provisioning.api.pushpull.IgnoreProvisionException;
 import org.apache.syncope.core.provisioning.api.pushpull.SyncopePushResultHandler;
 import org.apache.syncope.core.provisioning.api.utils.EntityUtils;
+import org.apache.syncope.core.provisioning.java.job.AfterHandlingJob;
 import org.apache.syncope.core.provisioning.java.utils.MappingUtils;
 import org.identityconnectors.framework.common.objects.ConnectorObject;
 import org.identityconnectors.framework.common.objects.ObjectClass;
 import org.identityconnectors.framework.common.objects.Uid;
 import org.quartz.JobExecutionException;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.scheduling.quartz.SchedulerFactoryBean;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
@@ -67,7 +70,7 @@ public abstract class AbstractPushResultHandler extends AbstractSyncopeResultHan
     protected MappingManager mappingManager;
 
     @Autowired
-    private ApplicationEventPublisher publisher;
+    protected SchedulerFactoryBean scheduler;
 
     protected abstract String getName(Any<?> any);
 
@@ -362,7 +365,8 @@ public abstract class AbstractPushResultHandler extends AbstractSyncopeResultHan
 
                 throw new JobExecutionException(e);
             } finally {
-                publisher.publishEvent(new AfterHandlingEvent(this,
+                Map<String, Object> jobMap = new HashMap<>();
+                jobMap.put(AfterHandlingEvent.JOBMAP_KEY, new AfterHandlingEvent(
                         true,
                         true,
                         AuditElements.EventCategoryType.PUSH,
@@ -373,6 +377,7 @@ public abstract class AbstractPushResultHandler extends AbstractSyncopeResultHan
                         beforeObj,
                         output,
                         any));
+                AfterHandlingJob.schedule(scheduler, jobMap);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/syncope/blob/a90d50ec/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/RealmPushResultHandlerImpl.java
----------------------------------------------------------------------
diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/RealmPushResultHandlerImpl.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/RealmPushResultHandlerImpl.java
index 4d7ad3c..20371b5 100644
--- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/RealmPushResultHandlerImpl.java
+++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/RealmPushResultHandlerImpl.java
@@ -19,7 +19,9 @@
 package org.apache.syncope.core.provisioning.java.pushpull;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.syncope.common.lib.to.RealmTO;
 import org.apache.syncope.common.lib.types.AuditElements;
@@ -39,6 +41,7 @@ import org.apache.syncope.core.provisioning.api.pushpull.IgnoreProvisionExceptio
 import org.apache.syncope.core.provisioning.api.pushpull.ProvisioningReport;
 import org.apache.syncope.core.provisioning.api.pushpull.PushActions;
 import org.apache.syncope.core.provisioning.api.pushpull.SyncopePushResultHandler;
+import org.apache.syncope.core.provisioning.java.job.AfterHandlingJob;
 import org.apache.syncope.core.provisioning.java.utils.MappingUtils;
 import org.identityconnectors.framework.common.objects.AttributeBuilder;
 import org.identityconnectors.framework.common.objects.ConnectorObject;
@@ -46,7 +49,7 @@ import org.identityconnectors.framework.common.objects.ResultsHandler;
 import org.identityconnectors.framework.common.objects.filter.EqualsFilter;
 import org.quartz.JobExecutionException;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.scheduling.quartz.SchedulerFactoryBean;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
@@ -55,7 +58,7 @@ public class RealmPushResultHandlerImpl
         implements SyncopePushResultHandler {
 
     @Autowired
-    private ApplicationEventPublisher publisher;
+    protected SchedulerFactoryBean scheduler;
 
     @Transactional(propagation = Propagation.REQUIRES_NEW)
     @Override
@@ -319,7 +322,8 @@ public class RealmPushResultHandlerImpl
 
                 throw new JobExecutionException(e);
             } finally {
-                publisher.publishEvent(new AfterHandlingEvent(this,
+                Map<String, Object> jobMap = new HashMap<>();
+                jobMap.put(AfterHandlingEvent.JOBMAP_KEY, new AfterHandlingEvent(
                         true,
                         true,
                         AuditElements.EventCategoryType.PUSH,
@@ -330,6 +334,7 @@ public class RealmPushResultHandlerImpl
                         beforeObj,
                         output,
                         realm));
+                AfterHandlingJob.schedule(scheduler, jobMap);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/syncope/blob/a90d50ec/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/SchedulingPullActions.java
----------------------------------------------------------------------
diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/SchedulingPullActions.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/SchedulingPullActions.java
index e8de10f..841b226 100644
--- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/SchedulingPullActions.java
+++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/SchedulingPullActions.java
@@ -54,7 +54,6 @@ public abstract class SchedulingPullActions extends DefaultPullActions {
         String jobName = getClass().getName() + UUID.randomUUID();
 
         jobMap.put(JobManager.DOMAIN_KEY, AuthContextUtils.getDomain());
-        jobMap.put(JobManager.INTERRUPT_MAX_RETRIES_KEY, 1L);
 
         ApplicationContextProvider.getBeanFactory().registerSingleton(jobName, jobInstance);
 

http://git-wip-us.apache.org/repos/asf/syncope/blob/a90d50ec/fit/core-reference/src/test/java/org/apache/syncope/fit/core/AbstractTaskITCase.java
----------------------------------------------------------------------
diff --git a/fit/core-reference/src/test/java/org/apache/syncope/fit/core/AbstractTaskITCase.java b/fit/core-reference/src/test/java/org/apache/syncope/fit/core/AbstractTaskITCase.java
index 1674621..6d7c897 100644
--- a/fit/core-reference/src/test/java/org/apache/syncope/fit/core/AbstractTaskITCase.java
+++ b/fit/core-reference/src/test/java/org/apache/syncope/fit/core/AbstractTaskITCase.java
@@ -19,7 +19,6 @@
 package org.apache.syncope.fit.core;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
@@ -31,8 +30,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.collections4.IterableUtils;
-import org.apache.commons.collections4.Predicate;
 import org.apache.syncope.common.lib.to.AbstractTaskTO;
 import org.apache.syncope.common.lib.to.ExecTO;
 import org.apache.syncope.common.lib.to.NotificationTaskTO;
@@ -160,18 +157,29 @@ public abstract class AbstractTaskITCase extends AbstractITCase {
         service.shutdownNow();
     }
 
-    protected NotificationTaskTO findNotificationTaskBySender(final String sender) {
-        PagedResult<NotificationTaskTO> tasks =
-                taskService.list(new TaskQuery.Builder(TaskType.NOTIFICATION).page(1).size(100).build());
-        assertNotNull(tasks);
-        assertFalse(tasks.getResult().isEmpty());
+    protected NotificationTaskTO findNotificationTask(final String notification, final int maxWaitSeconds) {
+        int i = 0;
+        int maxit = maxWaitSeconds;
 
-        return IterableUtils.find(tasks.getResult(), new Predicate<NotificationTaskTO>() {
+        NotificationTaskTO notificationTask = null;
+        do {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+            }
 
-            @Override
-            public boolean evaluate(final NotificationTaskTO task) {
-                return sender.equals(task.getSender());
+            PagedResult<NotificationTaskTO> tasks =
+                    taskService.list(new TaskQuery.Builder(TaskType.NOTIFICATION).notification(notification).build());
+            if (!tasks.getResult().isEmpty()) {
+                notificationTask = tasks.getResult().get(0);
             }
-        });
+
+            i++;
+        } while (notificationTask == null && i < maxit);
+        if (notificationTask == null) {
+            fail("Timeout when looking for notification tasks from notification " + notification);
+        }
+
+        return notificationTask;
     }
 }

http://git-wip-us.apache.org/repos/asf/syncope/blob/a90d50ec/fit/core-reference/src/test/java/org/apache/syncope/fit/core/NotificationTaskITCase.java
----------------------------------------------------------------------
diff --git a/fit/core-reference/src/test/java/org/apache/syncope/fit/core/NotificationTaskITCase.java b/fit/core-reference/src/test/java/org/apache/syncope/fit/core/NotificationTaskITCase.java
index f4cd5d7..e7fc955 100644
--- a/fit/core-reference/src/test/java/org/apache/syncope/fit/core/NotificationTaskITCase.java
+++ b/fit/core-reference/src/test/java/org/apache/syncope/fit/core/NotificationTaskITCase.java
@@ -21,7 +21,6 @@ package org.apache.syncope.fit.core;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import com.icegreen.greenmail.util.GreenMail;
@@ -37,6 +36,7 @@ import javax.ws.rs.core.Response;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.syncope.client.lib.SyncopeClient;
 import org.apache.syncope.common.lib.to.AttrTO;
 import org.apache.syncope.common.lib.to.GroupTO;
@@ -44,6 +44,7 @@ import org.apache.syncope.common.lib.to.MembershipTO;
 import org.apache.syncope.common.lib.to.NotificationTO;
 import org.apache.syncope.common.lib.to.NotificationTaskTO;
 import org.apache.syncope.common.lib.to.ExecTO;
+import org.apache.syncope.common.lib.to.PagedResult;
 import org.apache.syncope.common.lib.to.UserTO;
 import org.apache.syncope.common.lib.types.AnyTypeKind;
 import org.apache.syncope.common.lib.types.TaskType;
@@ -134,15 +135,15 @@ public class NotificationTaskITCase extends AbstractTaskITCase {
     public void notifyByMail() throws Exception {
         String sender = "syncopetest-" + getUUIDString() + "@syncope.apache.org";
         String subject = "Test notification " + getUUIDString();
-        String recipient = createNotificationTask(true, true, TraceLevel.ALL, sender, subject);
-        NotificationTaskTO taskTO = findNotificationTaskBySender(sender);
+        Pair<String, String> created = createNotificationTask(true, true, TraceLevel.ALL, sender, subject);
+        NotificationTaskTO taskTO = findNotificationTask(created.getLeft(), 50);
         assertNotNull(taskTO);
         assertNotNull(taskTO.getNotification());
         assertTrue(taskTO.getExecutions().isEmpty());
 
         execNotificationTask(taskService, taskTO.getKey(), 50);
 
-        assertTrue(verifyMail(sender, subject, recipient));
+        assertTrue(verifyMail(sender, subject, created.getRight()));
 
         // verify message body
         taskTO = taskService.read(taskTO.getKey(), true);
@@ -150,24 +151,24 @@ public class NotificationTaskITCase extends AbstractTaskITCase {
         assertTrue(taskTO.isExecuted());
         assertNotNull(taskTO.getTextBody());
         assertTrue("Notification mail text doesn't contain expected content.",
-                taskTO.getTextBody().contains("Your email address is " + recipient + "."));
+                taskTO.getTextBody().contains("Your email address is " + created.getRight() + "."));
         assertTrue("Notification mail text doesn't contain expected content.",
                 taskTO.getTextBody().contains("Your email address inside a link: "
-                        + "http://localhost/?email=" + recipient.replaceAll("@", "%40")));
+                        + "http://localhost/?email=" + created.getRight().replaceAll("@", "%40")));
     }
 
     @Test
     public void notifyByMailEmptyAbout() throws Exception {
         String sender = "syncopetest-" + getUUIDString() + "@syncope.apache.org";
         String subject = "Test notification " + getUUIDString();
-        String recipient = createNotificationTask(true, false, TraceLevel.ALL, sender, subject);
-        NotificationTaskTO taskTO = findNotificationTaskBySender(sender);
+        Pair<String, String> created = createNotificationTask(true, false, TraceLevel.ALL, sender, subject);
+        NotificationTaskTO taskTO = findNotificationTask(created.getLeft(), 50);
         assertNotNull(taskTO);
         assertTrue(taskTO.getExecutions().isEmpty());
 
         execNotificationTask(taskService, taskTO.getKey(), 50);
 
-        assertTrue(verifyMail(sender, subject, recipient));
+        assertTrue(verifyMail(sender, subject, created.getRight()));
     }
 
     @Test
@@ -184,8 +185,8 @@ public class NotificationTaskITCase extends AbstractTaskITCase {
             // 3. create notification and user
             String sender = "syncopetest-" + getUUIDString() + "@syncope.apache.org";
             String subject = "Test notification " + getUUIDString();
-            createNotificationTask(true, true, TraceLevel.ALL, sender, subject);
-            NotificationTaskTO taskTO = findNotificationTaskBySender(sender);
+            Pair<String, String> created = createNotificationTask(true, true, TraceLevel.ALL, sender, subject);
+            NotificationTaskTO taskTO = findNotificationTask(created.getLeft(), 50);
             assertNotNull(taskTO);
             assertNotNull(taskTO.getNotification());
             assertTrue(taskTO.getExecutions().isEmpty());
@@ -211,8 +212,8 @@ public class NotificationTaskITCase extends AbstractTaskITCase {
     @Test
     public void issueSYNCOPE81() {
         String sender = "syncope81@syncope.apache.org";
-        createNotificationTask(true, true, TraceLevel.ALL, sender, "Test notification");
-        NotificationTaskTO taskTO = findNotificationTaskBySender(sender);
+        Pair<String, String> created = createNotificationTask(true, true, TraceLevel.ALL, sender, "Test notification");
+        NotificationTaskTO taskTO = findNotificationTask(created.getLeft(), 50);
         assertNotNull(taskTO);
         assertNotNull(taskTO.getNotification());
         assertTrue(taskTO.getExecutions().isEmpty());
@@ -231,10 +232,10 @@ public class NotificationTaskITCase extends AbstractTaskITCase {
     public void issueSYNCOPE86() {
         // 1. create notification task
         String sender = "syncope86@syncope.apache.org";
-        createNotificationTask(true, true, TraceLevel.ALL, sender, "Test notification");
+        Pair<String, String> created = createNotificationTask(true, true, TraceLevel.ALL, sender, "Test notification");
 
         // 2. get NotificationTaskTO for user just created
-        NotificationTaskTO taskTO = findNotificationTaskBySender(sender);
+        NotificationTaskTO taskTO = findNotificationTask(created.getLeft(), 50);
         assertNotNull(taskTO);
         assertNotNull(taskTO.getNotification());
         assertTrue(taskTO.getExecutions().isEmpty());
@@ -258,8 +259,8 @@ public class NotificationTaskITCase extends AbstractTaskITCase {
     public void issueSYNCOPE192() throws Exception {
         String sender = "syncopetest-" + getUUIDString() + "@syncope.apache.org";
         String subject = "Test notification " + getUUIDString();
-        String recipient = createNotificationTask(true, true, TraceLevel.NONE, sender, subject);
-        NotificationTaskTO taskTO = findNotificationTaskBySender(sender);
+        Pair<String, String> created = createNotificationTask(true, true, TraceLevel.NONE, sender, subject);
+        NotificationTaskTO taskTO = findNotificationTask(created.getLeft(), 50);
         assertNotNull(taskTO);
         assertNotNull(taskTO.getNotification());
         assertTrue(taskTO.getExecutions().isEmpty());
@@ -271,7 +272,7 @@ public class NotificationTaskITCase extends AbstractTaskITCase {
         } catch (InterruptedException e) {
         }
 
-        assertTrue(verifyMail(sender, subject, recipient));
+        assertTrue(verifyMail(sender, subject, created.getRight()));
 
         // verify that last exec status was updated
         taskTO = taskService.read(taskTO.getKey(), true);
@@ -285,16 +286,16 @@ public class NotificationTaskITCase extends AbstractTaskITCase {
     public void issueSYNCOPE445() throws Exception {
         String sender = "syncopetest-" + getUUIDString() + "@syncope.apache.org";
         String subject = "Test notification " + getUUIDString();
-        String recipient = createNotificationTask(
+        Pair<String, String> created = createNotificationTask(
                 true, true, TraceLevel.ALL, sender, subject, "syncope445@syncope.apache.org");
-        NotificationTaskTO taskTO = findNotificationTaskBySender(sender);
+        NotificationTaskTO taskTO = findNotificationTask(created.getLeft(), 50);
         assertNotNull(taskTO);
         assertNotNull(taskTO.getNotification());
         assertTrue(taskTO.getExecutions().isEmpty());
 
         execNotificationTask(taskService, taskTO.getKey(), 50);
 
-        assertTrue(verifyMail(sender, subject, recipient));
+        assertTrue(verifyMail(sender, subject, created.getRight()));
 
         // verify task
         taskTO = taskService.read(taskTO.getKey(), true);
@@ -341,7 +342,7 @@ public class NotificationTaskITCase extends AbstractTaskITCase {
         assertNotNull(groupTO);
 
         // 3. verify
-        NotificationTaskTO taskTO = findNotificationTaskBySender(sender);
+        NotificationTaskTO taskTO = findNotificationTask(notification.getKey(), 50);
         assertNotNull(taskTO);
         assertNotNull(taskTO.getNotification());
         assertTrue(taskTO.getRecipients().containsAll(
@@ -361,13 +362,16 @@ public class NotificationTaskITCase extends AbstractTaskITCase {
     public void issueSYNCOPE492() throws Exception {
         String sender = "syncopetest-" + getUUIDString() + "@syncope.apache.org";
         String subject = "Test notification " + getUUIDString();
-        createNotificationTask(false, true, TraceLevel.NONE, sender, subject, "syncope445@syncope.apache.org");
+        Pair<String, String> created =
+                createNotificationTask(false, true, TraceLevel.NONE, sender, subject, "syncope445@syncope.apache.org");
 
         // verify that no task was created for disabled notification
-        assertNull(findNotificationTaskBySender(sender));
+        PagedResult<NotificationTaskTO> tasks =
+                taskService.list(new TaskQuery.Builder(TaskType.NOTIFICATION).notification(created.getLeft()).build());
+        assertEquals(0, tasks.getSize());
     }
 
-    private String createNotificationTask(
+    private Pair<String, String> createNotificationTask(
             final boolean active,
             final boolean includeAbout,
             final TraceLevel traceLevel,
@@ -383,7 +387,7 @@ public class NotificationTaskITCase extends AbstractTaskITCase {
         if (includeAbout) {
             notification.getAbouts().put(AnyTypeKind.USER.name(),
                     SyncopeClient.getUserSearchConditionBuilder().
-                    inGroups("bf825fe1-7320-4a54-bd64-143b5c18ab97").query());
+                            inGroups("bf825fe1-7320-4a54-bd64-143b5c18ab97").query());
         }
 
         notification.setRecipientsFIQL(SyncopeClient.getUserSearchConditionBuilder().
@@ -410,7 +414,7 @@ public class NotificationTaskITCase extends AbstractTaskITCase {
 
         userTO = createUser(userTO).getEntity();
         assertNotNull(userTO);
-        return userTO.getUsername();
+        return Pair.of(notification.getKey(), userTO.getUsername());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/syncope/blob/a90d50ec/fit/core-reference/src/test/java/org/apache/syncope/fit/core/PushTaskITCase.java
----------------------------------------------------------------------
diff --git a/fit/core-reference/src/test/java/org/apache/syncope/fit/core/PushTaskITCase.java b/fit/core-reference/src/test/java/org/apache/syncope/fit/core/PushTaskITCase.java
index 9c9a149..a4cc27d 100644
--- a/fit/core-reference/src/test/java/org/apache/syncope/fit/core/PushTaskITCase.java
+++ b/fit/core-reference/src/test/java/org/apache/syncope/fit/core/PushTaskITCase.java
@@ -426,7 +426,7 @@ public class PushTaskITCase extends AbstractTaskITCase {
 
         execProvisioningTask(taskService, actual.getKey(), 50, false);
 
-        NotificationTaskTO taskTO = findNotificationTaskBySender("syncope648@syncope.apache.org");
+        NotificationTaskTO taskTO = findNotificationTask(notification.getKey(), 50);
         assertNotNull(taskTO);
     }
 }


Mime
View raw message