apex-dev mailing list archives

From "Ananth (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2472) Implement Kudu Input Operator
Date Wed, 16 Aug 2017 01:04:00 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16128171#comment-16128171 ]

Ananth commented on APEXMALHAR-2472:

The following are the main features that would be supported by the Input operator:

- The input operator would be used to scan all or some rows of a single Kudu table.
- Each Kudu row is translated to a POJO for downstream operators. 
- The input operator would accept an SQL expression (described in detail below) that would be
parsed to generate the equivalent scanner code for the Kudu table. This is needed because the
Kudu Table API does not support SQL expressions.
- The SQL expression would have additional options that would help in Apache Apex design patterns
(e.g. sending a control tuple message after a query is successfully processed)
- The input operator works on a continuous basis, i.e. it would accept the next query once
the current query is complete
- The operator will work in a distributed fashion for the input query. This essentially means
that, for a single input query, the scan work is distributed among all of the physical instances
of the input operator.
- Kudu splits a table into chunks of data called tablets. The tablets are replicated and
partitioned (range and hash partitions are supported) in Kudu according to the Kudu table
definition. The operator allows partitioning of the input operator to be done in two ways:
    - Many Kudu tablets map to one partition of the Apex Kudu Input operator
    - One Kudu tablet maps to one partition of the Apex Kudu Input operator
- The partitioning does not change on a per-query basis, because of the complex situations that
would otherwise arise. For example, if a query touches only a few rows before the next query is
accepted, repartitioning would cause a lot of churn in terms of operator serialization/deserialization,
YARN allocation requests etc. Per-query partition planning would also lead to a possibly very
complex implementation and poor resource usage, as all physical instances of the operator would
have to wait for their peers to complete their scans and then wait for the next checkpoint to
get repartitioned.
- The partitioner splits the workload of a single query in a round-robin fashion. After a query
plan is generated, the scan token ranges are distributed equally among the physical operator
instances.
- The operator allows for two modes of scanning for an application (cannot be changed on a
per-query basis):
    - Consistent Order scanner - only one tablet scan thread is active at any given instant
of time for a given query
    - Random Order scanner - many threads are active to scan Kudu tablets in parallel
- As can be seen, the Consistent Order scanner would be slower but would help in better “exactly
once” implementations if the correct method is overridden in the operator.
- The operator introduces the DisruptorBlockingQueue for low-latency buffer management. The LMAX
disruptor library was considered, but based on discussion threads on other Apache projects we
settled on the ConversantMedia implementation of the Disruptor blocking queue. This blocking
queue is used when the Kudu scanner thread wants to hand a scanned row over to the input
operator's main-thread emitTuples() call (a minimal handoff sketch appears after this list).
- The operator allows for exactly-once semantics if the user specifies the logic for reconciling
a possible duplicate row when the operator is resuming from a checkpoint. This is done by
overriding a method that returns a boolean (true to emit the tuple, false to suppress it) while
the operator is working in the reconciling window phase (see the sketch after this list). This
reconciling phase is active for at most one window.
- The operator uses the FSWindowManager to manage metadata at the end of every window. On
resumption from a checkpoint, the operator will still scan the Kudu tablets but simply not emit
the rows that were already streamed downstream. Subsequently, when the operator is in the
reconciling window, the method described above is invoked to allow duplicates to be filtered.
After this reconciling window, the operator works in its normal mode of operation.
- The following are the additional configurable aspects of the operator:
    - Max tuples per window
    - Spin policy and the buffer size for the Disruptor blocking queue
    - Mechanism to provide custom control tuples if required
    - Setting the number of physical operator instances via the API if required
    - Setting the fault tolerance. If fault tolerant, an alternative replica of the Kudu tablet
is picked up for scanning if the initial tablet fails for whatever reason. However, this slows
down the scan throughput; hence it is configurable by the end user.
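
A minimal sketch of the scanner-thread handoff described above, assuming the ConversantMedia
DisruptorBlockingQueue and the standard Apex InputOperator/DefaultOutputPort APIs. The class,
field and method names here (KuduScanBufferSketch, onScannedRow, bufferSize, maxTuplesPerWindow)
are hypothetical illustrations, not the actual Malhar implementation.

{code:java}
import java.util.concurrent.BlockingQueue;

import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;

/**
 * Hypothetical sketch of the scanner-thread to emitTuples() handoff.
 * None of these names are from the actual Malhar implementation.
 */
public class KuduScanBufferSketch<T> implements InputOperator
{
  public final transient DefaultOutputPort<T> output = new DefaultOutputPort<>();

  private int bufferSize = 4096;          // Disruptor queue capacity (configurable)
  private int maxTuplesPerWindow = 10000; // cap on tuples emitted per streaming window

  // Low-latency handoff buffer between the Kudu scanner thread(s) and the operator thread.
  private transient BlockingQueue<T> buffer;
  private transient int emittedInCurrentWindow;

  // Called by a Kudu scanner thread for every row it has converted to a POJO.
  public void onScannedRow(T rowPojo) throws InterruptedException
  {
    buffer.put(rowPojo);
  }

  @Override
  public void emitTuples()
  {
    // Runs on the operator thread: drain whatever the scanner threads have buffered,
    // up to the per-window cap, and emit downstream.
    T row;
    while (emittedInCurrentWindow < maxTuplesPerWindow && (row = buffer.poll()) != null) {
      output.emit(row);
      emittedInCurrentWindow++;
    }
  }

  @Override
  public void beginWindow(long windowId)
  {
    emittedInCurrentWindow = 0;
  }

  @Override
  public void endWindow()
  {
  }

  @Override
  public void setup(OperatorContext context)
  {
    // The ConversantMedia queue also accepts a spin policy; omitted here for brevity.
    buffer = new DisruptorBlockingQueue<>(bufferSize);
  }

  @Override
  public void teardown()
  {
  }
}
{code}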

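A minimal sketch of the exactly-once reconciliation hook mentioned above. The class and method
names are assumptions for illustration; the actual operator would expose an equivalent
overridable method rather than this exact signature.

{code:java}
/**
 * Illustrative sketch only: names are assumptions, not the Malhar API.
 * During the single reconciling window that follows resumption from a checkpoint,
 * every re-scanned row passes through a user-overridable check before being emitted.
 */
public abstract class ReconcilingEmitSketch<T>
{
  // True only while processing the one reconciling window after a checkpoint resume.
  protected transient boolean inReconcilingWindow;

  /**
   * Override to decide whether a row seen again after resuming from a checkpoint should be
   * re-emitted (true) or suppressed as a duplicate already streamed downstream (false).
   */
  protected abstract boolean isEmittableDuringReconciliation(T row);

  // Placeholder for the actual output port emission.
  protected abstract void emitToOutputPort(T row);

  protected void emitRow(T row)
  {
    if (inReconcilingWindow && !isEmittableDuringReconciliation(row)) {
      return; // duplicate from before the checkpoint; suppress it
    }
    emitToOutputPort(row);
  }
}
{code}
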
Some notes regarding the SQL expression for the operator:

- The operator uses ANTLR4 to parse the SQL expression.
- The parser is based on a grammar file which is part of the source tree. The grammar is
compiled on every build, and the parser code is generated from it automatically.
- The reasons we had to use a custom parser (as opposed to something like Calcite) are:
    - Kudu does not have all the features of a standard SQL expression. As an example, != (not
equal to) is not supported, nor is there a concept of a join etc.
    - We are providing a lot more flexibility for the user to specify what the control tuple
message should be, should the end user choose to send a control tuple downstream after the
given query is done processing
- The SQL expression can specify a set of options for processing of the query:
    - Control tuple message: a message/string that can be sent as the control tuple field. There
would be other parts to this control tuple, such as the query that was just completed and whether
this marks the beginning or the end of the scan.
    - Read snapshot time: Kudu supports specifying the read snapshot time at which the scan has
to occur, because Kudu is essentially an MVCC engine and stores multiple versions of the same
row. This option lets the end user specify that read snapshot time for the scan.
- The parser supports general syntax checking. If there is an error in the SQL expression, the
string representing the supplied SQL expression is emitted onto an error port and the next query
is taken up for processing.
- The data types supported are only those supported by the Kudu engine, and the parser handles
them accordingly; for example, String values are double quoted.
- The parser allows for a SELECT AA as BB style of expression, wherein AA is the column name in
Kudu and BB is the name of the Java POJO field (an illustrative example follows this list).
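
To make the aliasing behaviour concrete, here is an illustrative example. The exact grammar,
including the option clauses for the control tuple message and the read snapshot time, is
defined by the ANTLR4 grammar file in the source tree, so the query text and the names below are
assumptions for illustration only.

{code:java}
/**
 * Illustrative only: the query string, column names and POJO are assumptions.
 */
public class KuduQueryExample
{
  // Target POJO: the right-hand side of each "AS" alias maps to one of these field names.
  public static class TransactionRow
  {
    public long transactionId;  // populated from the Kudu column "transaction_id"
    public double amount;       // populated from the Kudu column "amount"
  }

  // Column AS pojoField aliasing, a double-quoted String literal, and only predicates that
  // Kudu supports (no !=, no joins). Options such as the control tuple message and the read
  // snapshot time would be expressed per the operator's grammar, which is not shown here.
  public static final String QUERY =
      "SELECT transaction_id AS transactionId, amount AS amount "
      + "FROM transactions "
      + "WHERE region = \"EMEA\" AND amount > 100";
}
{code}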

> Implement Kudu Input Operator 
> ------------------------------
>                 Key: APEXMALHAR-2472
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2472
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>          Components: adapters database
>            Reporter: Ananth
>            Assignee: Ananth
> This operator would allow Kudu to be used as an input store. This has multiple advantages, like:
> - Ability to solve the dedup problem from an entire-data-set perspective. The dedupe operators
> we have today are primarily window based, and this might not meet all real-world use cases.
> - Ability to selectively stream data based on a SQL expression. Since Kudu is a structured
> store, we could effectively allow a SQL-expression-based "input" definition that would allow
> for selective streaming for all downstream operators. This could potentially be an alternative
> streaming store pattern as compared to Kafka, as Kafka does not allow for selective streaming
> of tuples.

This message was sent by Atlassian JIRA
