apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Ananth (JIRA)" <j...@apache.org>
Subject [jira] [Closed] (APEXMALHAR-2278) Implement Kudu Output Operator for non-transactional streams
Date Sat, 11 Nov 2017 11:26:09 GMT

     [ https://issues.apache.org/jira/browse/APEXMALHAR-2278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel

Ananth closed APEXMALHAR-2278.

> Implement Kudu Output Operator for non-transactional streams
> ------------------------------------------------------------
>                 Key: APEXMALHAR-2278
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2278
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>          Components: adapters database
>            Reporter: Ananth
>            Assignee: Ananth
>             Fix For: 3.8.0
> Here are some benefits of integrating Kudu and Apex:
>     Kudu is just declared 1.0 and has just been declared production ready.
>     Kudu as a store might a good a fit for many architectures in the years to come because
of its capabilities to provide mutability of data ( unlike HDFS ) and optimized storage formats
for low latency scans.
>     It seems to also withstand high-throughput write patterns which makes it a stable
sink for Apex workflows which operate at very high volumes. 
> [Design] 
> 1. The operator would be an AbstractOperator and would allow the concrete implementations
to set a few behavioral aspects of the operator. 
> 2. The following are the major phases of the operator:
>     During activate() phase of the operator : Establish a connection to the cluster and
get the metadata about the table that is being used as the sink.
>     During setup() phase of the operator: Fetch the current window information and use
it decide if we are recovering from a failure mode. (See point 8 below )
>     During process() of Input port : Inspect the incoming ExecutionContext ( see below
) tuple and perform one of the  operations ( Insert/Update/Delete/Upsert) 
> 3. The following parameters are tunable while establishing a Kudu connection:
>     Table name, Boss worker threads, Worker threads, Socket read time outs and External
Consistency mode.
> 4. The user need not specify any schema outright. The pojo fields are automatically mapped
to the table column names as identified in the schema parse in the activate phase. 
> 5. Allow the concrete implementation of the operator to override the Pojo field name
to the table schema column name. This would allow flexibility in use cases like table schema
column names are not compatible with java bean frameworks or in situations when column names
cant be controlled as POJO is coming from an upstream operator.
> 6. The input tuple that is to be supplied to this operator is of type "Kudu Execution
Context". This tuple encompasses the actual Pojo that is going to be persisted to the Kudu
store. Additionally it allows the upstream operator to specify the operation that needs to
be performed. One of the following operations is permitted as part of the context : Insert,
Upsert, Update and delete on the Pojo that is acting as the payload in the Execution Context.
> 7. The concrete implementation of the operator would allow the user to specify the actual
POJO class definition that would be used to the write to the table. The execution context
would contain this POJO as well as the metadata that defines the behavior of the processing
that needs to be done on that tuple.
> 8. The operator would allow for a special case of execution mode for the first window
that is being processed as the operator gets activated. There are two modes for the first
window of processing of the operator : 
>       a. Safe Mode :  Safe mode is the "happy path execution" as in no extra processing
is required to perform the Kudu mutation.
>       b. Reconciling Mode:  There is an additional function that would be called to see
if the user would like the tuple to be used for mutation. This mode is automatically set when
OperatorContext.ACTIVATION_WINDOW_ID != Stateless.WINDOW_ID   during the first window of processing
by the operator. 
> This feature is deemed to be useful when an operator is recovering from a crash instance
of the application and we do not want to perform multiple mutations of the same tuple given
ATLEAST_ONCE is the default semantics.  
> 9. The operator is a stateless operator. 
> 10. The operator would generate the following autometrics :
>       a. Counts  of Inserts, Upserts, Deletes and Updates (separate counters for each
mutation) for a given window
>       b. Bytes written in a given window
>       c. Write RPCs in the given window
>       d. Total RPC errors in this window
>        e. All of the above metrics for the operator for its entire lifetime of the operator.

This message was sent by Atlassian JIRA

View raw message