edgent-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (QUARKS-166) Add Gate plumbing
Date Mon, 09 May 2016 23:28:12 GMT

    [ https://issues.apache.org/jira/browse/QUARKS-166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15277310#comment-15277310
] 

ASF GitHub Bot commented on QUARKS-166:
---------------------------------------

Github user Cazen closed the pull request at:

    https://github.com/apache/incubator-quarks/pull/109


> Add Gate plumbing
> -----------------
>
>                 Key: QUARKS-166
>                 URL: https://issues.apache.org/jira/browse/QUARKS-166
>             Project: Quarks
>          Issue Type: New Feature
>            Reporter: Dale LaBossiere
>            Assignee: Cazen Lee
>
> As part of an initial experimental implementation of QUARKS-156 concurrent analytics
/ barrier, I had a need for a stream Gate mechanism - a way to control the release of tuples
into an output stream.  It's not used now.
> If there's a +1 sentiment for adding this to PlumbingStreams here's the code:
>     /**
>      * Control the flow of tuples to an output stream.
>      * <P>
>      * A {@link Semaphore} is used to control the flow of tuples
>      * through the {@code gate}.  The gate acquires a permit from the
>      * semaphore to pass the tuple through, blocking until a permit is
>      * acquired (and applying backpressure upstream while blocked).
>      * Elsewhere, some code calls {@link Semaphore#release(int)}
>      * to make permits available.
>      * </P><P>
>      * If a TopologyProvider is used that can distribute a topology's
>      * streams to different JVM's the gate and the code releasing the
>      * permits must be in the same JVM.
>      * </P><P>
>      * Sample use:
>      * <BR>
>      * Suppose you wanted to control processing such that concurrent
>      * pipelines processed each tuple in lock-step. 
>      * I.e., You want all of the pipelines to start processing a tuple
>      * at the same time and not start a new tuple until the current
>      * tuple had been fully processed by each of them:
>      * <pre>{@code
>      * TStream<Integer> readings = ...;
>      * 
>      * Semaphore gateControl = new Semaphore(1); // allow the first to pass through
>      * TStream<Integer> gated = gate(readings, gateControl);
>      * 
>      * // Create the concurrent pipeline combiner and have it
>      * // signal that concurrent processing of the tuple has completed.
>      * // In this sample the combiner just returns the received list of
>      * // each pipeline result.
>      * Function<TStream<List<Integer>>,TStream<List<Integer>>>
combiner =
>      *     stream -> stream.map(
>      *                list -> { 
>      *                          gateControl.release();
>      *                          return list;
>      *                        });
>      * TStream<List<Integer>> results = PlumbingStreams.concurrent(gated,
pipelines, combiner);
>      * }</pre>
>      * </P>
>      * @param stream the input stream
>      * @param semaphore gate control
>      * @return gated stream
>      */
>     public static <T> TStream<T> gate(TStream<T> stream, Semaphore
semaphore) {
>       return stream.map(tuple -> { 
>           try {
>             semaphore.acquire();
>             return tuple;
>           } catch (InterruptedException e) {
>             Thread.currentThread().interrupt();
>             throw new RuntimeException("interrupted", e);
>           }});
>     }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message