brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From henev...@apache.org
Subject [11/35] brooklyn-server git commit: add and use builder for ScheduledTask
Date Tue, 03 Oct 2017 14:23:48 GMT
add and use builder for ScheduledTask


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/18a908bd
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/18a908bd
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/18a908bd

Branch: refs/heads/master
Commit: 18a908bd42f5a09da838c2f90bd6f79b5cd14766
Parents: 934cf4c
Author: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Authored: Tue Sep 12 17:31:42 2017 +0100
Committer: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Committed: Fri Sep 15 10:29:09 2017 +0100

----------------------------------------------------------------------
 .../org/apache/brooklyn/core/feed/Poller.java   | 41 +++++++++-----------
 .../mgmt/ha/HighAvailabilityManagerImpl.java    |  2 +-
 .../rebind/PeriodicDeltaChangeListener.java     |  4 +-
 .../core/mgmt/rebind/RebindManagerImpl.java     |  2 +-
 .../location/ssh/SshMachineLocation.java        |  6 +--
 .../brooklyn/util/core/task/ScheduledTask.java  | 41 ++++++++++++++++++++
 .../util/core/task/ScheduledExecutionTest.java  | 19 +++++----
 .../policy/ha/AbstractFailureDetector.java      |  3 +-
 8 files changed, 76 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/18a908bd/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
index 1a29c48..03fd1df 100644
--- a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
+++ b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java
@@ -143,29 +143,26 @@ public class Poller<V> {
         for (final PollJob<V> pollJob : pollJobs) {
             final String scheduleName = pollJob.handler.getDescription();
             if (pollJob.pollPeriod.compareTo(Duration.ZERO) > 0) {
-                Callable<Task<?>> pollingTaskFactory = new Callable<Task<?>>()
{
-                    @Override
-                    public Task<?> call() {
-                        DynamicSequentialTask<Void> task = new DynamicSequentialTask<Void>(MutableMap.of("displayName",
scheduleName, "entity", entity), 
-                            new Callable<Void>() { @Override public Void call() {
-                                if (!Entities.isManaged(entity)) {
-                                    return null;
-                                }
-                                if (onlyIfServiceUp && !Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP)))
{
-                                    return null;
-                                }
-                                pollJob.wrappedJob.run();
-                                return null; 
-                            } } );
-                        BrooklynTaskTags.setTransient(task);
-                        return task;
-                    }
-                };
-                Map<String, ?> taskFlags = MutableMap.of("displayName", "scheduled:"
+ scheduleName);
-                ScheduledTask task = new ScheduledTask(taskFlags, pollingTaskFactory)
+                ScheduledTask t = ScheduledTask.builder(() -> {
+                            DynamicSequentialTask<Void> task = new DynamicSequentialTask<Void>(MutableMap.of("displayName",
scheduleName, "entity", entity), 
+                                new Callable<Void>() { @Override public Void call()
{
+                                    if (!Entities.isManaged(entity)) {
+                                        return null;
+                                    }
+                                    if (onlyIfServiceUp && !Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP)))
{
+                                        return null;
+                                    }
+                                    pollJob.wrappedJob.run();
+                                    return null; 
+                                } } );
+                            BrooklynTaskTags.setTransient(task);
+                            return task;
+                        })
+                        .displayName("scheduled:" + scheduleName)
                         .period(pollJob.pollPeriod)
-                        .cancelOnException(false);
-                tasks.add(Entities.submit(entity, task));
+                        .cancelOnException(false)
+                        .build();
+                tasks.add(Entities.submit(entity, t));
             } else {
                 if (log.isDebugEnabled()) log.debug("Activating poll (but leaving off, as
period {}) for {} (using {})", new Object[] {pollJob.pollPeriod, entity, this});
             }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/18a908bd/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java
b/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java
index f6dc1a4..c161533 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/ha/HighAvailabilityManagerImpl.java
@@ -589,7 +589,7 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager
{
         } else {
             if (pollingTask!=null) pollingTask.cancel(true);
             
-            ScheduledTask task = new ScheduledTask(MutableMap.of("period", pollPeriod, "displayName",
"scheduled:[HA poller task]", "tag", BrooklynTaskTags.TRANSIENT_TASK_TAG), taskFactory);
+            ScheduledTask task = ScheduledTask.builder(taskFactory).period(pollPeriod).displayName("scheduled:[HA
poller task]").tagTransient().build();
             pollingTask = managementContext.getExecutionManager().submit(task);
         }
     }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/18a908bd/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java
b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java
index 62b2069..5f5e50a 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java
@@ -244,8 +244,8 @@ public class PeriodicDeltaChangeListener implements ChangeListener {
                         }}).build();
                 }
             };
-            scheduledTask = (ScheduledTask) executionContext.submit(new ScheduledTask(MutableMap.of("displayName",
"scheduled[periodic-persister]",
-                "tags", MutableSet.of(BrooklynTaskTags.TRANSIENT_TASK_TAG)), taskFactory).period(period).delay(period));
+            scheduledTask = (ScheduledTask) executionContext.submit(
+                ScheduledTask.builder(taskFactory).displayName("scheduled:[periodic-persister]").tagTransient().period(period).delay(period).build()
);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/18a908bd/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
index d605753..d896376 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
@@ -355,7 +355,7 @@ public class RebindManagerImpl implements RebindManager {
             }
         };
         readOnlyTask = (ScheduledTask) managementContext.getServerExecutionContext().submit(
-            new ScheduledTask(MutableMap.of("displayName", "Periodic read-only rebind"),
taskFactory).period(periodicPersistPeriod));
+            ScheduledTask.builder(taskFactory).displayName("scheduled:[periodic-read-only-rebind]").period(periodicPersistPeriod).build()
);
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/18a908bd/core/src/main/java/org/apache/brooklyn/location/ssh/SshMachineLocation.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/location/ssh/SshMachineLocation.java b/core/src/main/java/org/apache/brooklyn/location/ssh/SshMachineLocation.java
index 167d83e..7033398 100644
--- a/core/src/main/java/org/apache/brooklyn/location/ssh/SshMachineLocation.java
+++ b/core/src/main/java/org/apache/brooklyn/location/ssh/SshMachineLocation.java
@@ -491,10 +491,8 @@ public class SshMachineLocation extends AbstractLocation implements MachineLocat
         };
         
         Duration expiryDuration = getConfig(SSH_CACHE_EXPIRY_DURATION);
-        cleanupTask = getManagementContext().getExecutionManager().submit(new ScheduledTask(
-            MutableMap.of("displayName", "scheduled[ssh-location cache cleaner]"), cleanupTaskFactory)
-                .period(expiryDuration)
-                .delay(expiryDuration));
+        cleanupTask = getManagementContext().getExecutionManager().submit(
+            ScheduledTask.builder(cleanupTaskFactory).displayName("scheduled:[ssh-location
cache cleaner]").period(expiryDuration).delay(expiryDuration).build() );
     }
     
     // TODO close has been used for a long time to perform clean-up wanted on unmanagement,
but that's not clear; 

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/18a908bd/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
index 89eb6ab..745d49c 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ScheduledTask.java
@@ -21,6 +21,7 @@ package org.apache.brooklyn.util.core.task;
 import static org.apache.brooklyn.util.JavaGroovyEquivalents.elvis;
 import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
@@ -29,6 +30,8 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
+import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.time.Duration;
@@ -118,6 +121,44 @@ public class ScheduledTask extends BasicTask<Object> {
         cancelOnException = cancelFlag == null || Boolean.TRUE.equals(cancelFlag);
     }
     
+    public static Builder builder(Callable<Task<?>> val) {
+        return new Builder(val);
+    }
+    
+    public static class Builder {
+        Callable<Task<?>> factory;
+
+        String displayName;
+        List<Object> tags = MutableList.of();
+        Duration delay, period;
+        Integer maxInterations;
+        boolean cancelOnException = true;
+        Map<String,Object> flags = MutableMap.of();
+        
+        public Builder(Callable<Task<?>> val) { this.factory = val; }
+        
+        public ScheduledTask build() {
+            return new ScheduledTask(MutableMap.copyOf(flags)
+                    .addIfNotNull("displayName", displayName) 
+                    .addIfNotNull("tags", tags.isEmpty() ? null : tags)
+                    .addIfNotNull("delay", delay) 
+                    .addIfNotNull("period", period) 
+                    .addIfNotNull("maxIterations", maxInterations) 
+                    .addIfNotNull("cancelOnException", cancelOnException) 
+                , factory);
+        }
+        
+        public Builder displayName(String val) { this.displayName = val; return this; }
+        public Builder tag(Object val) { this.tags.add(val); return this; }
+        public Builder tagTransient() { return tag(BrooklynTaskTags.TRANSIENT_TASK_TAG);
}
+        public Builder delay(Duration val) { this.delay = val; return this; }
+        public Builder period(Duration val) { this.period = val; return this; }
+        public Builder maxIterations(Integer val) { this.maxInterations = val; return this;
}
+        public Builder cancelOnException(boolean val) { this.cancelOnException = val; return
this; }
+        public Builder addFlags(Map<String,?> val) { this.flags.putAll(val); return
this; }
+
+    }
+    
     public ScheduledTask delay(Duration d) {
         this.delay = d;
         return this;

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/18a908bd/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
b/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
index 536dfea..166da59 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/task/ScheduledExecutionTest.java
@@ -52,19 +52,18 @@ public class ScheduledExecutionTest {
     
     @Test
     public void testScheduledTask() throws Exception {
-        int PERIOD = 20;
+        Duration PERIOD = Duration.millis(20);
         BasicExecutionManager m = new BasicExecutionManager("mycontextid");
         final AtomicInteger i = new AtomicInteger(0);
-        ScheduledTask t = new ScheduledTask(MutableMap.of("delay", 2*PERIOD, "period", PERIOD,
"maxIterations", 5), new Callable<Task<?>>() {
-            @Override
-            public Task<?> call() throws Exception {
-                return new BasicTask<Integer>(new Callable<Integer>() {
-                    @Override
-                    public Integer call() {
-                        log.debug("task running: "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
+        ScheduledTask t = ScheduledTask.builder(() -> new BasicTask<Integer>(()
-> {
+                        log.info("task running: "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
                         return i.incrementAndGet();
-                    }});
-            }});
+                    }))
+                .displayName("test-1")
+                .delay(PERIOD.multiply(2))
+                .period(PERIOD)
+                .maxIterations(5)
+                .build();
     
         log.info("submitting {} {}", t, t.getStatusDetail(false));
         m.submit(t);

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/18a908bd/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java
b/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java
index d625a4f..840335a 100644
--- a/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java
+++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java
@@ -191,10 +191,9 @@ public abstract class AbstractFailureDetector extends AbstractPolicy
{
         doStartPolling();
     }
 
-    @SuppressWarnings("unchecked")
     protected void doStartPolling() {
         if (scheduledTask == null || scheduledTask.isDone()) {
-            ScheduledTask task = new ScheduledTask(MutableMap.of("period", getPollPeriod(),
"displayName", getTaskName()), pollingTaskFactory);
+            ScheduledTask task = ScheduledTask.builder(pollingTaskFactory).displayName( getTaskName()
).period(getPollPeriod()).build();
             scheduledTask = ((EntityInternal)entity).getExecutionContext().submit(task);
         }
     }


Mime
View raw message