edgent-dev mailing list archives

From "Dale LaBossiere (JIRA)" <j...@apache.org>
Subject [jira] [Created] (QUARKS-166) Add Gate plumbing
Date Mon, 02 May 2016 19:06:12 GMT
Dale LaBossiere created QUARKS-166:
--------------------------------------

             Summary: Add Gate plumbing
                 Key: QUARKS-166
                 URL: https://issues.apache.org/jira/browse/QUARKS-166
             Project: Quarks
          Issue Type: New Feature
            Reporter: Dale LaBossiere


As part of an initial experimental implementation of QUARKS-156 (concurrent analytics / barrier),
I needed a stream Gate mechanism: a way to control the release of tuples into an
output stream.  It is not used at present.

If there's +1 sentiment for adding this to PlumbingStreams, here's the code:

    /**
     * Control the flow of tuples to an output stream.
     * <P>
     * A {@link Semaphore} is used to control the flow of tuples
     * through the {@code gate}.  The gate acquires a permit from the
     * semaphore to pass the tuple through, blocking until a permit is
     * acquired (and applying backpressure upstream while blocked).
     * Elsewhere, some code calls {@link Semaphore#release(int)}
     * to make permits available.
     * </P><P>
     * If a TopologyProvider is used that can distribute a topology's
     * streams to different JVMs, the gate and the code releasing the
     * permits must be in the same JVM.
     * </P><P>
     * Sample use:
     * <BR>
     * Suppose you want to control processing such that concurrent
     * pipelines process each tuple in lock-step.
     * That is, you want all of the pipelines to start processing a tuple
     * at the same time and not start a new tuple until the current
     * tuple has been fully processed by each of them:
     * <pre>{@code
     * TStream<Integer> readings = ...;
     * 
     * Semaphore gateControl = new Semaphore(1); // allow the first to pass through
     * TStream<Integer> gated = gate(readings, gateControl);
     * 
     * // Create the concurrent pipeline combiner and have it
     * // signal that concurrent processing of the tuple has completed.
     * // In this sample the combiner just returns the received list of
     * // each pipeline result.
     * Function<TStream<List<Integer>>,TStream<List<Integer>>> combiner =
     *     stream -> stream.map(
     *                list -> { 
     *                          gateControl.release();
     *                          return list;
     *                        });
     * TStream<List<Integer>> results = PlumbingStreams.concurrent(gated, pipelines, combiner);
     * }</pre>
     * </P>
     * @param stream the input stream
     * @param semaphore gate control
     * @return gated stream
     */
    public static <T> TStream<T> gate(TStream<T> stream, Semaphore semaphore) {
      return stream.map(tuple -> {
          try {
            semaphore.acquire();
            return tuple;
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted", e);
          }
      });
    }
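
For readers without a Quarks build at hand, the permit accounting behind the gate can be sketched in plain Java. This is only an illustration under stated assumptions: GateSketch, gateFunction and passAll are hypothetical names, java.util.function.Function stands in for the function passed to stream.map(), and everything runs on one thread, so the gate never actually blocks here. It shows only that each tuple consumes the single permit and that a release() reopens the gate, as the combiner does in the Javadoc sample.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical stand-alone sketch; none of these names are part of Quarks.
class GateSketch {

    // Same logic as the lambda that gate() hands to stream.map().
    static <T> Function<T, T> gateFunction(Semaphore semaphore) {
        return tuple -> {
            try {
                semaphore.acquire(); // block until a permit is available
                return tuple;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("interrupted", e);
            }
        };
    }

    // Pass readings through the gate one at a time. The release() call
    // plays the role of the combiner in the Javadoc sample.
    static List<Integer> passAll(List<Integer> readings) {
        Semaphore gateControl = new Semaphore(1); // allow the first to pass through
        Function<Integer, Integer> gate = gateFunction(gateControl);
        List<Integer> results = new ArrayList<>();
        for (Integer reading : readings) {
            Integer passed = gate.apply(reading); // consumes the only permit
            if (gateControl.availablePermits() != 0) {
                throw new IllegalStateException("gate should be closed");
            }
            results.add(passed);
            gateControl.release(); // processing done: reopen the gate
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(passAll(Arrays.asList(1, 2, 3))); // prints [1, 2, 3]
    }
}
```

In a real topology the acquire and the release would happen on different threads, which is where the blocking and the upstream backpressure described in the Javadoc come into play.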




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
