apex-dev mailing list archives

From "Ananth (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (APEXMALHAR-2278) Implement Kudu Output Operator for non-transactional streams
Date Sat, 01 Apr 2017 23:22:41 GMT

     [ https://issues.apache.org/jira/browse/APEXMALHAR-2278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ananth updated APEXMALHAR-2278:
-------------------------------
    Description: 
Here are some benefits of integrating Kudu and Apex:

    Kudu has just reached 1.0 and has been declared production ready.
    Kudu as a store might be a good fit for many architectures in the years to come because of its ability to provide mutability of data (unlike HDFS) and its optimized storage formats for low-latency scans.
    It also appears to withstand high-throughput write patterns, which makes it a stable sink for Apex workflows that operate at very high volumes.


[Design] 

1. The operator would be an AbstractOperator, allowing concrete implementations to set a few behavioral aspects of the operator.

2. The following are the major phases of the operator (a sketch follows this list):

    During the activate() phase of the operator: establish a connection to the cluster and fetch the metadata of the table that is being used as the sink.
    During the setup() phase of the operator: fetch the current window information and use it to decide whether we are recovering from a failure. (See point 8 below.)
    During process() of the input port: inspect the incoming execution context tuple (see point 6 below) and perform one of the operations (Insert/Update/Delete/Upsert).
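A minimal sketch of this lifecycle, assuming Apex's BaseOperator/ActivationListener hooks and the Kudu java client; every Kudu-operator-specific name here (AbstractKuduOutputOperator, KuduExecutionContext, the override hook) is a hypothetical placeholder, not settled API:

{code:java}
import java.util.Collections;
import java.util.Map;

import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduTable;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;

// Hypothetical skeleton: the Apex lifecycle hooks and Kudu client calls are
// real API; every Kudu-operator-specific name is a placeholder.
public abstract class AbstractKuduOutputOperator<T> extends BaseOperator
    implements Operator.ActivationListener<OperatorContext>
{
  private String masterAddresses; // e.g. "kudu-master:7051"
  private String tableName;
  private transient KuduClient kuduClient;
  private transient KuduTable kuduTable;

  @Override
  public void setup(OperatorContext context)
  {
    // Point 8: inspect the activation window id here to choose between
    // safe mode and reconciling mode for the first window.
  }

  @Override
  public void activate(OperatorContext context)
  {
    // Point 2: connect to the cluster and fetch the sink table's metadata.
    kuduClient = new KuduClient.KuduClientBuilder(masterAddresses).build();
    try {
      kuduTable = kuduClient.openTable(tableName);
    } catch (KuduException e) {
      throw new RuntimeException("Could not open Kudu table " + tableName, e);
    }
  }

  @Override
  public void deactivate()
  {
  }

  // Point 5: concrete implementations may override this hook to remap POJO
  // field names to table column names.
  protected Map<String, String> getPojoFieldToColumnNameOverrides()
  {
    return Collections.emptyMap();
  }

  // Point 2: each tuple carries the POJO plus the mutation to apply;
  // KuduExecutionContext is sketched under point 6 below.
  public final transient DefaultInputPort<KuduExecutionContext<T>> input =
      new DefaultInputPort<KuduExecutionContext<T>>()
  {
    @Override
    public void process(KuduExecutionContext<T> executionContext)
    {
      // Point 6: dispatch to Insert/Upsert/Update/Delete per the context.
    }
  };
}
{code}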
3. The following parameters are tunable while establishing a Kudu connection: table name, boss worker threads, worker threads, socket read timeouts, and the External Consistency mode. A sketch of how these map onto the Kudu client follows.
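For illustration, these tunables correspond to existing knobs on the Kudu java client builder and session; the host, thread counts, timeout, and table name below are placeholder values:

{code:java}
import org.apache.kudu.client.ExternalConsistencyMode;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduSession;

public class KuduConnectionSketch
{
  public static void main(String[] args) throws KuduException
  {
    // Boss/worker thread counts and socket read timeout are builder options.
    KuduClient client = new KuduClient.KuduClientBuilder("kudu-master:7051")
        .bossCount(1)                        // boss worker threads
        .workerCount(4)                      // worker threads
        .defaultSocketReadTimeoutMs(10000)   // socket read timeout
        .build();

    // External consistency mode is a per-session setting.
    KuduSession session = client.newSession();
    session.setExternalConsistencyMode(ExternalConsistencyMode.CLIENT_PROPAGATED);

    // The table name tunable maps to openTable().
    client.openTable("my_table");
  }
}
{code}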
4. The user need not specify any schema up front. The POJO fields are automatically mapped to the table column names identified when the schema is parsed during the activate phase.
5. Allow the concrete implementation of the operator to override the mapping from POJO field name to table schema column name (see the sketch below). This allows flexibility in use cases where the table schema column names are not compatible with Java bean conventions, or where the column names cannot be controlled because they come from an upstream operator.
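One possible shape for this override hook, building on the skeleton under point 2; SalesEvent is a hypothetical POJO:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Sketch: a concrete operator remapping POJO field names to column names.
public class SalesEventKuduOutputOperator extends AbstractKuduOutputOperator<SalesEvent>
{
  @Override
  protected Map<String, String> getPojoFieldToColumnNameOverrides()
  {
    Map<String, String> overrides = new HashMap<String, String>();
    // POJO field -> Kudu column; useful when the column name is not a valid
    // Java identifier or is dictated by an upstream schema.
    overrides.put("saleTimestamp", "sale-timestamp");
    return overrides;
  }
}
{code}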
6. The input tuple supplied to this operator is of type "Kudu Execution Context". This tuple encompasses the actual POJO that is going to be persisted to the Kudu store. Additionally, it allows the upstream operator to specify the operation that needs to be performed; one of the following operations is permitted as part of the context: Insert, Upsert, Update, or Delete. A sketch of the tuple type follows.
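A minimal sketch of what the execution context tuple could look like (all names illustrative):

{code:java}
// Sketch: the tuple type on the operator's input port.
public class KuduExecutionContext<T>
{
  // Point 6: the four permitted mutations.
  public enum KuduMutationType { INSERT, UPSERT, UPDATE, DELETE }

  private T payload;                     // the POJO to persist
  private KuduMutationType mutationType; // the operation to perform

  public T getPayload() { return payload; }
  public void setPayload(T payload) { this.payload = payload; }

  public KuduMutationType getMutationType() { return mutationType; }
  public void setMutationType(KuduMutationType mutationType) { this.mutationType = mutationType; }
}
{code}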
7. The concrete implementation of the operator allows the user to specify the actual POJO class definition that is used to write to the table (see the sketch below). The execution context carries this POJO as well as the metadata that defines the processing to be performed on that tuple.
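A concrete implementation might then look like this; DeviceEvent and the getPayloadPojoClass() hook are hypothetical names:

{code:java}
// Sketch (point 7): a concrete operator bound to a specific POJO class.
public class DeviceEventKuduOutputOperator extends AbstractKuduOutputOperator<DeviceEvent>
{
  // Tells the abstract operator which class to introspect when mapping
  // fields to table columns during the activate-phase schema parse.
  protected Class<DeviceEvent> getPayloadPojoClass()
  {
    return DeviceEvent.class;
  }
}
{code}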
8. The operator allows for a special execution mode for the first window that is processed as the operator gets activated. There are two modes for the first window of processing:
      a. Safe Mode: the "happy path" execution; no extra processing is required to perform the Kudu mutation.
      b. Reconciling Mode: an additional function is called to check whether the user would like the tuple to be used for the mutation. This mode is set automatically when OperatorContext.ACTIVATION_WINDOW_ID != Stateless.WINDOW_ID during the first window processed by the operator.

This feature is deemed useful when an operator is recovering from a crashed instance of the application and we do not want to perform multiple mutations for the same tuple, given that AT_LEAST_ONCE is the default processing semantics. A sketch of the mode selection follows.
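The mode selection could sit in setup(), roughly as follows. These members would slot into the abstract operator sketched under point 2; 'reconcilingMode' and 'isEligibleForPersistence' are hypothetical names, and Stateless.WINDOW_ID is the apex-core constant referenced above:

{code:java}
private transient boolean reconcilingMode;

@Override
public void setup(OperatorContext context)
{
  // A clean launch activates at Stateless.WINDOW_ID; anything else means we
  // are recovering, and the first window may replay tuples that were already
  // persisted before the crash (AT_LEAST_ONCE semantics).
  reconcilingMode =
      context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID;
}

// Invoked for each tuple of the first window while in reconciling mode so the
// concrete implementation can veto mutations that were already applied.
protected boolean isEligibleForPersistence(KuduExecutionContext<T> tuple, long windowId)
{
  return true; // default: persist everything (safe-mode behavior)
}
{code}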

9. The operator is stateless.
10. The operator generates the following autometrics (see the sketch below):
      a. Counts of Inserts, Upserts, Deletes, and Updates (a separate counter for each mutation type) in a given window
      b. Bytes written in a given window
      c. Write RPCs in a given window
      d. Total RPC errors in a given window
      e. All of the above metrics for the operator over its entire lifetime.
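Apex's @AutoMetric annotation (com.datatorrent.api.AutoMetric) would cover these counters directly; a sketch of fields that would slot into the operator class, with illustrative names:

{code:java}
// Per-window metrics, reset at each window boundary in beginWindow().
@AutoMetric
private long insertsInWindow;
@AutoMetric
private long upsertsInWindow;
@AutoMetric
private long updatesInWindow;
@AutoMetric
private long deletesInWindow;
@AutoMetric
private long bytesWrittenInWindow;
@AutoMetric
private long writeRpcsInWindow;
@AutoMetric
private long rpcErrorsInWindow;
// Lifetime counterparts are never reset; one shown for brevity.
@AutoMetric
private long totalInserts;

@Override
public void beginWindow(long windowId)
{
  // Per-window metrics start fresh at every window boundary.
  insertsInWindow = 0;
  upsertsInWindow = 0;
  updatesInWindow = 0;
  deletesInWindow = 0;
  bytesWrittenInWindow = 0;
  writeRpcsInWindow = 0;
  rpcErrorsInWindow = 0;
}
{code}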




> Implement Kudu Output Operator for non-transactional streams
> ------------------------------------------------------------
>
>                 Key: APEXMALHAR-2278
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2278
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>          Components: adapters database
>            Reporter: Ananth
>            Assignee: Ananth
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
