nifi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ald...@apache.org
Subject nifi git commit: NIFI-3545: Release M FlowFiles once N signals arrive
Date Mon, 06 Mar 2017 14:30:26 GMT
Repository: nifi
Updated Branches:
  refs/heads/master 7f5c0dfb5 -> 000414e7e


NIFI-3545: Release M FlowFiles once N signals arrive

- Support multiple incoming FlowFiles to the Wait processor, up to Wait
  Buffer Count
- Added Releasable FlowFile Count, which controls how many FlowFiles can
  be released when wait condition is met
- Added special meaning to Notify delta Zero(0) to clear a signal
  counter back to zero

  This closes #1554

Signed-off-by: Aldrin Piri <aldrin@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/000414e7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/000414e7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/000414e7

Branch: refs/heads/master
Commit: 000414e7eaa4c4f459779e73c331f3021f9a5049
Parents: 7f5c0df
Author: Koji Kawamura <ijokarumawak@apache.org>
Authored: Thu Mar 2 22:54:46 2017 +0900
Committer: Aldrin Piri <aldrin@apache.org>
Committed: Mon Mar 6 09:29:44 2017 -0500

----------------------------------------------------------------------
 .../apache/nifi/processors/standard/Notify.java |   8 +-
 .../apache/nifi/processors/standard/Wait.java   | 268 ++++++++++++++-----
 .../processors/standard/WaitNotifyProtocol.java |  86 +++++-
 .../nifi/processors/standard/TestWait.java      | 246 ++++++++++++++++-
 .../standard/TestWaitNotifyProtocol.java        | 106 +++++++-
 5 files changed, 617 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/000414e7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
index 155d131..346f1fb 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
@@ -97,7 +97,10 @@ public class Notify extends AbstractProcessor {
                 "be evaluated against a FlowFile in order to determine the signal counter delta. " +
                 "Specify how much the counter should increase. " +
                 "For example, if multiple signal events are processed at upstream flow in batch oriented way, " +
-                "the number of events processed can be notified with this property at once.")
+                "the number of events processed can be notified with this property at once. " +
+                "Zero (0) has a special meaning, it clears target count back to 0, which is especially useful when used with Wait " +
+                Wait.RELEASABLE_FLOWFILE_COUNT.getDisplayName() + " = Zero (0) mode, to provide 'open-close-gate' type of flow control. " +
+                "One (1) can open a corresponding Wait processor, and Zero (0) can negate it as if closing a gate.")
             .required(true)
             .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
             .expressionLanguageSupported(true)
@@ -171,7 +174,8 @@ public class Notify extends AbstractProcessor {
 
         int incrementDelta(final String counterName, final int delta) {
             int current = deltas.containsKey(counterName) ? deltas.get(counterName) : 0;
-            int updated = current + delta;
+            // Zero (0) clears count.
+            int updated = delta == 0 ? 0 : current + delta;
             deltas.put(counterName, updated);
             return updated;
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/000414e7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
index 8b76ce5..fccd443 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
@@ -26,6 +26,11 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
@@ -40,11 +45,13 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
 import org.apache.nifi.expression.AttributeExpression.ResultType;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.FlowFileFilter;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
@@ -52,6 +59,10 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal;
 
+import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE;
+import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
+
 @EventDriven
 @SupportsBatching
 @Tags({"map", "cache", "wait", "hold", "distributed", "signal", "release"})
@@ -63,7 +74,7 @@ import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal;
         + "The release signal entry is then removed from the cache. Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration. "
 
         + "If you need to wait for more than one signal, specify the desired number of signals via the 'Target Signal Count' property. "
-        + "This is particularly useful with processors that split a source flow file into multiple fragments, such as SplitText. "
+        + "This is particularly useful with processors that split a source FlowFile into multiple fragments, such as SplitText. "
         + "In order to wait for all fragments to be processed, connect the 'original' relationship to a Wait processor, and the 'splits' relationship to "
         + "a corresponding Notify processor. Configure the Notify and Wait processors to use the '${fragment.identifier}' as the value "
         + "of 'Release Signal Identifier', and specify '${fragment.count}' as the value of 'Target Signal Count' in the Wait processor."
@@ -125,11 +136,36 @@ public class Wait extends AbstractProcessor {
             .expressionLanguageSupported(true)
             .build();
 
+    public static final PropertyDescriptor WAIT_BUFFER_COUNT = new PropertyDescriptor.Builder()
+            .name("wait-buffer-count")
+            .displayName("Wait Buffer Count")
+            .description("Specify the maximum number of incoming FlowFiles that can be buffered to check whether it can move forward. " +
+                    "The more buffer can provide the better performance, as it reduces the number of interactions with cache service " +
+                    "by grouping FlowFiles by signal identifier. " +
+                    "Only a signal identifier can be processed at a processor execution.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("1")
+            .build();
+
+    public static final PropertyDescriptor RELEASABLE_FLOWFILE_COUNT = new PropertyDescriptor.Builder()
+            .name("releasable-flowfile-count")
+            .displayName("Releasable FlowFile Count")
+            .description("A value, or the results of an Attribute Expression Language statement, which will " +
+                    "be evaluated against a FlowFile in order to determine the releasable FlowFile count. " +
+                    "This specifies how many FlowFiles can be released when a target count reaches target signal count. " +
+                    "Zero (0) has a special meaning, any number of FlowFiles can be released as long as signal count matches target.")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .defaultValue("1")
+            .build();
+
     // Selects the FlowFile attribute or expression, whose value is used as cache key
     public static final PropertyDescriptor EXPIRATION_DURATION = new PropertyDescriptor.Builder()
             .name("expiration-duration")
             .displayName("Expiration Duration")
-            .description("Indicates the duration after which waiting flow files will be routed to the 'expired' relationship")
+            .description("Indicates the duration after which waiting FlowFiles will be routed to the 'expired' relationship")
             .required(true)
             .defaultValue("10 min")
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
@@ -145,7 +181,7 @@ public class Wait extends AbstractProcessor {
     public static final PropertyDescriptor ATTRIBUTE_COPY_MODE = new PropertyDescriptor.Builder()
             .name("attribute-copy-mode")
             .displayName("Attribute Copy Mode")
-            .description("Specifies how to handle attributes copied from flow files entering the Notify processor")
+            .description("Specifies how to handle attributes copied from FlowFiles entering the Notify processor")
             .defaultValue(ATTRIBUTE_COPY_KEEP_ORIGINAL.getValue())
             .required(true)
             .allowableValues(ATTRIBUTE_COPY_REPLACE, ATTRIBUTE_COPY_KEEP_ORIGINAL)
@@ -208,6 +244,8 @@ public class Wait extends AbstractProcessor {
         descriptors.add(RELEASE_SIGNAL_IDENTIFIER);
         descriptors.add(TARGET_SIGNAL_COUNT);
         descriptors.add(SIGNAL_COUNTER_NAME);
+        descriptors.add(WAIT_BUFFER_COUNT);
+        descriptors.add(RELEASABLE_FLOWFILE_COUNT);
         descriptors.add(EXPIRATION_DURATION);
         descriptors.add(DISTRIBUTED_CACHE_SERVICE);
         descriptors.add(ATTRIBUTE_COPY_MODE);
@@ -223,21 +261,81 @@ public class Wait extends AbstractProcessor {
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
 
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
-
         final ComponentLog logger = getLogger();
 
         // Signal id is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
-        final String signalId = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
+        final PropertyValue signalIdProperty = context.getProperty(RELEASE_SIGNAL_IDENTIFIER);
+        final Integer bufferCount = context.getProperty(WAIT_BUFFER_COUNT).asInteger();
+
+        final Map<Relationship, List<FlowFile>> processedFlowFiles = new HashMap<>();
+        final Function<Relationship, List<FlowFile>> getFlowFilesFor = r -> processedFlowFiles.computeIfAbsent(r, k -> new ArrayList<>());
+
+        final AtomicReference<String> targetSignalId = new AtomicReference<>();
+        final AtomicInteger bufferedCount = new AtomicInteger(0);
+        final List<FlowFile> failedFilteringFlowFiles = new ArrayList<>();
+        final Supplier<FlowFileFilter.FlowFileFilterResult> acceptResultSupplier =
+                () -> bufferedCount.incrementAndGet() == bufferCount ? ACCEPT_AND_TERMINATE : ACCEPT_AND_CONTINUE;
+        final List<FlowFile> flowFiles = session.get(f -> {
+
+            final String fSignalId = signalIdProperty.evaluateAttributeExpressions(f).getValue();
+
+            // if the computed value is null, or empty, we transfer the FlowFile to failure relationship
+            if (StringUtils.isBlank(fSignalId)) {
+                // We can't penalize f before getting it from session, so keep it in a temporal list.
+                logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {f});
+                failedFilteringFlowFiles.add(f);
+                return ACCEPT_AND_CONTINUE;
+            }
+
+            final String targetSignalIdStr = targetSignalId.get();
+            if (targetSignalIdStr == null) {
+                // This is the first one.
+                targetSignalId.set(fSignalId);
+                return acceptResultSupplier.get();
+            }
+
+            if (targetSignalIdStr.equals(fSignalId)) {
+                return acceptResultSupplier.get();
+            }
+
+            return REJECT_AND_CONTINUE;
 
-        // if the computed value is null, or empty, we transfer the flow file to failure relationship
-        if (StringUtils.isBlank(signalId)) {
-            logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile});
+        });
+
+        final String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue();
+        final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode);
+        final AtomicReference<Signal> signalRef = new AtomicReference<>();
+
+        final Consumer<FlowFile> transferToFailure = flowFile -> {
             flowFile = session.penalize(flowFile);
-            session.transfer(flowFile, REL_FAILURE);
+            getFlowFilesFor.apply(REL_FAILURE).add(flowFile);
+        };
+
+        final Consumer<Entry<Relationship, List<FlowFile>>> transferFlowFiles = routedFlowFiles -> {
+            Relationship relationship = routedFlowFiles.getKey();
+
+            if (REL_WAIT.equals(relationship)) {
+                final String waitMode = context.getProperty(WAIT_MODE).getValue();
+
+                if (WAIT_MODE_KEEP_IN_UPSTREAM.getValue().equals(waitMode)) {
+                    // Transfer to self.
+                    relationship = Relationship.SELF;
+                }
+            }
+
+            final List<FlowFile> flowFilesWithSignalAttributes = routedFlowFiles.getValue().stream()
+                    .map(f -> copySignalAttributes(session, f, signalRef.get(), replaceOriginalAttributes)).collect(Collectors.toList());
+            session.transfer(flowFilesWithSignalAttributes, relationship);
+        };
+
+        failedFilteringFlowFiles.forEach(f -> {
+            flowFiles.remove(f);
+            transferToFailure.accept(f);
+        });
+
+        if (flowFiles.isEmpty()) {
+            // If there was nothing but failed FlowFiles while filtering, transfer those and end immediately.
+            processedFlowFiles.entrySet().forEach(transferFlowFiles);
             return;
         }
 
@@ -245,95 +343,131 @@ public class Wait extends AbstractProcessor {
         final AtomicDistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class);
         final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
 
-        String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue();
-        final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode);
+        final String signalId = targetSignalId.get();
+        final Signal signal;
 
-        Signal signal = null;
+        // get notifying signal
         try {
-            // get notifying signal
             signal = protocol.getSignal(signalId);
+            signalRef.set(signal);
+        } catch (final IOException e) {
+            throw new ProcessException(String.format("Failed to get signal for %s due to %s", signalId, e), e);
+        }
 
-            // check for expiration
+        String targetCounterName = null;
+        long targetCount = 1;
+        int releasableFlowFileCount = 1;
+
+        final List<FlowFile> candidates = new ArrayList<>();
+
+        for (FlowFile flowFile : flowFiles) {
+            // Set wait start timestamp if it's not set yet
             String waitStartTimestamp = flowFile.getAttribute(WAIT_START_TIMESTAMP);
             if (waitStartTimestamp == null) {
                 waitStartTimestamp = String.valueOf(System.currentTimeMillis());
                 flowFile = session.putAttribute(flowFile, WAIT_START_TIMESTAMP, waitStartTimestamp);
             }
 
-            long lWaitStartTimestamp = 0L;
+            long lWaitStartTimestamp;
             try {
                 lWaitStartTimestamp = Long.parseLong(waitStartTimestamp);
             } catch (NumberFormatException nfe) {
                 logger.error("{} has an invalid value '{}' on FlowFile {}", new Object[] {WAIT_START_TIMESTAMP, waitStartTimestamp, flowFile});
-                flowFile = session.penalize(flowFile);
-
-                flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
-                session.transfer(flowFile, REL_FAILURE);
-                return;
+                transferToFailure.accept(flowFile);
+                continue;
             }
+
+            // check for expiration
             long expirationDuration = context.getProperty(EXPIRATION_DURATION)
                     .asTimePeriod(TimeUnit.MILLISECONDS);
             long now = System.currentTimeMillis();
             if (now > (lWaitStartTimestamp + expirationDuration)) {
                 logger.info("FlowFile {} expired after {}ms", new Object[] {flowFile, (now - lWaitStartTimestamp)});
-                flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
-                session.transfer(flowFile, REL_EXPIRED);
-                return;
+                getFlowFilesFor.apply(REL_EXPIRED).add(flowFile);
+                continue;
             }
 
+            // If there's no signal yet, then we don't have to evaluate target counts. Return immediately.
             if (signal == null) {
-                // If there's no signal yet, then we don't have to evaluate target counts. Return immediately.
                 if (logger.isDebugEnabled()) {
-                    logger.debug("No release signal yet for {} on FlowFile {}", new Object[] {signalId, flowFile});
+                    logger.debug("No release signal found for {} on FlowFile {} yet", new Object[] {signalId, flowFile});
                 }
+                getFlowFilesFor.apply(REL_WAIT).add(flowFile);
+                continue;
+            }
 
-
-                final String waitMode = context.getProperty(WAIT_MODE).getValue();
-                if (WAIT_MODE_TRANSFER_TO_WAIT.getValue().equals(waitMode)) {
-                    session.transfer(flowFile, REL_WAIT);
-                } else if (WAIT_MODE_KEEP_IN_UPSTREAM.getValue().equals(waitMode)) {
-                    // Transfer to self.
-                    session.transfer(flowFile);
-                } else {
-                    throw new ProcessException("Unsupported wait mode " + waitMode + " was specified.");
+            // Fix target counter name and count from current FlowFile, if those are not set yet.
+            if (candidates.isEmpty()) {
+                targetCounterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue();
+                try {
+                    targetCount = Long.valueOf(context.getProperty(TARGET_SIGNAL_COUNT).evaluateAttributeExpressions(flowFile).getValue());
+                } catch (final NumberFormatException e) {
+                    transferToFailure.accept(flowFile);
+                    logger.error("Failed to parse targetCount when processing {} due to {}", new Object[] {flowFile, e}, e);
+                    continue;
+                }
+                try {
+                    releasableFlowFileCount = Integer.valueOf(context.getProperty(RELEASABLE_FLOWFILE_COUNT).evaluateAttributeExpressions(flowFile).getValue());
+                } catch (final NumberFormatException e) {
+                    transferToFailure.accept(flowFile);
+                    logger.error("Failed to parse releasableFlowFileCount when processing {} due to {}", new Object[] {flowFile, e}, e);
+                    continue;
                 }
-                return;
             }
 
-            final String targetCounterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue();
-            final Long targetCount = Long.valueOf(context.getProperty(TARGET_SIGNAL_COUNT).evaluateAttributeExpressions(flowFile).getValue());
-            final boolean reachedToTargetCount = StringUtils.isBlank(targetCounterName)
-                    ? signal.isTotalCountReached(targetCount)
-                    : signal.isCountReached(targetCounterName, targetCount);
+            // FlowFile is now validated and added to candidates.
+            candidates.add(flowFile);
+        }
 
-            if (!reachedToTargetCount) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Release signal count {} hasn't reached {} for {} on FlowFile {}",
-                            new Object[] {targetCounterName, targetCount, signalId, flowFile});
+        boolean waitCompleted = false;
+        boolean waitProgressed = false;
+        if (signal != null && !candidates.isEmpty()) {
+
+            if (releasableFlowFileCount > 1) {
+                signal.releaseCandidatese(targetCounterName, targetCount, releasableFlowFileCount, candidates,
+                        released -> getFlowFilesFor.apply(REL_SUCCESS).addAll(released),
+                        waiting -> getFlowFilesFor.apply(REL_WAIT).addAll(waiting));
+                waitProgressed = !getFlowFilesFor.apply(REL_SUCCESS).isEmpty();
+
+            } else {
+                // releasableFlowFileCount = 0 or 1
+                boolean reachedTargetCount = StringUtils.isBlank(targetCounterName)
+                        ? signal.isTotalCountReached(targetCount)
+                        : signal.isCountReached(targetCounterName, targetCount);
+
+                if (reachedTargetCount) {
+                    if (releasableFlowFileCount == 0) {
+                        getFlowFilesFor.apply(REL_SUCCESS).addAll(candidates);
+                    } else {
+                        // releasableFlowFileCount = 1
+                        getFlowFilesFor.apply(REL_SUCCESS).add(candidates.remove(0));
+                        getFlowFilesFor.apply(REL_WAIT).addAll(candidates);
+                        // If releasableFlowFileCount == 0, leave signal as it is,
+                        // so that any number of FlowFile can be released as long as target count condition matches.
+                        waitCompleted = true;
+                    }
+                } else {
+                    getFlowFilesFor.apply(REL_WAIT).addAll(candidates);
                 }
-                flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
-                session.transfer(flowFile, REL_WAIT);
-                return;
             }
+        }
 
+        // Transfer FlowFiles.
+        processedFlowFiles.entrySet().forEach(transferFlowFiles);
 
-            flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
-            session.transfer(flowFile, REL_SUCCESS);
-
-            protocol.complete(signalId);
-
-        } catch (final NumberFormatException e) {
-            flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
-            flowFile = session.penalize(flowFile);
-            session.transfer(flowFile, REL_FAILURE);
-            logger.error("Failed to parse targetCount when processing {} due to {}", new Object[] {flowFile, e});
+        // Update signal if needed.
+        try {
+            if (waitCompleted) {
+                protocol.complete(signalId);
+            } else if (waitProgressed) {
+                protocol.replace(signal);
+            }
 
         } catch (final IOException e) {
-            flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
-            flowFile = session.penalize(flowFile);
-            session.transfer(flowFile, REL_FAILURE);
-            logger.error("Unable to communicate with cache when processing {} due to {}", new Object[] {flowFile, e});
+            session.rollback();
+            throw new ProcessException(String.format("Unable to communicate with cache while updating %s due to %s", signalId, e), e);
         }
+
     }
 
     private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final boolean replaceOriginal) {
@@ -341,13 +475,13 @@ public class Wait extends AbstractProcessor {
             return flowFile;
         }
 
-        // copy over attributes from release signal flow file, if provided
+        // copy over attributes from release signal FlowFile, if provided
         final Map<String, String> attributesToCopy;
         if (replaceOriginal) {
             attributesToCopy = new HashMap<>(signal.getAttributes());
             attributesToCopy.remove("uuid");
         } else {
-            // if the current flow file does *not* have the cached attribute, copy it
+            // if the current FlowFile does *not* have the cached attribute, copy it
             attributesToCopy = signal.getAttributes().entrySet().stream()
                     .filter(e -> flowFile.getAttribute(e.getKey()) == null)
                     .collect(Collectors.toMap(Entry::getKey, Entry::getValue));

http://git-wip-us.apache.org/repos/asf/nifi/blob/000414e7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
index ae5cbbd..1c89108 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
@@ -31,7 +31,9 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 
 /**
  * This class provide a protocol for Wait and Notify processors to work together.
@@ -51,8 +53,16 @@ public class WaitNotifyProtocol {
     private final Deserializer<String> stringDeserializer = input -> new String(input, StandardCharsets.UTF_8);
 
     public static class Signal {
+
+        /*
+         * Getter and Setter methods are needed to (de)serialize JSON even if it's not used from app code.
+         */
+
+        transient private String identifier;
+        transient private long revision = -1;
         private Map<String, Long> counts = new HashMap<>();
         private Map<String, String> attributes = new HashMap<>();
+        private int releasableCount = 0;
 
         public Map<String, Long> getCounts() {
             return counts;
@@ -84,6 +94,54 @@ public class WaitNotifyProtocol {
             return count != null ? count : 0;
         }
 
+        public int getReleasableCount() {
+            return releasableCount;
+        }
+
+        public void setReleasableCount(int releasableCount) {
+            this.releasableCount = releasableCount;
+        }
+
+        /**
+         * <p>Consume accumulated notification signals to let some waiting candidates get released.</p>
+         *
+         * <p>This method updates state of this instance, but does not update cache storage.
+         * Caller of this method is responsible for updating cache storage after processing released and waiting candidates
+         * by calling {@link #replace(Signal)}. Caller should rollback what it processed with these candidates if complete call failed.</p>
+         *
+         * @param _counterName signal counter name to consume from.
+         * @param requiredCountForPass number of required signals to acquire a pass.
+         * @param releasableCandidateCountPerPass number of releasable candidate per pass.
+         * @param candidates candidates waiting for being allowed to pass.
+         * @param released function to process allowed candidates to pass.
+         * @param waiting function to process candidates those should remain in waiting queue.
+         * @param <E> Type of candidate
+         */
+        public <E> void releaseCandidatese(final String _counterName, final long requiredCountForPass,
+                                           final int releasableCandidateCountPerPass, final List<E> candidates,
+                                           final Consumer<List<E>> released, final Consumer<List<E>> waiting) {
+
+            // counterName is mandatory otherwise, we can't decide which counter to convert into pass count.
+            final String counterName = _counterName == null || _counterName.length() == 0 ? DEFAULT_COUNT_NAME : _counterName;
+
+            final int candidateSize = candidates.size();
+            if (releasableCount < candidateSize) {
+                // If current passCount is not enough for the candidate size, then try to get more.
+                // Convert notification signals to pass ticket.
+                final long signalCount = getCount(counterName);
+                releasableCount += (signalCount / requiredCountForPass) * releasableCandidateCountPerPass;
+                final long reducedSignalCount = signalCount % requiredCountForPass;
+                counts.put(counterName, reducedSignalCount);
+            }
+
+            int releaseCount = Math.min(releasableCount, candidateSize);
+
+            released.accept(candidates.subList(0, releaseCount));
+            waiting.accept(candidates.subList(releaseCount, candidateSize));
+
+            releasableCount -= releaseCount;
+        }
+
     }
 
     private final AtomicDistributedMapCacheClient cache;
@@ -95,7 +153,7 @@ public class WaitNotifyProtocol {
     /**
      * Notify a signal to increase a counter.
      * @param signalId a key in the underlying cache engine
-     * @param deltas a map containing counterName and delta entries
+     * @param deltas a map containing counterName and delta entries, 0 has special meaning, clears the counter back to 0
      * @param attributes attributes to save in the cache entry
      * @return A Signal instance, merged with an existing signal if any
      * @throws IOException thrown when it failed interacting with the cache engine
@@ -106,10 +164,9 @@ public class WaitNotifyProtocol {
 
         for (int i = 0; i < MAX_REPLACE_RETRY_COUNT; i++) {
 
-            final CacheEntry<String, String> existingEntry = cache.fetch(signalId, stringSerializer, stringDeserializer);
-
             final Signal existingSignal = getSignal(signalId);
             final Signal signal = existingSignal != null ? existingSignal : new Signal();
+            signal.identifier = signalId;
 
             if (attributes != null) {
                 signal.attributes.putAll(attributes);
@@ -117,15 +174,11 @@ public class WaitNotifyProtocol {
 
             deltas.forEach((counterName, delta) -> {
                 long count = signal.counts.containsKey(counterName) ? signal.counts.get(counterName) : 0;
-                count += delta;
+                count = delta == 0 ? 0 : count + delta;
                 signal.counts.put(counterName, count);
             });
 
-            final String signalJson = objectMapper.writeValueAsString(signal);
-            final long revision = existingEntry != null ? existingEntry.getRevision() : -1;
-
-
-            if (cache.replace(signalId, signalJson, stringSerializer, stringSerializer, revision)) {
+            if (replace(signal)) {
                 return signal;
             }
 
@@ -148,7 +201,7 @@ public class WaitNotifyProtocol {
      * Notify a signal to increase a counter.
      * @param signalId a key in the underlying cache engine
      * @param counterName specify count to update
-     * @param delta delta to update a counter
+     * @param delta delta to update a counter; 0 has special meaning: it clears the counter back to 0
      * @param attributes attributes to save in the cache entry
      * @return A Signal instance, merged with an existing signal if any
      * @throws IOException thrown when it failed interacting with the cache engine
@@ -184,12 +237,16 @@ public class WaitNotifyProtocol {
         final String value = entry.getValue();
 
         try {
-            return objectMapper.readValue(value, Signal.class);
+            final Signal signal = objectMapper.readValue(value, Signal.class);
+            signal.identifier = signalId;
+            signal.revision = entry.getRevision();
+            return signal;
         } catch (final JsonParseException jsonE) {
             // Try to read it as FlowFileAttributes for backward compatibility.
             try {
                 final Map<String, String> attributes = new FlowFileAttributesSerializer().deserialize(value.getBytes(StandardCharsets.UTF_8));
                 final Signal signal = new Signal();
+                signal.identifier = signalId;
                 signal.setAttributes(attributes);
                 signal.getCounts().put(DEFAULT_COUNT_NAME, 1L);
                 return signal;
@@ -209,4 +266,11 @@ public class WaitNotifyProtocol {
     public void complete(final String signalId) throws IOException {
         cache.remove(signalId, stringSerializer);
     }
+
+    public boolean replace(final Signal signal) throws IOException {
+
+        final String signalJson = objectMapper.writeValueAsString(signal);
+        return cache.replace(signal.identifier, signalJson, stringSerializer, stringSerializer, signal.revision);
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/000414e7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
index 0ce0045..eb196c9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
@@ -19,12 +19,22 @@ package org.apache.nifi.processors.standard;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
 
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processors.standard.TestNotify.MockCacheClient;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
@@ -180,14 +190,15 @@ public class TestWait {
         final Map<String, String> props = new HashMap<>();
         props.put("releaseSignalAttribute", "2");
         runner.enqueue(new byte[]{}, props);
-        runner.run();
-
-        //Expect the processor to receive an IO exception from the cache service and route to failure
-        runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
-        runner.assertTransferCount(Wait.REL_FAILURE, 1);
-        runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists("wait.counter.total");
-
-        service.setFailOnCalls(false);
+        try {
+            runner.run();
+            fail("Expect the processor to receive an IO exception from the cache service and throws ProcessException.");
+        } catch (final AssertionError e) {
+            assertTrue(e.getCause() instanceof ProcessException);
+            assertTrue(e.getCause().getCause() instanceof IOException);
+        } finally {
+            service.setFailOnCalls(false);
+        }
     }
 
     @Test
@@ -360,7 +371,6 @@ public class TestWait {
         assertNull("The key no longer exist", protocol.getSignal("key"));
     }
 
-
     @Test
     public void testWaitForSpecificCount() throws InitializationException, IOException {
         Map<String, String> cachedAttributes = new HashMap<>();
@@ -455,4 +465,222 @@ public class TestWait {
 
     }
 
+    private class TestIteration {
+        final List<MockFlowFile> released = new ArrayList<>();
+        final List<MockFlowFile> waiting = new ArrayList<>();
+        final List<MockFlowFile> failed = new ArrayList<>();
+
+        final List<String> expectedReleased = new ArrayList<>();
+        final List<String> expectedWaiting = new ArrayList<>();
+        final List<String> expectedFailed = new ArrayList<>();
+
+        void run() {
+            released.clear();
+            waiting.clear();
+            failed.clear();
+
+            runner.run();
+
+            released.addAll(runner.getFlowFilesForRelationship(Wait.REL_SUCCESS));
+            waiting.addAll(runner.getFlowFilesForRelationship(Wait.REL_WAIT));
+            failed.addAll(runner.getFlowFilesForRelationship(Wait.REL_FAILURE));
+
+            assertEquals(expectedReleased.size(), released.size());
+            assertEquals(expectedWaiting.size(), waiting.size());
+            assertEquals(expectedFailed.size(), failed.size());
+
+            final BiConsumer<List<String>, List<MockFlowFile>> assertContents = (expected, actual) -> {
+                for (int i = 0; i < expected.size(); i++) {
+                    actual.get(i).assertContentEquals(expected.get(i));
+                }
+            };
+
+            assertContents.accept(expectedReleased, released);
+            assertContents.accept(expectedWaiting, waiting);
+            assertContents.accept(expectedFailed, failed);
+
+            runner.clearTransferState();
+            expectedReleased.clear();
+            expectedWaiting.clear();
+            expectedFailed.clear();
+        }
+    }
+
+    @Test
+    public void testWaitBufferCount() throws InitializationException, IOException {
+        Map<String, String> cachedAttributes = new HashMap<>();
+        cachedAttributes.put("notified", "notified-value");
+
+        // Setup existing cache entry.
+        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
+
+        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
+        runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}");
+        runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");
+        runner.setProperty(Wait.WAIT_BUFFER_COUNT, "2");
+
+        final Map<String, String> waitAttributesA = new HashMap<>();
+        waitAttributesA.put("releaseSignalAttribute", "key-A");
+        waitAttributesA.put("targetSignalCount", "1");
+        waitAttributesA.put("signalCounterName", "counter");
+
+        final Map<String, String> waitAttributesB = new HashMap<>();
+        waitAttributesB.put("releaseSignalAttribute", "key-B");
+        waitAttributesB.put("targetSignalCount", "3");
+        waitAttributesB.put("signalCounterName", "counter");
+
+        final Map<String, String> waitAttributesBInvalid = new HashMap<>();
+        waitAttributesBInvalid.putAll(waitAttributesB);
+        waitAttributesBInvalid.remove("releaseSignalAttribute");
+
+        final TestIteration testIteration = new TestIteration();
+
+        // Enqueue multiple wait FlowFiles.
+        runner.enqueue("1".getBytes(), waitAttributesB); // Should be picked at 1st and 2nd itr
+        runner.enqueue("2".getBytes(), waitAttributesA); // Should be picked at 3rd itr
+        runner.enqueue("3".getBytes(), waitAttributesBInvalid);
+        runner.enqueue("4".getBytes(), waitAttributesA); // Should be picked at 3rd itr
+        runner.enqueue("5".getBytes(), waitAttributesB); // Should be picked at 1st
+        runner.enqueue("6".getBytes(), waitAttributesB); // Should be picked at 2nd itr
+
+        /*
+         * 1st run:
+         * pick 1 key-B
+         * skip 2 cause key-A
+         * skip 3 cause invalid
+         * skip 4 cause key-A
+         * pick 5 key-B
+         */
+        testIteration.expectedWaiting.addAll(Arrays.asList("1", "5")); // Picked, but not enough counter.
+        testIteration.expectedFailed.add("3"); // invalid.
+        testIteration.run();
+
+        /*
+         * 2nd run:
+         * pick 6 key-B
+         * pick 1 key-B
+         */
+        protocol.notify("key-B", "counter", 3, cachedAttributes);
+        testIteration.expectedReleased.add("6");
+        testIteration.expectedWaiting.add("1"); // Picked but only one FlowFile can be released.
+        // enqueue waiting, simulating wait relationship back to self
+        testIteration.waiting.forEach(f -> runner.enqueue(f));
+        testIteration.run();
+
+    }
+
+    @Test
+    public void testReleaseMultipleFlowFiles() throws InitializationException, IOException {
+        Map<String, String> cachedAttributes = new HashMap<>();
+        cachedAttributes.put("notified", "notified-value");
+
+        // Setup existing cache entry.
+        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
+
+        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
+        runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}");
+        runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");
+        runner.setProperty(Wait.WAIT_BUFFER_COUNT, "2");
+        runner.setProperty(Wait.RELEASABLE_FLOWFILE_COUNT, "${fragmentCount}");
+
+        final Map<String, String> waitAttributes = new HashMap<>();
+        waitAttributes.put("releaseSignalAttribute", "key");
+        waitAttributes.put("targetSignalCount", "3");
+        waitAttributes.put("signalCounterName", "counter");
+        waitAttributes.put("fragmentCount", "6");
+
+        final TestIteration testIteration = new TestIteration();
+
+        // Enqueue 6 wait FlowFiles. 1,2,3,4,5,6
+        IntStream.range(1, 7).forEach(i -> runner.enqueue(String.valueOf(i).getBytes(), waitAttributes));
+
+        /*
+         * 1st run
+         */
+        testIteration.expectedWaiting.addAll(Arrays.asList("1", "2"));
+        testIteration.run();
+
+        WaitNotifyProtocol.Signal signal = protocol.getSignal("key");
+        assertNull(signal);
+
+        /*
+         * 2nd run
+         */
+        protocol.notify("key", "counter", 3, cachedAttributes);
+        testIteration.expectedReleased.addAll(Arrays.asList("3", "4"));
+        testIteration.waiting.forEach(f -> runner.enqueue(f));
+        testIteration.run();
+
+        signal = protocol.getSignal("key");
+        assertEquals(0, signal.getCount("count"));
+        assertEquals(4, signal.getReleasableCount());
+
+        /*
+         * 3rd run
+         */
+        testIteration.expectedReleased.addAll(Arrays.asList("5", "6"));
+        testIteration.waiting.forEach(f -> runner.enqueue(f));
+        testIteration.run();
+
+        signal = protocol.getSignal("key");
+        assertEquals(0, signal.getCount("count"));
+        assertEquals(2, signal.getReleasableCount());
+    }
+
+    @Test
+    public void testOpenGate() throws InitializationException, IOException {
+        Map<String, String> cachedAttributes = new HashMap<>();
+        cachedAttributes.put("notified", "notified-value");
+
+        // Setup existing cache entry.
+        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
+
+        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
+        runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}");
+        runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");
+        runner.setProperty(Wait.WAIT_BUFFER_COUNT, "2");
+        runner.setProperty(Wait.RELEASABLE_FLOWFILE_COUNT, "0"); // Leave gate open
+
+        final Map<String, String> waitAttributes = new HashMap<>();
+        waitAttributes.put("releaseSignalAttribute", "key");
+        waitAttributes.put("targetSignalCount", "3");
+        waitAttributes.put("signalCounterName", "counter");
+
+        final TestIteration testIteration = new TestIteration();
+
+        // Enqueue 6 wait FlowFiles. 1,2,3,4,5,6
+        IntStream.range(1, 7).forEach(i -> runner.enqueue(String.valueOf(i).getBytes(), waitAttributes));
+
+        /*
+         * 1st run
+         */
+        testIteration.expectedWaiting.addAll(Arrays.asList("1", "2"));
+        testIteration.run();
+
+        WaitNotifyProtocol.Signal signal = protocol.getSignal("key");
+        assertNull(signal);
+
+        /*
+         * 2nd run
+         */
+        protocol.notify("key", "counter", 3, cachedAttributes);
+        testIteration.expectedReleased.addAll(Arrays.asList("3", "4"));
+        testIteration.waiting.forEach(f -> runner.enqueue(f));
+        testIteration.run();
+
+        signal = protocol.getSignal("key");
+        assertEquals(3, signal.getCount("counter"));
+        assertEquals(0, signal.getReleasableCount());
+
+        /*
+         * 3rd run
+         */
+        testIteration.expectedReleased.addAll(Arrays.asList("5", "6"));
+        testIteration.waiting.forEach(f -> runner.enqueue(f));
+        testIteration.run();
+
+        signal = protocol.getSignal("key");
+        assertEquals(3, signal.getCount("counter"));
+        assertEquals(0, signal.getReleasableCount());
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/000414e7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java
index bf7e1e6..13b4346 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java
@@ -27,11 +27,18 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.stubbing.Answer;
 
+import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
+import static org.apache.nifi.processors.standard.WaitNotifyProtocol.DEFAULT_COUNT_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -102,7 +109,7 @@ public class TestWaitNotifyProtocol {
         final CacheEntry<String, String> cacheEntry = cacheEntries.get("signal-id");
 
         assertEquals(0, cacheEntry.getRevision());
-        assertEquals("{\"counts\":{\"a\":1},\"attributes\":{}}", cacheEntry.getValue());
+        assertEquals("{\"counts\":{\"a\":1},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
     }
 
     @Test
@@ -119,20 +126,20 @@ public class TestWaitNotifyProtocol {
 
         CacheEntry<String, String> cacheEntry = cacheEntries.get("signal-id");
         assertEquals(1, cacheEntry.getRevision());
-        assertEquals("{\"counts\":{\"a\":2},\"attributes\":{}}", cacheEntry.getValue());
+        assertEquals("{\"counts\":{\"a\":2},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
 
         protocol.notify(signalId, "a", 10, null);
 
         cacheEntry = cacheEntries.get("signal-id");
         assertEquals(2, cacheEntry.getRevision());
-        assertEquals("{\"counts\":{\"a\":12},\"attributes\":{}}", cacheEntry.getValue());
+        assertEquals("{\"counts\":{\"a\":12},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
 
         protocol.notify(signalId, "b", 2, null);
         protocol.notify(signalId, "c", 3, null);
 
         cacheEntry = cacheEntries.get("signal-id");
         assertEquals(4, cacheEntry.getRevision());
-        assertEquals("{\"counts\":{\"a\":12,\"b\":2,\"c\":3},\"attributes\":{}}", cacheEntry.getValue());
+        assertEquals("{\"counts\":{\"a\":12,\"b\":2,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
 
         final Map<String, Integer> deltas = new HashMap<>();
         deltas.put("a", 10);
@@ -141,7 +148,13 @@ public class TestWaitNotifyProtocol {
 
         cacheEntry = cacheEntries.get("signal-id");
         assertEquals(5, cacheEntry.getRevision());
-        assertEquals("{\"counts\":{\"a\":22,\"b\":27,\"c\":3},\"attributes\":{}}", cacheEntry.getValue());
+        assertEquals("{\"counts\":{\"a\":22,\"b\":27,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
+
+        // Zero clear 'b'.
+        protocol.notify("signal-id", "b", 0, null);
+        cacheEntry = cacheEntries.get("signal-id");
+        assertEquals(6, cacheEntry.getRevision());
+        assertEquals("{\"counts\":{\"a\":22,\"b\":0,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
 
     }
 
@@ -161,7 +174,7 @@ public class TestWaitNotifyProtocol {
 
         CacheEntry<String, String> cacheEntry = cacheEntries.get("signal-id");
         assertEquals(0, cacheEntry.getRevision());
-        assertEquals("{\"counts\":{\"a\":1},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a1\"}}", cacheEntry.getValue());
+        assertEquals("{\"counts\":{\"a\":1},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a1\"},\"releasableCount\":0}", cacheEntry.getValue());
 
         final Map<String, String> attributeA2 = new HashMap<>();
         attributeA2.put("p2", "a2"); // Update p2
@@ -173,7 +186,7 @@ public class TestWaitNotifyProtocol {
         cacheEntry = cacheEntries.get("signal-id");
         assertEquals(1, cacheEntry.getRevision());
         assertEquals("Updated attributes should be merged correctly",
-                "{\"counts\":{\"a\":2},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a2\",\"p3\":\"a2\"}}", cacheEntry.getValue());
+                "{\"counts\":{\"a\":2},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a2\",\"p3\":\"a2\"},\"releasableCount\":0}", cacheEntry.getValue());
 
     }
 
@@ -237,7 +250,7 @@ public class TestWaitNotifyProtocol {
         final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
         final Signal signal = protocol.getSignal(signalId);
 
-        assertEquals(1, signal.getCount(WaitNotifyProtocol.DEFAULT_COUNT_NAME));
+        assertEquals(1, signal.getCount(DEFAULT_COUNT_NAME));
         assertEquals("value1", signal.getAttributes().get("key1"));
         assertEquals("value2", signal.getAttributes().get("key2"));
         assertEquals("value3", signal.getAttributes().get("key3"));
@@ -251,4 +264,81 @@ public class TestWaitNotifyProtocol {
 
     }
 
+    @Test
+    public void testReleaseCandidate() throws Exception {
+        final List<Integer> candidates = IntStream.range(0, 10).boxed().collect(Collectors.toList());
+        final Signal signal = new Signal();
+        final List<Integer> released = new ArrayList<>();
+        final List<Integer> waiting = new ArrayList<>();
+
+        // Test default name.
+        final String counterName = null;
+
+        final BiConsumer<Long, Integer> releaseCandidate = (requiredCountForPass, releasableCandidatePerPass) -> {
+            released.clear();
+            waiting.clear();
+            signal.releaseCandidatese(counterName, requiredCountForPass, releasableCandidatePerPass, candidates,
+                    r -> released.addAll(r), w -> waiting.addAll(w));
+        };
+
+        final Field releasableCount = Signal.class.getDeclaredField("releasableCount");
+        releasableCount.setAccessible(true);
+
+        // No counter, should wait.
+        releaseCandidate.accept(3L, 1);
+        assertEquals(0, released.size());
+        assertEquals(10, waiting.size());
+        assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME));
+        assertEquals(0, releasableCount.getInt(signal));
+
+        // Counter is not enough yet.
+        signal.getCounts().put(DEFAULT_COUNT_NAME, 1L);
+        releaseCandidate.accept(3L, 1);
+        assertEquals(0, released.size());
+        assertEquals(10, waiting.size());
+        assertEquals(1, signal.getCount(DEFAULT_COUNT_NAME)); // Counter incremented, but not enough
+        assertEquals(0, releasableCount.getInt(signal));
+
+        // Counter reached the target.
+        signal.getCounts().put(DEFAULT_COUNT_NAME, 3L);
+        releaseCandidate.accept(3L, 1);
+        assertEquals(1, released.size());
+        assertEquals(9, waiting.size());
+        assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME)); // Counter 3 was converted into 1 release
+        assertEquals(0, releasableCount.getInt(signal));
+
+        // Counter reached the target for two candidates.
+        signal.getCounts().put(DEFAULT_COUNT_NAME, 6L);
+        releaseCandidate.accept(3L, 1);
+        assertEquals(2, released.size());
+        assertEquals(8, waiting.size());
+        assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME)); // Counter 6 was converted into 2 releases
+        assertEquals(0, releasableCount.getInt(signal));
+
+        // Counter reached the target for three candidates, and the remainder is 2.
+        signal.getCounts().put(DEFAULT_COUNT_NAME, 11L);
+        releaseCandidate.accept(3L, 1);
+        assertEquals(3, released.size()); // 11 / 3 = 3
+        assertEquals(7, waiting.size());
+        assertEquals(2, signal.getCount(DEFAULT_COUNT_NAME)); // 11 % 3 = 2
+        assertEquals(0, releasableCount.getInt(signal));
+
+        // Counter reached the target for two pass count and each pass can release 2 candidates.
+        signal.getCounts().put(DEFAULT_COUNT_NAME, 6L);
+        releaseCandidate.accept(3L, 2);
+        assertEquals(4, released.size()); // (6 / 3) * 2 = 4
+        assertEquals(6, waiting.size());
+        assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME)); // 6 % 3 = 0
+        assertEquals(0, releasableCount.getInt(signal));
+
+        // If there are counts more than enough to release current candidates, unused releasableCount should remain.
+        signal.getCounts().put(DEFAULT_COUNT_NAME, 50L);
+        releaseCandidate.accept(3L, 2);
+        assertEquals(10, released.size()); // (50 / 3) * 2 = 32. Used 10.
+        assertEquals(0, waiting.size());
+        assertEquals(2, signal.getCount(DEFAULT_COUNT_NAME)); // 50 % 3 = 2.
+        assertEquals(22, releasableCount.getInt(signal)); // 32 - 10 = 22.
+
+    }
+
 }
\ No newline at end of file


Mime
View raw message