Repository: nifi
Updated Branches:
refs/heads/master 7f5c0dfb5 -> 000414e7e
NIFI-3545: Release M FlowFiles once N signals arrive
- Support multiple incoming FlowFiles to Wait processor, up to Wait
Buffer Count
- Added Releasable FlowFile Count, which controls how many FlowFiles can
be released when wait condition is met
- Added special meaning to Notify delta Zero(0) to clear a signal
counter back to zero
This closes #1554
Signed-off-by: Aldrin Piri <aldrin@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/000414e7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/000414e7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/000414e7
Branch: refs/heads/master
Commit: 000414e7eaa4c4f459779e73c331f3021f9a5049
Parents: 7f5c0df
Author: Koji Kawamura <ijokarumawak@apache.org>
Authored: Thu Mar 2 22:54:46 2017 +0900
Committer: Aldrin Piri <aldrin@apache.org>
Committed: Mon Mar 6 09:29:44 2017 -0500
----------------------------------------------------------------------
.../apache/nifi/processors/standard/Notify.java | 8 +-
.../apache/nifi/processors/standard/Wait.java | 268 ++++++++++++++-----
.../processors/standard/WaitNotifyProtocol.java | 86 +++++-
.../nifi/processors/standard/TestWait.java | 246 ++++++++++++++++-
.../standard/TestWaitNotifyProtocol.java | 106 +++++++-
5 files changed, 617 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/000414e7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
index 155d131..346f1fb 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
@@ -97,7 +97,10 @@ public class Notify extends AbstractProcessor {
"be evaluated against a FlowFile in order to determine the signal counter delta. " +
"Specify how much the counter should increase. " +
"For example, if multiple signal events are processed at upstream flow in batch oriented way, " +
- "the number of events processed can be notified with this property at once.")
+ "the number of events processed can be notified with this property at once. " +
+ "Zero (0) has a special meaning, it clears target count back to 0, which is especially useful when used with Wait " +
+ Wait.RELEASABLE_FLOWFILE_COUNT.getDisplayName() + " = Zero (0) mode, to provide 'open-close-gate' type of flow control. " +
+ "One (1) can open a corresponding Wait processor, and Zero (0) can negate it as if closing a gate.")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(true)
@@ -171,7 +174,8 @@ public class Notify extends AbstractProcessor {
int incrementDelta(final String counterName, final int delta) {
int current = deltas.containsKey(counterName) ? deltas.get(counterName) : 0;
- int updated = current + delta;
+ // Zero (0) clears count.
+ int updated = delta == 0 ? 0 : current + delta;
deltas.put(counterName, updated);
return updated;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/000414e7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
index 8b76ce5..fccd443 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
@@ -26,6 +26,11 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
@@ -40,11 +45,13 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
@@ -52,6 +59,10 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal;
+import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE;
+import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
+
@EventDriven
@SupportsBatching
@Tags({"map", "cache", "wait", "hold", "distributed", "signal", "release"})
@@ -63,7 +74,7 @@ import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal;
+ "The release signal entry is then removed from the cache. Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration. "
+ "If you need to wait for more than one signal, specify the desired number of signals via the 'Target Signal Count' property. "
- + "This is particularly useful with processors that split a source flow file into multiple fragments, such as SplitText. "
+ + "This is particularly useful with processors that split a source FlowFile into multiple fragments, such as SplitText. "
+ "In order to wait for all fragments to be processed, connect the 'original' relationship to a Wait processor, and the 'splits' relationship to "
+ "a corresponding Notify processor. Configure the Notify and Wait processors to use the '${fragment.identifier}' as the value "
+ "of 'Release Signal Identifier', and specify '${fragment.count}' as the value of 'Target Signal Count' in the Wait processor."
@@ -125,11 +136,36 @@ public class Wait extends AbstractProcessor {
.expressionLanguageSupported(true)
.build();
+ public static final PropertyDescriptor WAIT_BUFFER_COUNT = new PropertyDescriptor.Builder()
+ .name("wait-buffer-count")
+ .displayName("Wait Buffer Count")
+ .description("Specify the maximum number of incoming FlowFiles that can be buffered to check whether it can move forward. " +
+ "The more buffer can provide the better performance, as it reduces the number of interactions with cache service " +
+ "by grouping FlowFiles by signal identifier. " +
+ "Only a signal identifier can be processed at a processor execution.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("1")
+ .build();
+
+ public static final PropertyDescriptor RELEASABLE_FLOWFILE_COUNT = new PropertyDescriptor.Builder()
+ .name("releasable-flowfile-count")
+ .displayName("Releasable FlowFile Count")
+ .description("A value, or the results of an Attribute Expression Language statement, which will " +
+ "be evaluated against a FlowFile in order to determine the releasable FlowFile count. " +
+ "This specifies how many FlowFiles can be released when a target count reaches target signal count. " +
+ "Zero (0) has a special meaning, any number of FlowFiles can be released as long as signal count matches target.")
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .defaultValue("1")
+ .build();
+
// Selects the FlowFile attribute or expression, whose value is used as cache key
public static final PropertyDescriptor EXPIRATION_DURATION = new PropertyDescriptor.Builder()
.name("expiration-duration")
.displayName("Expiration Duration")
- .description("Indicates the duration after which waiting flow files will be routed to the 'expired' relationship")
+ .description("Indicates the duration after which waiting FlowFiles will be routed to the 'expired' relationship")
.required(true)
.defaultValue("10 min")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
@@ -145,7 +181,7 @@ public class Wait extends AbstractProcessor {
public static final PropertyDescriptor ATTRIBUTE_COPY_MODE = new PropertyDescriptor.Builder()
.name("attribute-copy-mode")
.displayName("Attribute Copy Mode")
- .description("Specifies how to handle attributes copied from flow files entering the Notify processor")
+ .description("Specifies how to handle attributes copied from FlowFiles entering the Notify processor")
.defaultValue(ATTRIBUTE_COPY_KEEP_ORIGINAL.getValue())
.required(true)
.allowableValues(ATTRIBUTE_COPY_REPLACE, ATTRIBUTE_COPY_KEEP_ORIGINAL)
@@ -208,6 +244,8 @@ public class Wait extends AbstractProcessor {
descriptors.add(RELEASE_SIGNAL_IDENTIFIER);
descriptors.add(TARGET_SIGNAL_COUNT);
descriptors.add(SIGNAL_COUNTER_NAME);
+ descriptors.add(WAIT_BUFFER_COUNT);
+ descriptors.add(RELEASABLE_FLOWFILE_COUNT);
descriptors.add(EXPIRATION_DURATION);
descriptors.add(DISTRIBUTED_CACHE_SERVICE);
descriptors.add(ATTRIBUTE_COPY_MODE);
@@ -223,21 +261,81 @@ public class Wait extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
-
final ComponentLog logger = getLogger();
// Signal id is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
- final String signalId = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
+ final PropertyValue signalIdProperty = context.getProperty(RELEASE_SIGNAL_IDENTIFIER);
+ final Integer bufferCount = context.getProperty(WAIT_BUFFER_COUNT).asInteger();
+
+ final Map<Relationship, List<FlowFile>> processedFlowFiles = new HashMap<>();
+ final Function<Relationship, List<FlowFile>> getFlowFilesFor = r -> processedFlowFiles.computeIfAbsent(r, k -> new ArrayList<>());
+
+ final AtomicReference<String> targetSignalId = new AtomicReference<>();
+ final AtomicInteger bufferedCount = new AtomicInteger(0);
+ final List<FlowFile> failedFilteringFlowFiles = new ArrayList<>();
+ final Supplier<FlowFileFilter.FlowFileFilterResult> acceptResultSupplier =
+ () -> bufferedCount.incrementAndGet() == bufferCount ? ACCEPT_AND_TERMINATE : ACCEPT_AND_CONTINUE;
+ final List<FlowFile> flowFiles = session.get(f -> {
+
+ final String fSignalId = signalIdProperty.evaluateAttributeExpressions(f).getValue();
+
+ // if the computed value is null, or empty, we transfer the FlowFile to failure relationship
+ if (StringUtils.isBlank(fSignalId)) {
+ // We can't penalize f before getting it from session, so keep it in a temporal list.
+ logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {f});
+ failedFilteringFlowFiles.add(f);
+ return ACCEPT_AND_CONTINUE;
+ }
+
+ final String targetSignalIdStr = targetSignalId.get();
+ if (targetSignalIdStr == null) {
+ // This is the first one.
+ targetSignalId.set(fSignalId);
+ return acceptResultSupplier.get();
+ }
+
+ if (targetSignalIdStr.equals(fSignalId)) {
+ return acceptResultSupplier.get();
+ }
+
+ return REJECT_AND_CONTINUE;
- // if the computed value is null, or empty, we transfer the flow file to failure relationship
- if (StringUtils.isBlank(signalId)) {
- logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile});
+ });
+
+ final String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue();
+ final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode);
+ final AtomicReference<Signal> signalRef = new AtomicReference<>();
+
+ final Consumer<FlowFile> transferToFailure = flowFile -> {
flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
+ getFlowFilesFor.apply(REL_FAILURE).add(flowFile);
+ };
+
+ final Consumer<Entry<Relationship, List<FlowFile>>> transferFlowFiles = routedFlowFiles -> {
+ Relationship relationship = routedFlowFiles.getKey();
+
+ if (REL_WAIT.equals(relationship)) {
+ final String waitMode = context.getProperty(WAIT_MODE).getValue();
+
+ if (WAIT_MODE_KEEP_IN_UPSTREAM.getValue().equals(waitMode)) {
+ // Transfer to self.
+ relationship = Relationship.SELF;
+ }
+ }
+
+ final List<FlowFile> flowFilesWithSignalAttributes = routedFlowFiles.getValue().stream()
+ .map(f -> copySignalAttributes(session, f, signalRef.get(), replaceOriginalAttributes)).collect(Collectors.toList());
+ session.transfer(flowFilesWithSignalAttributes, relationship);
+ };
+
+ failedFilteringFlowFiles.forEach(f -> {
+ flowFiles.remove(f);
+ transferToFailure.accept(f);
+ });
+
+ if (flowFiles.isEmpty()) {
+ // If there was nothing but failed FlowFiles while filtering, transfer those and end immediately.
+ processedFlowFiles.entrySet().forEach(transferFlowFiles);
return;
}
@@ -245,95 +343,131 @@ public class Wait extends AbstractProcessor {
final AtomicDistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class);
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
- String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue();
- final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode);
+ final String signalId = targetSignalId.get();
+ final Signal signal;
- Signal signal = null;
+ // get notifying signal
try {
- // get notifying signal
signal = protocol.getSignal(signalId);
+ signalRef.set(signal);
+ } catch (final IOException e) {
+ throw new ProcessException(String.format("Failed to get signal for %s due to %s", signalId, e), e);
+ }
- // check for expiration
+ String targetCounterName = null;
+ long targetCount = 1;
+ int releasableFlowFileCount = 1;
+
+ final List<FlowFile> candidates = new ArrayList<>();
+
+ for (FlowFile flowFile : flowFiles) {
+ // Set wait start timestamp if it's not set yet
String waitStartTimestamp = flowFile.getAttribute(WAIT_START_TIMESTAMP);
if (waitStartTimestamp == null) {
waitStartTimestamp = String.valueOf(System.currentTimeMillis());
flowFile = session.putAttribute(flowFile, WAIT_START_TIMESTAMP, waitStartTimestamp);
}
- long lWaitStartTimestamp = 0L;
+ long lWaitStartTimestamp;
try {
lWaitStartTimestamp = Long.parseLong(waitStartTimestamp);
} catch (NumberFormatException nfe) {
logger.error("{} has an invalid value '{}' on FlowFile {}", new Object[] {WAIT_START_TIMESTAMP, waitStartTimestamp, flowFile});
- flowFile = session.penalize(flowFile);
-
- flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
- session.transfer(flowFile, REL_FAILURE);
- return;
+ transferToFailure.accept(flowFile);
+ continue;
}
+
+ // check for expiration
long expirationDuration = context.getProperty(EXPIRATION_DURATION)
.asTimePeriod(TimeUnit.MILLISECONDS);
long now = System.currentTimeMillis();
if (now > (lWaitStartTimestamp + expirationDuration)) {
logger.info("FlowFile {} expired after {}ms", new Object[] {flowFile, (now - lWaitStartTimestamp)});
- flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
- session.transfer(flowFile, REL_EXPIRED);
- return;
+ getFlowFilesFor.apply(REL_EXPIRED).add(flowFile);
+ continue;
}
+ // If there's no signal yet, then we don't have to evaluate target counts. Return immediately.
if (signal == null) {
- // If there's no signal yet, then we don't have to evaluate target counts. Return immediately.
if (logger.isDebugEnabled()) {
- logger.debug("No release signal yet for {} on FlowFile {}", new Object[] {signalId, flowFile});
+ logger.debug("No release signal found for {} on FlowFile {} yet", new Object[] {signalId, flowFile});
}
+ getFlowFilesFor.apply(REL_WAIT).add(flowFile);
+ continue;
+ }
-
- final String waitMode = context.getProperty(WAIT_MODE).getValue();
- if (WAIT_MODE_TRANSFER_TO_WAIT.getValue().equals(waitMode)) {
- session.transfer(flowFile, REL_WAIT);
- } else if (WAIT_MODE_KEEP_IN_UPSTREAM.getValue().equals(waitMode)) {
- // Transfer to self.
- session.transfer(flowFile);
- } else {
- throw new ProcessException("Unsupported wait mode " + waitMode + " was specified.");
+ // Fix target counter name and count from current FlowFile, if those are not set yet.
+ if (candidates.isEmpty()) {
+ targetCounterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ try {
+ targetCount = Long.valueOf(context.getProperty(TARGET_SIGNAL_COUNT).evaluateAttributeExpressions(flowFile).getValue());
+ } catch (final NumberFormatException e) {
+ transferToFailure.accept(flowFile);
+ logger.error("Failed to parse targetCount when processing {} due to {}", new Object[] {flowFile, e}, e);
+ continue;
+ }
+ try {
+ releasableFlowFileCount = Integer.valueOf(context.getProperty(RELEASABLE_FLOWFILE_COUNT).evaluateAttributeExpressions(flowFile).getValue());
+ } catch (final NumberFormatException e) {
+ transferToFailure.accept(flowFile);
+ logger.error("Failed to parse releasableFlowFileCount when processing {} due to {}", new Object[] {flowFile, e}, e);
+ continue;
}
- return;
}
- final String targetCounterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue();
- final Long targetCount = Long.valueOf(context.getProperty(TARGET_SIGNAL_COUNT).evaluateAttributeExpressions(flowFile).getValue());
- final boolean reachedToTargetCount = StringUtils.isBlank(targetCounterName)
- ? signal.isTotalCountReached(targetCount)
- : signal.isCountReached(targetCounterName, targetCount);
+ // FlowFile is now validated and added to candidates.
+ candidates.add(flowFile);
+ }
- if (!reachedToTargetCount) {
- if (logger.isDebugEnabled()) {
- logger.debug("Release signal count {} hasn't reached {} for {} on FlowFile {}",
- new Object[] {targetCounterName, targetCount, signalId, flowFile});
+ boolean waitCompleted = false;
+ boolean waitProgressed = false;
+ if (signal != null && !candidates.isEmpty()) {
+
+ if (releasableFlowFileCount > 1) {
+ signal.releaseCandidatese(targetCounterName, targetCount, releasableFlowFileCount, candidates,
+ released -> getFlowFilesFor.apply(REL_SUCCESS).addAll(released),
+ waiting -> getFlowFilesFor.apply(REL_WAIT).addAll(waiting));
+ waitProgressed = !getFlowFilesFor.apply(REL_SUCCESS).isEmpty();
+
+ } else {
+ // releasableFlowFileCount = 0 or 1
+ boolean reachedTargetCount = StringUtils.isBlank(targetCounterName)
+ ? signal.isTotalCountReached(targetCount)
+ : signal.isCountReached(targetCounterName, targetCount);
+
+ if (reachedTargetCount) {
+ if (releasableFlowFileCount == 0) {
+ getFlowFilesFor.apply(REL_SUCCESS).addAll(candidates);
+ } else {
+ // releasableFlowFileCount = 1
+ getFlowFilesFor.apply(REL_SUCCESS).add(candidates.remove(0));
+ getFlowFilesFor.apply(REL_WAIT).addAll(candidates);
+ // If releasableFlowFileCount == 0, leave signal as it is,
+ // so that any number of FlowFile can be released as long as target count condition matches.
+ waitCompleted = true;
+ }
+ } else {
+ getFlowFilesFor.apply(REL_WAIT).addAll(candidates);
}
- flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
- session.transfer(flowFile, REL_WAIT);
- return;
}
+ }
+ // Transfer FlowFiles.
+ processedFlowFiles.entrySet().forEach(transferFlowFiles);
- flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
- session.transfer(flowFile, REL_SUCCESS);
-
- protocol.complete(signalId);
-
- } catch (final NumberFormatException e) {
- flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- logger.error("Failed to parse targetCount when processing {} due to {}", new Object[] {flowFile, e});
+ // Update signal if needed.
+ try {
+ if (waitCompleted) {
+ protocol.complete(signalId);
+ } else if (waitProgressed) {
+ protocol.replace(signal);
+ }
} catch (final IOException e) {
- flowFile = copySignalAttributes(session, flowFile, signal, replaceOriginalAttributes);
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- logger.error("Unable to communicate with cache when processing {} due to {}", new Object[] {flowFile, e});
+ session.rollback();
+ throw new ProcessException(String.format("Unable to communicate with cache while updating %s due to %s", signalId, e), e);
}
+
}
private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final boolean replaceOriginal) {
@@ -341,13 +475,13 @@ public class Wait extends AbstractProcessor {
return flowFile;
}
- // copy over attributes from release signal flow file, if provided
+ // copy over attributes from release signal FlowFile, if provided
final Map<String, String> attributesToCopy;
if (replaceOriginal) {
attributesToCopy = new HashMap<>(signal.getAttributes());
attributesToCopy.remove("uuid");
} else {
- // if the current flow file does *not* have the cached attribute, copy it
+ // if the current FlowFile does *not* have the cached attribute, copy it
attributesToCopy = signal.getAttributes().entrySet().stream()
.filter(e -> flowFile.getAttribute(e.getKey()) == null)
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
http://git-wip-us.apache.org/repos/asf/nifi/blob/000414e7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
index ae5cbbd..1c89108 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
@@ -31,7 +31,9 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
/**
* This class provide a protocol for Wait and Notify processors to work together.
@@ -51,8 +53,16 @@ public class WaitNotifyProtocol {
private final Deserializer<String> stringDeserializer = input -> new String(input, StandardCharsets.UTF_8);
public static class Signal {
+
+ /*
+ * Getter and Setter methods are needed to (de)serialize JSON even if it's not used from app code.
+ */
+
+ transient private String identifier;
+ transient private long revision = -1;
private Map<String, Long> counts = new HashMap<>();
private Map<String, String> attributes = new HashMap<>();
+ private int releasableCount = 0;
public Map<String, Long> getCounts() {
return counts;
@@ -84,6 +94,54 @@ public class WaitNotifyProtocol {
return count != null ? count : 0;
}
+ public int getReleasableCount() {
+ return releasableCount;
+ }
+
+ public void setReleasableCount(int releasableCount) {
+ this.releasableCount = releasableCount;
+ }
+
+ /**
+ * <p>Consume accumulated notification signals to let some waiting candidates get released.</p>
+ *
+ * <p>This method updates state of this instance, but does not update cache storage.
+ * Caller of this method is responsible for updating cache storage after processing released and waiting candidates
+ * by calling {@link #replace(Signal)}. Caller should rollback what it processed with these candidates if complete call failed.</p>
+ *
+ * @param _counterName signal counter name to consume from.
+ * @param requiredCountForPass number of required signals to acquire a pass.
+ * @param releasableCandidateCountPerPass number of releasable candidate per pass.
+ * @param candidates candidates waiting for being allowed to pass.
+ * @param released function to process allowed candidates to pass.
+ * @param waiting function to process candidates those should remain in waiting queue.
+ * @param <E> Type of candidate
+ */
+ public <E> void releaseCandidatese(final String _counterName, final long requiredCountForPass,
+ final int releasableCandidateCountPerPass, final List<E> candidates,
+ final Consumer<List<E>> released, final Consumer<List<E>> waiting) {
+
+ // counterName is mandatory otherwise, we can't decide which counter to convert into pass count.
+ final String counterName = _counterName == null || _counterName.length() == 0 ? DEFAULT_COUNT_NAME : _counterName;
+
+ final int candidateSize = candidates.size();
+ if (releasableCount < candidateSize) {
+ // If current passCount is not enough for the candidate size, then try to get more.
+ // Convert notification signals to pass ticket.
+ final long signalCount = getCount(counterName);
+ releasableCount += (signalCount / requiredCountForPass) * releasableCandidateCountPerPass;
+ final long reducedSignalCount = signalCount % requiredCountForPass;
+ counts.put(counterName, reducedSignalCount);
+ }
+
+ int releaseCount = Math.min(releasableCount, candidateSize);
+
+ released.accept(candidates.subList(0, releaseCount));
+ waiting.accept(candidates.subList(releaseCount, candidateSize));
+
+ releasableCount -= releaseCount;
+ }
+
}
private final AtomicDistributedMapCacheClient cache;
@@ -95,7 +153,7 @@ public class WaitNotifyProtocol {
/**
* Notify a signal to increase a counter.
* @param signalId a key in the underlying cache engine
- * @param deltas a map containing counterName and delta entries
+ * @param deltas a map containing counterName and delta entries, 0 has special meaning, clears the counter back to 0
* @param attributes attributes to save in the cache entry
* @return A Signal instance, merged with an existing signal if any
* @throws IOException thrown when it failed interacting with the cache engine
@@ -106,10 +164,9 @@ public class WaitNotifyProtocol {
for (int i = 0; i < MAX_REPLACE_RETRY_COUNT; i++) {
- final CacheEntry<String, String> existingEntry = cache.fetch(signalId, stringSerializer, stringDeserializer);
-
final Signal existingSignal = getSignal(signalId);
final Signal signal = existingSignal != null ? existingSignal : new Signal();
+ signal.identifier = signalId;
if (attributes != null) {
signal.attributes.putAll(attributes);
@@ -117,15 +174,11 @@ public class WaitNotifyProtocol {
deltas.forEach((counterName, delta) -> {
long count = signal.counts.containsKey(counterName) ? signal.counts.get(counterName) : 0;
- count += delta;
+ count = delta == 0 ? 0 : count + delta;
signal.counts.put(counterName, count);
});
- final String signalJson = objectMapper.writeValueAsString(signal);
- final long revision = existingEntry != null ? existingEntry.getRevision() : -1;
-
-
- if (cache.replace(signalId, signalJson, stringSerializer, stringSerializer, revision)) {
+ if (replace(signal)) {
return signal;
}
@@ -148,7 +201,7 @@ public class WaitNotifyProtocol {
* Notify a signal to increase a counter.
* @param signalId a key in the underlying cache engine
* @param counterName specify count to update
- * @param delta delta to update a counter
+ * @param delta delta to update a counter, 0 has special meaning, clears the counter back to 0
* @param attributes attributes to save in the cache entry
* @return A Signal instance, merged with an existing signal if any
* @throws IOException thrown when it failed interacting with the cache engine
@@ -184,12 +237,16 @@ public class WaitNotifyProtocol {
final String value = entry.getValue();
try {
- return objectMapper.readValue(value, Signal.class);
+ final Signal signal = objectMapper.readValue(value, Signal.class);
+ signal.identifier = signalId;
+ signal.revision = entry.getRevision();
+ return signal;
} catch (final JsonParseException jsonE) {
// Try to read it as FlowFileAttributes for backward compatibility.
try {
final Map<String, String> attributes = new FlowFileAttributesSerializer().deserialize(value.getBytes(StandardCharsets.UTF_8));
final Signal signal = new Signal();
+ signal.identifier = signalId;
signal.setAttributes(attributes);
signal.getCounts().put(DEFAULT_COUNT_NAME, 1L);
return signal;
@@ -209,4 +266,11 @@ public class WaitNotifyProtocol {
public void complete(final String signalId) throws IOException {
cache.remove(signalId, stringSerializer);
}
+
+ public boolean replace(final Signal signal) throws IOException {
+
+ final String signalJson = objectMapper.writeValueAsString(signal);
+ return cache.replace(signal.identifier, signalJson, stringSerializer, stringSerializer, signal.revision);
+
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/000414e7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
index 0ce0045..eb196c9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
@@ -19,12 +19,22 @@ package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.TestNotify.MockCacheClient;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
@@ -180,14 +190,15 @@ public class TestWait {
final Map<String, String> props = new HashMap<>();
props.put("releaseSignalAttribute", "2");
runner.enqueue(new byte[]{}, props);
- runner.run();
-
- //Expect the processor to receive an IO exception from the cache service and route to failure
- runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
- runner.assertTransferCount(Wait.REL_FAILURE, 1);
- runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists("wait.counter.total");
-
- service.setFailOnCalls(false);
+ try {
+ runner.run();
+ fail("Expect the processor to receive an IO exception from the cache service and throws ProcessException.");
+ } catch (final AssertionError e) {
+ assertTrue(e.getCause() instanceof ProcessException);
+ assertTrue(e.getCause().getCause() instanceof IOException);
+ } finally {
+ service.setFailOnCalls(false);
+ }
}
@Test
@@ -360,7 +371,6 @@ public class TestWait {
assertNull("The key no longer exist", protocol.getSignal("key"));
}
-
@Test
public void testWaitForSpecificCount() throws InitializationException, IOException {
Map<String, String> cachedAttributes = new HashMap<>();
@@ -455,4 +465,222 @@ public class TestWait {
}
+ private class TestIteration { // Drives one runner.run() call per run() and checks what was routed to success, wait and failure.
+ final List<MockFlowFile> released = new ArrayList<>();
+ final List<MockFlowFile> waiting = new ArrayList<>();
+ final List<MockFlowFile> failed = new ArrayList<>();
+ // Expected FlowFile contents per relationship, in order; a test fills these in before each run().
+ final List<String> expectedReleased = new ArrayList<>();
+ final List<String> expectedWaiting = new ArrayList<>();
+ final List<String> expectedFailed = new ArrayList<>();
+ // Executes one iteration: run, collect the transferred FlowFiles, verify them, then reset the expectations.
+ void run() {
+ released.clear();
+ waiting.clear();
+ failed.clear();
+ // Results of the previous iteration were dropped above, so callers must read them before calling run() again.
+ runner.run();
+ // Snapshot what this run transferred to each relationship.
+ released.addAll(runner.getFlowFilesForRelationship(Wait.REL_SUCCESS));
+ waiting.addAll(runner.getFlowFilesForRelationship(Wait.REL_WAIT));
+ failed.addAll(runner.getFlowFilesForRelationship(Wait.REL_FAILURE));
+ // Sizes are checked first so the index-based content comparison below cannot run past the actual list.
+ assertEquals(expectedReleased.size(), released.size());
+ assertEquals(expectedWaiting.size(), waiting.size());
+ assertEquals(expectedFailed.size(), failed.size());
+ // Compares content position by position, so the order of the expected values matters.
+ final BiConsumer<List<String>, List<MockFlowFile>> assertContents = (expected, actual) -> {
+ for (int i = 0; i < expected.size(); i++) {
+ actual.get(i).assertContentEquals(expected.get(i));
+ }
+ };
+
+ assertContents.accept(expectedReleased, released);
+ assertContents.accept(expectedWaiting, waiting);
+ assertContents.accept(expectedFailed, failed);
+ // Clear the runner's transfer state and the expectations; released/waiting/failed stay readable until the next run().
+ runner.clearTransferState();
+ expectedReleased.clear();
+ expectedWaiting.clear();
+ expectedFailed.clear();
+ }
+ }
+ /** Exercises Wait Buffer Count = 2 with FlowFiles spread over two signal ids, plus one FlowFile that has no signal id attribute. */
+ @Test
+ public void testWaitBufferCount() throws InitializationException, IOException {
+ Map<String, String> cachedAttributes = new HashMap<>();
+ cachedAttributes.put("notified", "notified-value");
+
+ // Protocol client built on 'service', used below to send a notification directly.
+ final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
+
+ runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
+ runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}");
+ runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");
+ runner.setProperty(Wait.WAIT_BUFFER_COUNT, "2");
+
+ final Map<String, String> waitAttributesA = new HashMap<>();
+ waitAttributesA.put("releaseSignalAttribute", "key-A");
+ waitAttributesA.put("targetSignalCount", "1");
+ waitAttributesA.put("signalCounterName", "counter");
+
+ final Map<String, String> waitAttributesB = new HashMap<>();
+ waitAttributesB.put("releaseSignalAttribute", "key-B");
+ waitAttributesB.put("targetSignalCount", "3");
+ waitAttributesB.put("signalCounterName", "counter");
+ // Same as B but without the signal id attribute; the FlowFile using it is expected in failure.
+ final Map<String, String> waitAttributesBInvalid = new HashMap<>();
+ waitAttributesBInvalid.putAll(waitAttributesB);
+ waitAttributesBInvalid.remove("releaseSignalAttribute");
+
+ final TestIteration testIteration = new TestIteration();
+
+ // Enqueue multiple wait FlowFiles.
+ runner.enqueue("1".getBytes(), waitAttributesB); // Should be picked at 1st and 2nd itr
+ runner.enqueue("2".getBytes(), waitAttributesA); // key-A: not transferred in the two runs exercised below
+ runner.enqueue("3".getBytes(), waitAttributesBInvalid);
+ runner.enqueue("4".getBytes(), waitAttributesA); // key-A: not transferred in the two runs exercised below
+ runner.enqueue("5".getBytes(), waitAttributesB); // Should be picked at 1st itr
+ runner.enqueue("6".getBytes(), waitAttributesB); // Should be picked at 2nd itr
+
+ /*
+ * 1st run:
+ * pick 1 key-B
+ * skip 2 because key-A
+ * skip 3 because invalid
+ * skip 4 because key-A
+ * pick 5 key-B
+ */
+ testIteration.expectedWaiting.addAll(Arrays.asList("1", "5")); // Picked, but key-B has not been notified yet.
+ testIteration.expectedFailed.add("3"); // No signal id attribute.
+ testIteration.run();
+
+ /*
+ * 2nd run:
+ * pick 6 key-B
+ * pick 1 key-B
+ */
+ protocol.notify("key-B", "counter", 3, cachedAttributes);
+ testIteration.expectedReleased.add("6");
+ testIteration.expectedWaiting.add("1"); // Picked but only one FlowFile can be released.
+ // Enqueue the waiting FlowFiles again, simulating the wait relationship looping back to the processor.
+ testIteration.waiting.forEach(f -> runner.enqueue(f));
+ testIteration.run();
+
+ }
+
+ /**
+ * Verifies that one satisfied signal can release several FlowFiles.
+ * The target count is 3 and Releasable FlowFile Count is 6 (from the fragmentCount attribute),
+ * so a single notification of 3 is expected to become 6 releasable FlowFiles while the
+ * counter itself drops back to 0. With Wait Buffer Count = 2, two FlowFiles are released per run.
+ */
+ @Test
+ public void testReleaseMultipleFlowFiles() throws InitializationException, IOException {
+ Map<String, String> cachedAttributes = new HashMap<>();
+ cachedAttributes.put("notified", "notified-value");
+
+ // Protocol client built on 'service', used below to notify and to inspect the stored signal.
+ final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
+
+ runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
+ runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}");
+ runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");
+ runner.setProperty(Wait.WAIT_BUFFER_COUNT, "2");
+ runner.setProperty(Wait.RELEASABLE_FLOWFILE_COUNT, "${fragmentCount}");
+
+ final Map<String, String> waitAttributes = new HashMap<>();
+ waitAttributes.put("releaseSignalAttribute", "key");
+ waitAttributes.put("targetSignalCount", "3");
+ waitAttributes.put("signalCounterName", "counter");
+ waitAttributes.put("fragmentCount", "6");
+
+ final TestIteration testIteration = new TestIteration();
+
+ // Enqueue 6 wait FlowFiles. 1,2,3,4,5,6
+ IntStream.range(1, 7).forEach(i -> runner.enqueue(String.valueOf(i).getBytes(), waitAttributes));
+
+ /*
+ * 1st run: nothing has been notified yet, so the two buffered FlowFiles keep waiting.
+ */
+ testIteration.expectedWaiting.addAll(Arrays.asList("1", "2"));
+ testIteration.run();
+
+ WaitNotifyProtocol.Signal signal = protocol.getSignal("key");
+ assertNull(signal);
+
+ /*
+ * 2nd run: the counter reaches the target, the next two FlowFiles are released.
+ */
+ protocol.notify("key", "counter", 3, cachedAttributes);
+ testIteration.expectedReleased.addAll(Arrays.asList("3", "4"));
+ // Enqueue the waiting FlowFiles again, simulating the wait relationship looping back to the processor.
+ testIteration.waiting.forEach(f -> runner.enqueue(f));
+ testIteration.run();
+
+ signal = protocol.getSignal("key");
+ // Check the counter that was actually notified ("counter"); a name that was never used would read 0 no matter what.
+ assertEquals(0, signal.getCount("counter"));
+ assertEquals(4, signal.getReleasableCount()); // 6 releasable - 2 released
+
+ /*
+ * 3rd run: no new notification; the remaining releasable count lets two more FlowFiles through.
+ */
+ testIteration.expectedReleased.addAll(Arrays.asList("5", "6"));
+ // Nothing was left waiting by the 2nd run, so this re-enqueue adds nothing.
+ testIteration.waiting.forEach(f -> runner.enqueue(f));
+ testIteration.run();
+
+ signal = protocol.getSignal("key");
+ assertEquals(0, signal.getCount("counter"));
+ assertEquals(2, signal.getReleasableCount()); // 4 releasable - 2 released
+ }
+ /** Exercises Releasable FlowFile Count = 0 ("leave gate open"): once the counter reaches the target, each run keeps releasing FlowFiles while the stored counter stays at 3 and the releasable count stays at 0. */
+ @Test
+ public void testOpenGate() throws InitializationException, IOException {
+ Map<String, String> cachedAttributes = new HashMap<>();
+ cachedAttributes.put("notified", "notified-value");
+
+ // Protocol client built on 'service', used below to notify and to inspect the stored signal.
+ final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
+
+ runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
+ runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}");
+ runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");
+ runner.setProperty(Wait.WAIT_BUFFER_COUNT, "2");
+ runner.setProperty(Wait.RELEASABLE_FLOWFILE_COUNT, "0"); // Leave gate open
+
+ final Map<String, String> waitAttributes = new HashMap<>();
+ waitAttributes.put("releaseSignalAttribute", "key");
+ waitAttributes.put("targetSignalCount", "3");
+ waitAttributes.put("signalCounterName", "counter");
+
+ final TestIteration testIteration = new TestIteration();
+
+ // Enqueue 6 wait FlowFiles. 1,2,3,4,5,6
+ IntStream.range(1, 7).forEach(i -> runner.enqueue(String.valueOf(i).getBytes(), waitAttributes));
+
+ /*
+ * 1st run: nothing has been notified yet, so the two buffered FlowFiles keep waiting.
+ */
+ testIteration.expectedWaiting.addAll(Arrays.asList("1", "2"));
+ testIteration.run();
+
+ WaitNotifyProtocol.Signal signal = protocol.getSignal("key");
+ assertNull(signal);
+
+ /*
+ * 2nd run: the counter reaches the target, the next two FlowFiles are released.
+ */
+ protocol.notify("key", "counter", 3, cachedAttributes);
+ testIteration.expectedReleased.addAll(Arrays.asList("3", "4"));
+ testIteration.waiting.forEach(f -> runner.enqueue(f));
+ testIteration.run();
+
+ signal = protocol.getSignal("key");
+ assertEquals(3, signal.getCount("counter"));
+ assertEquals(0, signal.getReleasableCount());
+
+ /*
+ * 3rd run: no new notification, yet two more FlowFiles are released and the signal is unchanged.
+ */
+ testIteration.expectedReleased.addAll(Arrays.asList("5", "6"));
+ testIteration.waiting.forEach(f -> runner.enqueue(f));
+ testIteration.run();
+
+ signal = protocol.getSignal("key");
+ assertEquals(3, signal.getCount("counter"));
+ assertEquals(0, signal.getReleasableCount());
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/000414e7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java
index bf7e1e6..13b4346 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java
@@ -27,11 +27,18 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.stubbing.Answer;
+import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import static org.apache.nifi.processors.standard.WaitNotifyProtocol.DEFAULT_COUNT_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -102,7 +109,7 @@ public class TestWaitNotifyProtocol {
final CacheEntry<String, String> cacheEntry = cacheEntries.get("signal-id");
assertEquals(0, cacheEntry.getRevision());
- assertEquals("{\"counts\":{\"a\":1},\"attributes\":{}}", cacheEntry.getValue());
+ assertEquals("{\"counts\":{\"a\":1},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
}
@Test
@@ -119,20 +126,20 @@ public class TestWaitNotifyProtocol {
CacheEntry<String, String> cacheEntry = cacheEntries.get("signal-id");
assertEquals(1, cacheEntry.getRevision());
- assertEquals("{\"counts\":{\"a\":2},\"attributes\":{}}", cacheEntry.getValue());
+ assertEquals("{\"counts\":{\"a\":2},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
protocol.notify(signalId, "a", 10, null);
cacheEntry = cacheEntries.get("signal-id");
assertEquals(2, cacheEntry.getRevision());
- assertEquals("{\"counts\":{\"a\":12},\"attributes\":{}}", cacheEntry.getValue());
+ assertEquals("{\"counts\":{\"a\":12},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
protocol.notify(signalId, "b", 2, null);
protocol.notify(signalId, "c", 3, null);
cacheEntry = cacheEntries.get("signal-id");
assertEquals(4, cacheEntry.getRevision());
- assertEquals("{\"counts\":{\"a\":12,\"b\":2,\"c\":3},\"attributes\":{}}", cacheEntry.getValue());
+ assertEquals("{\"counts\":{\"a\":12,\"b\":2,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
final Map<String, Integer> deltas = new HashMap<>();
deltas.put("a", 10);
@@ -141,7 +148,13 @@ public class TestWaitNotifyProtocol {
cacheEntry = cacheEntries.get("signal-id");
assertEquals(5, cacheEntry.getRevision());
- assertEquals("{\"counts\":{\"a\":22,\"b\":27,\"c\":3},\"attributes\":{}}", cacheEntry.getValue());
+ assertEquals("{\"counts\":{\"a\":22,\"b\":27,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
+
+ // Zero clear 'b'.
+ protocol.notify("signal-id", "b", 0, null);
+ cacheEntry = cacheEntries.get("signal-id");
+ assertEquals(6, cacheEntry.getRevision());
+ assertEquals("{\"counts\":{\"a\":22,\"b\":0,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
}
@@ -161,7 +174,7 @@ public class TestWaitNotifyProtocol {
CacheEntry<String, String> cacheEntry = cacheEntries.get("signal-id");
assertEquals(0, cacheEntry.getRevision());
- assertEquals("{\"counts\":{\"a\":1},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a1\"}}", cacheEntry.getValue());
+ assertEquals("{\"counts\":{\"a\":1},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a1\"},\"releasableCount\":0}", cacheEntry.getValue());
final Map<String, String> attributeA2 = new HashMap<>();
attributeA2.put("p2", "a2"); // Update p2
@@ -173,7 +186,7 @@ public class TestWaitNotifyProtocol {
cacheEntry = cacheEntries.get("signal-id");
assertEquals(1, cacheEntry.getRevision());
assertEquals("Updated attributes should be merged correctly",
- "{\"counts\":{\"a\":2},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a2\",\"p3\":\"a2\"}}", cacheEntry.getValue());
+ "{\"counts\":{\"a\":2},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a2\",\"p3\":\"a2\"},\"releasableCount\":0}", cacheEntry.getValue());
}
@@ -237,7 +250,7 @@ public class TestWaitNotifyProtocol {
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
final Signal signal = protocol.getSignal(signalId);
- assertEquals(1, signal.getCount(WaitNotifyProtocol.DEFAULT_COUNT_NAME));
+ assertEquals(1, signal.getCount(DEFAULT_COUNT_NAME));
assertEquals("value1", signal.getAttributes().get("key1"));
assertEquals("value2", signal.getAttributes().get("key2"));
assertEquals("value3", signal.getAttributes().get("key3"));
@@ -251,4 +264,81 @@ public class TestWaitNotifyProtocol {
}
+ @Test // Walks Signal.releaseCandidatese through increasing counter values and checks released/waiting sizes, the leftover counter and releasableCount.
+ public void testReleaseCandidate() throws Exception {
+ final List<Integer> candidates = IntStream.range(0, 10).boxed().collect(Collectors.toList());
+ final Signal signal = new Signal();
+ final List<Integer> released = new ArrayList<>();
+ final List<Integer> waiting = new ArrayList<>();
+
+ // Pass null as the counter name; the assertions below read the result under DEFAULT_COUNT_NAME.
+ final String counterName = null;
+ // Helper: clears both result lists, then calls releaseCandidatese with the given required count and per-pass release size.
+ final BiConsumer<Long, Integer> releaseCandidate = (requiredCountForPass, releasableCandidatePerPass) -> {
+ released.clear();
+ waiting.clear();
+ signal.releaseCandidatese(counterName, requiredCountForPass, releasableCandidatePerPass, candidates,
+ r -> released.addAll(r), w -> waiting.addAll(w));
+ };
+ // The releasableCount field is read through reflection in the assertions below.
+ final Field releasableCount = Signal.class.getDeclaredField("releasableCount");
+ releasableCount.setAccessible(true);
+
+ // No counter, should wait.
+ releaseCandidate.accept(3L, 1);
+ assertEquals(0, released.size());
+ assertEquals(10, waiting.size());
+ assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME));
+ assertEquals(0, releasableCount.getInt(signal));
+
+ // Counter is not enough yet.
+ signal.getCounts().put(DEFAULT_COUNT_NAME, 1L);
+ releaseCandidate.accept(3L, 1);
+ assertEquals(0, released.size());
+ assertEquals(10, waiting.size());
+ assertEquals(1, signal.getCount(DEFAULT_COUNT_NAME)); // Counter is 1, below the required 3, and is left as is
+ assertEquals(0, releasableCount.getInt(signal));
+
+ // Counter reached the target.
+ signal.getCounts().put(DEFAULT_COUNT_NAME, 3L);
+ releaseCandidate.accept(3L, 1);
+ assertEquals(1, released.size());
+ assertEquals(9, waiting.size());
+ assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME)); // Counter 3 was converted into 1 release
+ assertEquals(0, releasableCount.getInt(signal));
+
+ // Counter reached the target for two candidates.
+ signal.getCounts().put(DEFAULT_COUNT_NAME, 6L);
+ releaseCandidate.accept(3L, 1);
+ assertEquals(2, released.size());
+ assertEquals(8, waiting.size());
+ assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME)); // Counter 6 was converted into 2 releases
+ assertEquals(0, releasableCount.getInt(signal));
+
+ // Counter reached the target for three candidates, and remainder is 2.
+ signal.getCounts().put(DEFAULT_COUNT_NAME, 11L);
+ releaseCandidate.accept(3L, 1);
+ assertEquals(3, released.size()); // 11 / 3 = 3
+ assertEquals(7, waiting.size());
+ assertEquals(2, signal.getCount(DEFAULT_COUNT_NAME)); // 11 % 3 = 2
+ assertEquals(0, releasableCount.getInt(signal));
+
+ // Counter reached the target for two passes and each pass can release 2 candidates.
+ signal.getCounts().put(DEFAULT_COUNT_NAME, 6L);
+ releaseCandidate.accept(3L, 2);
+ assertEquals(4, released.size()); // (6 / 3) * 2 = 4
+ assertEquals(6, waiting.size());
+ assertEquals(0, signal.getCount(DEFAULT_COUNT_NAME)); // 6 % 3 = 0
+ assertEquals(0, releasableCount.getInt(signal));
+
+ // If there are counts more than enough to release current candidates, unused releasableCount should remain.
+ signal.getCounts().put(DEFAULT_COUNT_NAME, 50L);
+ releaseCandidate.accept(3L, 2);
+ assertEquals(10, released.size()); // (50 / 3) * 2 = 32. Used 10.
+ assertEquals(0, waiting.size());
+ assertEquals(2, signal.getCount(DEFAULT_COUNT_NAME)); // 50 % 3 = 2.
+ assertEquals(22, releasableCount.getInt(signal)); // 32 - 10 = 22.
+
+ }
+
}
\ No newline at end of file
|