brooklyn-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aledsage <...@git.apache.org>
Subject [GitHub] incubator-brooklyn pull request: Brooklyn node and cluster upgrade
Date Thu, 30 Oct 2014 12:06:06 GMT
Github user aledsage commented on a diff in the pull request:

    https://github.com/apache/incubator-brooklyn/pull/272#discussion_r19600483
  
    --- Diff: core/src/main/java/brooklyn/event/basic/DependentConfiguration.java ---
    @@ -209,31 +261,101 @@ private DependentConfiguration() {}
                 if (abortionExceptions.size() > 0) {
                     throw new CompoundRuntimeException("Aborted waiting for ready from "+source+"
"+sensor, abortionExceptions);
                 }
    +
    +            TaskInternal<?> current = (TaskInternal<?>) Tasks.current();
    +            if (current == null) throw new IllegalStateException("Should only be invoked
in a running task");
    +            Entity entity = BrooklynTaskTags.getTargetOrContextEntity(current);
    +            if (entity == null) throw new IllegalStateException("Should only be invoked
in a running task with an entity tag; "+
    +                current+" has no entity tag ("+current.getStatusDetail(false)+")");
    +            
    +            final LinkedList<T> publishedValues = new LinkedList<T>();
    +            final Semaphore semaphore = new Semaphore(0); // could use Exchanger
    +            SubscriptionHandle subscription = null;
    +            List<SubscriptionHandle> abortSubscriptions = Lists.newArrayList();
                 
    -            value = source.getAttribute(sensor);
    -            while (!ready.apply(value)) {
    -                String prevBlockingDetails = current.setBlockingDetails(blockingDetails);
    -                try {
    -                    semaphore.acquire();
    -                } finally {
    -                    current.setBlockingDetails(prevBlockingDetails);
    +            try {
    +                subscription = ((EntityInternal)entity).getSubscriptionContext().subscribe(source,
sensor, new SensorEventListener<T>() {
    +                    @Override public void onEvent(SensorEvent<T> event) {
    +                        synchronized (publishedValues) { publishedValues.add(event.getValue());
}
    +                        semaphore.release();
    +                    }});
    +                for (final AttributeAndSensorCondition abortCondition : abortSensorConditions)
{
    +                    abortSubscriptions.add(((EntityInternal)entity).getSubscriptionContext().subscribe(abortCondition.source,
abortCondition.sensor, new SensorEventListener<Object>() {
    +                        @Override public void onEvent(SensorEvent<Object> event)
{
    +                            if (abortCondition.predicate.apply(event.getValue())) {
    +                                abortionExceptions.add(new Exception("Abort due to "+abortCondition.source+"
-> "+abortCondition.sensor));
    +                                semaphore.release();
    +                            }
    +                        }}));
    +                    Object abortValue = abortCondition.source.getAttribute(abortCondition.sensor);
    +                    if (abortCondition.predicate.apply(abortValue)) {
    +                        abortionExceptions.add(new Exception("Abort due to "+abortCondition.source+"
-> "+abortCondition.sensor));
    +                    }
                     }
    -                
                     if (abortionExceptions.size() > 0) {
                         throw new CompoundRuntimeException("Aborted waiting for ready from
"+source+" "+sensor, abortionExceptions);
                     }
    -                value = data.get();
    -            }
    -            if (LOG.isDebugEnabled()) LOG.debug("Attribute-ready for {} in entity {}",
sensor, source);
    -            return value;
    -        } catch (InterruptedException e) {
    -            throw Exceptions.propagate(e);
    -        } finally {
    -            if (subscription != null) {
    -                ((EntityInternal)entity).getSubscriptionContext().unsubscribe(subscription);
    -            }
    -            for (SubscriptionHandle handle : abortSubscriptions) {
    -                ((EntityInternal)entity).getSubscriptionContext().unsubscribe(handle);
    +
    +                CountdownTimer timer = timeout!=null ? timeout.countdownTimer() : null;
    +                Duration maxPeriod = Duration.millis(200);
    +                Duration nextPeriod = Duration.millis(10);
    +                while (true) {
    +                    // check the source on initial run (could be done outside the loop)

    +                    // and also (optionally) on each iteration in case it is more recent

    +                    value = source.getAttribute(sensor);
    +                    if (ready(value)) break;
    +
    +                    if (timer!=null) {
    +                        if (timer.getDurationRemaining().isShorterThan(nextPeriod)) {
    +                            nextPeriod = timer.getDurationRemaining();
    +                        }
    +                        if (timer.isExpired()) {
    +                            if (onTimeout.isPresent()) return onTimeout.get();
    +                            throw new TimeoutException("Unsatisfied after "+Duration.sinceUtc(start));
    +                        }
    +                    }
    +
    +                    String prevBlockingDetails = current.setBlockingDetails(blockingDetails);
    +                    try {
    +                        if (semaphore.tryAcquire(nextPeriod.toMilliseconds(), TimeUnit.MILLISECONDS))
{
    +                            // immediately release so we are available for the next check
    +                            semaphore.release();
    +                            // if other permits have been made available (e.g. multiple
notifications) drain them all as no point running multiple times
    +                            semaphore.drainPermits();
    +                        }
    +                    } finally {
    +                        current.setBlockingDetails(prevBlockingDetails);
    +                    }
    +
    +                    // check any subscribed values which have come in first
    +                    while (!publishedValues.isEmpty()) {
    --- End diff --
    
    Should probably access `publishedValues.isEmpty` while synchronized.
    But you'll probably get away without, given it is piggie-backing on the "happens-before"
of the `semaphore.tryAcquire`, so no strong feelings.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message