apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject apex-core git commit: APEXCORE-660 Added documentation for custom control tuple support
Date Thu, 13 Apr 2017 15:53:40 GMT
Repository: apex-core
Updated Branches:
  refs/heads/master 01eb7926d -> ca1a375f9

APEXCORE-660 Added documentation for custom control tuple support

Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/ca1a375f
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/ca1a375f
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/ca1a375f

Branch: refs/heads/master
Commit: ca1a375f983be4876e85719c85c8e06dab129593
Parents: 01eb792
Author: bhupeshchawda <bhupesh@apache.org>
Authored: Mon Mar 6 18:05:07 2017 +0530
Committer: bhupeshchawda <bhupesh@apache.org>
Committed: Thu Apr 13 20:38:55 2017 +0530

 docs/control_tuples.md       | 207 ++++++++++++++++++++++++++++++++++++++
 docs/operator_development.md |  18 +++-
 mkdocs.yml                   |   1 +
 3 files changed, 221 insertions(+), 5 deletions(-)

diff --git a/docs/control_tuples.md b/docs/control_tuples.md
new file mode 100644
index 0000000..f1ac518
--- /dev/null
+++ b/docs/control_tuples.md
@@ -0,0 +1,207 @@
+# User Defined Control Tuples
+## Introduction
+Custom control tuple support in Apache Apex gives the user the capability to insert user
defined control tuples in the data flow. For analogy, the engine already supports a few pre-defined
control tuples like BEGIN_WINDOW, END_WINDOW, etc. Until now, we did not have the support
for applications to insert their own control tuples.
+## Terminology
+All discussion in this document is related to Control Tuples generated by user defined logic.
The document may refer to these tuples as *Control Tuples*, *User Defined Control Tuples*
or *Custom Control Tuples* interchangeably.
+### Definition
+A user defined control tuple could be any user defined object which implements a ControlTuple
+See [Delivery Semantics](#delivery-semantics) for details on DeliveryType
+public interface ControlTuple
+  DeliveryType getDeliveryType();
+  enum DeliveryType
+  {
+  }
+Example user defined control tuple:
+public class TestControlTuple implements ControlTuple
+  public long data;
+  public boolean immediate;
+  // For Kryo
+  public TestControlTuple()
+  {
+    data = 0;
+  }
+  // Constructor
+  public TestControlTuple(long data, boolean immediate)
+  {
+    this.data = data;
+    this.immediate = immediate;
+  }
+  @Override
+  public DeliveryType getDeliveryType()
+  {
+    if (immediate) {
+      return DeliveryType.IMMEDIATE;
+    } else {
+      return DeliveryType.END_WINDOW;
+    }
+  }
+## Use cases
+A control tuple may be used in an application to trigger some sort of action in a downstream
operator. For example, the source operator might want to notify the last operator that it
has emitted all the data in a file and that the file has now ended. Let's call this an *End-Of-File*
control tuple. Once the last operator gets the *End-Of-File* tuple, it would, say, close the
destination file it was writing and create a new file.
+More use cases which were discussed during the requirements of this feature are as follows:
+1. **Batch support** - We need to tell all operators of the physical DAG when a
+batch starts and ends, so the operators can do whatever is needed upon
+the start or the end of a batch.
+2. **Watermark** - To support the concepts of event time windowing, the
+watermark control tuple is needed to identify late windows.
+3. **Changing operator properties** - We do have the support of changing
+operator properties on the fly, but with a custom control tuple, the
+command to change operator properties can be window aligned for all
+partitions and also across the DAG. In other words, the properties of *all* physical partitions
can be aligned to a particular window. In case the behavior of the application needs to change,
we may also be able to change properties of multiple logical operators aligned to a particular
+4. **Recording tuples** - Like changing operator properties, we do have this
+support now but only at the individual physical operator level, and without
+control of which window to record tuples for. With a custom control tuple,
+because a control tuple must belong to a window, all operators in the DAG
+can start (and stop) recording for the same windows.
+## Usage
+###  Generating a Control Tuple
+There is no restriction on which operator in the DAG can or can not generate a control tuple.
The operator which needs to generate a control tuple should declare a port whose type is `ControlAwareDefaultOutputPort`;
the user could simply call the `emitControl(ControlTuple t)` method on this port.
+Example: In the code snippet below, the `Generator` operator declares a `ControlAwareDefaultOutputPort`
called `output` which can emit a data tuple as well as a control tuple.
+public class Generator extends BaseOperator implements InputOperator
+  private long data;
+  private long count;
+  public final transient ControlAwareDefaultOutputPort<Double> output =
+      new ControlAwareDefaultOutputPort<>();
+  @Override
+  public void emitTuples()
+  {
+    // Can emit a data tuple using output.emit()
+    output.emit(data++);
+    count++;
+  }
+  @Override
+  public void endWindow()
+  {
+    // Can also emit a control tuple using output.emitControl()
+    output.emitControl(new TestControlTuple(count, immediate));
+  }
+**Note** - User defined control tuples and control aware ports can only be used in operators
which use the apex-core dependency which has control tuple support, viz. 3.6.0 or above. Previous
versions of apex-core would not be able to support an application which uses user defined
control tuples or control aware ports and would crash at launch time.
+### Receiving a Control Tuple
+Any downstream operator which wants to receive a user defined control tuple, should declare
an input port which is *Control Aware*. A `ControlAwareDefaultInputPort` would have the necessary
capability to process a control tuple in addition to a regular data tuple.
+Example: Below code snippet illustrates the use of `processControl` method of `ControlAwareDefaultInputPort`
to receive / handle user defined control tuples.
+public final transient ControlAwareDefaultInputPort<Double> input =
+    new ControlAwareDefaultInputPort<Double>()
+  // Process a data tuple
+  @Override
+  public void process(Double tuple)
+  {
+    output.emit(tuple);
+  }
+  // Process a control tuple
+  @Override
+  public boolean processControl(ControlTuple userControlTuple)
+  {
+    // process control tuple here
+    return false;
+    // indicates whether or not the engine
+    // should propagate the tuple automatically to downstream operators
+    // Discussed in later sections
+  }
+Note that the pre-defined control tuples like `BEGIN_WINDOW` and `END_WINDOW` would not be
handled by the `processControl()` method since these used only by the engine and are not meant
to be delivered to user logic in operators. Custom control tuples on the other hand are generated
by the operators and need to be delivered to downstream operators.
+#### Return value of `processControl`
+Following are the semantics:
+1. true - Operator would handle propagation explicitly
+2. false - Operator would not handle propagation. Engine will automatically forward.
+See [Propagation of Control Tuples](#propagation-of-control-tuples) for more details
+### Serialization requirements
+A control tuple generated by some operator of the application needs to traverse the same
path as that traversed by other data tuples transmitted by the application. For this reason,
similar to the other data tuples, the control tuple needs to be Kryo serializable since the
default serializer used by the platform is Kryo.
+## Propagation of Control Tuples
+A control tuple emitted by an operator can be propagated downstream automatically. This is
in line with the automatic propagation of other pre-defined control tuples in the engine.
However, some use cases require that the control tuple need not be propagated further in the
DAG. We support this behavior for user defined control tuples.
+Once the control tuple is processed in the `processControl` method, a return value is expected
by the engine. This return value indicates whether or not the operator wishes to handle the
propagation of the control tuple or let the engine proceed with the default auto-propagation
of the control tuple.
+The `processControl` method of the `ControlAwareDefaultInputPort` returns a boolean return
+public boolean processControl(ControlTuple userControlTuple)
+  // process userControlTuple here
+  // return true if operator wants to propagate explicitly or block propagation
+  // return false if operator wants engine to propagate automatically
+### Non - *Control Aware* ports
+For operators without *Control Aware* ports, the platform will forward the control tuples
to the downstream operators automatically. The application writer / user does not have to
worry about handling a Control tuple which is generated upstream. Only operators with *Control
Aware* ports would be delivered the control tuple via the `processControl` method.
+This also allows the existing operators to be backward compatible.
+## Delivery Semantics
+Delivery mechanism refer to the time wrt. the processing window when a control tuple is delivered
to the operator. An operator has various call backs like `setup`, `beginWindow`, `endWindow`,
+### DeliveryType IMMEDIATE
+As the name implies, the control tuple is immediately delivered to the next  downstream operator
(if the operator is control aware), else it is forwarded to the next downstream operator.
+* **Case: Downstream is partitioned**  
+When the downstream is partitioned, the control tuple with *IMMEDIATE* delivery type would
go to all the downstream partitions. This holds, irrespective of whether or not the control
tuple was generated by the immediately upstream operator or even further upstream.
+* **Case: Upstream is partitioned**  
+When the upstream is partitioned and the control tuple is generated in any subset of the
partitions the downstream operator would receive the control tuple immediately and would not
wait till the end of the current window. In case the source for the control tuple was a single
source further upstream and multiple copies were generated by the intermediate partitions,
the duplicate copies of the control tuple would be filtered out at the downstream operator.
Thus only unique control tuples are delivered to the downstream operator. Further, in case
of *IMMEDIATE* delivery, the first instance of the control tuple is delivered to the operator
and the duplicates filtered out.
+### DeliveryType END_WINDOW
+This delivery type only delivers the control tuple to the operator after all data tuples
have been delivered to the operator. In the operator lifecycle, this would mean that the control
tuples would be delivered just before the `endWindow` call.
+* **Case: Downstream is partitioned**  
+  When the downstream is partitioned, the control tuple emitted by the upstream would be
broadcast to downstream operators and buffered in the downstream partitions until the end
of the window and is delivered to the operator just before the `endWindow` call.
+* **Case: Upstream is partitioned**  
+  If the control tuples are generated in any subset of the partitions, then each control
tuple is unique and are delivered to the downstream operator before the `endWindow` call.
However, if the source for the control tuple is a source further upstream, then the downstream
operator would filter out duplicates as and when each control tuple arrive at the operator,
and finally all unique control tuples are delivered to the operator just before the `endWindow`
+## Assumptions
+All the user defined control tuples used in the application are cached in the memory of the
operator for the duration of a window. For this reason, it is imperative that the size as
well as the number of control tuples emitted within a window is small as compared to the number
of data tuples.
+## JIRA
+* [APEXCORE-579](https://issues.apache.org/jira/browse/APEXCORE-579) points to the top level
JIRA issue for control tuple support.

diff --git a/docs/operator_development.md b/docs/operator_development.md
index f3390c9..926fabc 100644
--- a/docs/operator_development.md
+++ b/docs/operator_development.md
@@ -15,6 +15,7 @@ its internals. This document is intended to serve the following purposes
 1.  **[Apache Apex Operators](#apex_operators)** - Introduction to operator terminology
and concepts.
 2.  **[Writing Custom Operators](#writing_custom_operators)** - Designing, coding and testing
new operators from scratch.  Includes code examples.
 3.  **[Operator Reference](#operator_reference)** - Details of operator internals, lifecycle,
and best practices and optimizations.
+4.  **[Advanced Features](#advanced_features)** - Advanced features in operator development
and its capabilities.
 * * * * *
@@ -64,8 +65,8 @@ Types of Operators
 An operator works on one tuple at a time. These tuples may be supplied
 by other operators in the application or by external sources,
 such as a database or a message bus. Similarly, after the tuples are
-processed, these may be passed on to other operators, or stored into an external system.

-Therea are 3 type of operators based on function: 
+processed, these may be passed on to other operators, or stored into an external system.
+There are 3 type of operators based on function:
 1.  **Input Adapter** - This is one of the starting points in
     the application DAG and is responsible for getting tuples from an
@@ -384,9 +385,9 @@ globalCounts = Maps.newHashMap();
 ### Setup call
-The setup method is called only once during an operator lifetime and its purpose is to allow

+The setup method is called only once during an operator lifetime and its purpose is to allow
 the operator to set itself up for processing incoming streams. Transient objects in the operator
-not serialized and checkpointed. Hence, it is essential that such objects initialized in
the setup call. 
+not serialized and checkpointed. Hence, it is essential that such objects initialized in
the setup call.
 In case of operator failure, the operator will be redeployed (most likely on a different
container). The setup method called by the Apache Apex engine allows the operator to prepare
for execution in the new container.
 The following tasks are executed as part of the setup call:
@@ -399,7 +400,7 @@ The following tasks are executed as part of the setup call:
 ### Begin Window call
-The begin window call signals the start of an application window. With 
+The begin window call signals the start of an application window. With
 regards to Word Count Operator, we are expecting updated counts for the most recent window
 data if the sendPerTuple is set to false. Hence, we clear the updatedCounts variable in
the begin window
 call and start accumulating the counts till the end window call.
@@ -448,6 +449,13 @@ ports.
 2. Copy state from checkpoint -- initialized values from step 1 are
+Advanced Features <a name="advanced_features"></a>
+Control Tuple Support
+Operators now also have the capability to emit control tuples. These control tuples are different
from the control tuples used by the engine like BEGIN_WINDOW and END_WINDOW tuples. Operators
can create and emit their own control tuples which can be used to communicate to the down
stream operators regarding some event. Examples of such events can be BEGIN_FILE, or END_FILE.
+More details can be found at [Control Tuples](../control_tuples)
 Malhar Operator Library

diff --git a/mkdocs.yml b/mkdocs.yml
index c10f352..e582ec3 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -9,6 +9,7 @@ pages:
     - Packages: application_packages.md
     - Operators: operator_development.md
     - AutoMetric API: autometrics.md
+    - Custom Control Tuples: control_tuples.md
     - Best Practices: development_best_practices.md 
 - Operations:
     - Apex CLI : apex_cli.md

View raw message