apex-commits mailing list archives

From rama...@apache.org
Subject [1/2] apex-malhar git commit: APEXMALHAR-2383 Documentation for Jdbc Output Operator
Date Wed, 22 Mar 2017 19:40:49 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master 02a441d57 -> 9fd29ca27


APEXMALHAR-2383 Documentation for Jdbc Output Operator


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/80e6a084
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/80e6a084
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/80e6a084

Branch: refs/heads/master
Commit: 80e6a0846fdc437d995e85395b4567de6556d32e
Parents: cb1ef76
Author: Hitesh-Scorpio <forhiteshjob@gmail.com>
Authored: Mon Feb 13 16:53:50 2017 +0530
Committer: Hitesh-Scorpio <forhiteshjob@gmail.com>
Committed: Wed Mar 22 15:33:49 2017 +0530

----------------------------------------------------------------------
 ...AbstractJdbcTransactionableOutputOperator.md | 178 +++++++++++++++++++
 .../jdbcoutput/operatorsClassDiagrams.png       | Bin 0 -> 136942 bytes
 mkdocs.yml                                      |   1 +
 3 files changed, 179 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/80e6a084/docs/operators/AbstractJdbcTransactionableOutputOperator.md
----------------------------------------------------------------------
diff --git a/docs/operators/AbstractJdbcTransactionableOutputOperator.md b/docs/operators/AbstractJdbcTransactionableOutputOperator.md
new file mode 100644
index 0000000..522fb98
--- /dev/null
+++ b/docs/operators/AbstractJdbcTransactionableOutputOperator.md
@@ -0,0 +1,178 @@
+JDBC Transactional POJO Output Operator
+==============
+
+## Operator Objective
+This operator receives an input stream of POJOs and inserts them as rows in a database table
in a fault-tolerant way.
+
+## Overview
+The main features of this operator (`AbstractJdbcTransactionableOutputOperator`) are persisting data to a database table and fault tolerance. The operator creates a transaction at the start of each window, executes batches of SQL updates, and closes the transaction at the end of the window. Each tuple corresponds to one SQL update statement. The operator groups the updates into batches and submits each batch with a single call to the database, which improves performance considerably. The batch size is configured by the `batchSize` property. The tuples in a window are stored in a checkpointed collection which is cleared in each `endWindow()` call. The operator writes each tuple to the database exactly once.
+
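The batching described above can be illustrated with a minimal, self-contained sketch. This is not the operator's actual code: `BatchingSketch`, `process`, and `getBatchesSubmitted` are hypothetical names, and a counter stands in for the JDBC batch call. The sketch assumes that tuples are buffered until `batchSize` is reached and that any remainder is submitted at the end of the window.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in that mimics batching only; no database is involved.
class BatchingSketch<T> {
  private final int batchSize;
  private final List<T> pending = new ArrayList<>();
  private int batchesSubmitted = 0;

  BatchingSketch(int batchSize) {
    this.batchSize = batchSize;
  }

  // Buffer one tuple; submit a batch once batchSize tuples have accumulated.
  void process(T tuple) {
    pending.add(tuple);
    if (pending.size() >= batchSize) {
      flush();
    }
  }

  // At the end of a window, submit whatever is still buffered.
  void endWindow() {
    if (!pending.isEmpty()) {
      flush();
    }
  }

  // Assumption: a real operator would execute a JDBC batch here.
  private void flush() {
    batchesSubmitted++;
    pending.clear();
  }

  int getBatchesSubmitted() {
    return batchesSubmitted;
  }

  public static void main(String[] args) {
    BatchingSketch<Integer> sketch = new BatchingSketch<>(1000);
    for (int i = 0; i < 2500; i++) {
      sketch.process(i);
    }
    sketch.endWindow();
    System.out.println(sketch.getBatchesSubmitted()); // prints 3
  }
}
```

With 2500 tuples and a batch size of 1000, two full batches are submitted while tuples arrive and the remaining 500 tuples are submitted at the end of the window.
+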
+An (indirect) base class for this operator is `AbstractPassThruTransactionableStoreOutputOperator`, which implements a pass-through output adapter for a transactional store and guarantees exactly-once semantics. "Pass-through" means that it does not wait for the end of the window to write to the store: it begins a transaction in `beginWindow()`, writes tuples to the store as they arrive, and commits the transaction in `endWindow()`.
+
+The overall hierarchy is described in the following diagram:
+
+![JdbcPOJOInsertOutputOperator.png](images/jdbcoutput/operatorsClassDiagrams.png)
+
+`AbstractTransactionableStoreOutputOperator`: A skeleton implementation of an output operator that writes to a transactional store; the tuple type and store type are generic parameters. It defines an input port whose process method invokes the abstract `processTuple()` method. Exactly-once semantics are not guaranteed and must be provided by subclasses if needed.
+
+`AbstractPassThruTransactionableStoreOutputOperator`: Simple extension of the above base
class which adds exactly-once semantics by starting a transaction in `beginWindow()` and committing
it in `endWindow()`.
+
+`AbstractJdbcTransactionableOutputOperator`: (focus of this document) Adds support for JDBC by using an instance of `JdbcTransactionalStore` as the store. Also adds support for processing tuples in batches and provides an implementation of the `processTuple()` abstract method mentioned above.
+
+`AbstractJdbcPOJOOutputOperator`: Serves as base class for inserting rows in a table using
a JDBC store.
+
+**Note**: To enforce exactly-once semantics, a table named `dt_meta` must exist in the database. Sample SQL to create it:
+```
+CREATE TABLE IF NOT EXISTS dt_meta (dt_app_id VARCHAR(100) NOT NULL, dt_operator_id INT NOT NULL, dt_window BIGINT NOT NULL, UNIQUE(dt_app_id, dt_operator_id, dt_window));
+```
+**Note**: Additionally, this operator assumes that the underlying database/table to which records are added supports transactions. If it does not, a tuple may be inserted into the table more than once during automatic recovery from a failure, which violates exactly-once semantics.
+
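One way to picture the role of `dt_meta` is the minimal sketch below. It is an illustration under stated assumptions, not the store's actual implementation: `ExactlyOnceSketch` and `writeWindow` are hypothetical names, an in-memory map stands in for `dt_meta` (keyed by application id and operator id, holding the last committed window), and a list stands in for the target table. The sketch assumes that a window replayed after recovery is skipped when its id is not greater than the committed window id.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the dt_meta table and the target table.
class ExactlyOnceSketch {
  // Plays the role of dt_meta: (app id, operator id) -> last committed window.
  private final Map<String, Long> meta = new HashMap<>();
  // Plays the role of the target table.
  private final List<String> rows = new ArrayList<>();
  private final String key;

  ExactlyOnceSketch(String appId, int operatorId) {
    this.key = appId + "/" + operatorId;
  }

  // Writes one window's tuples unless that window was already committed.
  // Returns true if the window was written, false if it was skipped.
  boolean writeWindow(long windowId, List<String> tuples) {
    long committed = meta.getOrDefault(key, -1L);
    if (windowId <= committed) {
      return false; // replayed window: its rows are already in the table
    }
    // Assumption: in a real store, the rows and the window id would be
    // committed together in a single database transaction.
    rows.addAll(tuples);
    meta.put(key, windowId);
    return true;
  }

  List<String> getRows() {
    return rows;
  }

  public static void main(String[] args) {
    ExactlyOnceSketch sketch = new ExactlyOnceSketch("app-1", 1);
    sketch.writeWindow(1L, Arrays.asList("a", "b"));
    sketch.writeWindow(2L, Arrays.asList("c"));
    sketch.writeWindow(2L, Arrays.asList("c")); // replay after recovery, skipped
    sketch.writeWindow(3L, Arrays.asList("d"));
    System.out.println(sketch.getRows()); // prints [a, b, c, d]
  }
}
```

The skip only yields exactly-once behavior if the rows and the window id are committed atomically, which is why the target database must support transactions.
+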
+## Operator Information
+1. Operator location: ***malhar-library***
+2. Available since: ***0.9.4***
+3. Java Packages:
+    * Operator: ***[com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator](https://www.datatorrent.com/docs/apidocs/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.html)***
+
+## How to Use?
+Concrete subclasses need to implement two abstract methods (unless they extend `AbstractJdbcPOJOOutputOperator`): `setStatementParameters(PreparedStatement statement, T tuple)`, which sets the parameters of the insert/update statement (a PreparedStatement) with values from the tuple, and `getUpdateCommand()`, which returns the SQL statement used to update a tuple in the database. Subclasses of `AbstractJdbcPOJOOutputOperator` need not define these methods since they are already defined in that class.
+
+Several properties are available to configure the behavior of this operator and they are
summarized in the table below.
+
+### <a name="AbstractJdbcTransactionableOutputOperatorProps"></a>Properties of AbstractJdbcTransactionableOutputOperator
+| **Property** | **Description** | **Type** | **Mandatory** | **Default Value** |
+| -------- | ----------- | ---- | ------------------ | ------------- |
+| *batchSize* | Maximum number of tuples to insert in a single call (see explanation above). | int | No | 1000 |
+
+#### <a name="JdbcTransactionalStore"></a>Properties of JDBC Store
+| **Property** | **Description** | **Type** | **Mandatory** | **Default Value** |
+| -------- | ----------- | ---- | ------------------ | ------------- |
+| *databaseDriver* | JDBC driver class for connecting to the JDBC store. This driver must be present in the classpath. | String | Yes | N/A |
+| *databaseUrl* | ["Database URL"](http://www.roseindia.net/tutorial/java/jdbc/databaseurl.html) of the form jdbc:subprotocol:subname | String | Yes | N/A |
+| *userName* | Name of the user configured in the database | String | Yes | N/A |
+| *password* | Password of the user configured in the database | String | Yes | N/A |
+
+These properties can be set like this:
+
+```xml
+<property>
+  <name>dt.operator.{OperatorName}.prop.batchSize</name>
+  <value>500</value>
+</property>
+
+<property>
+  <name>dt.operator.{OperatorName}.prop.store.databaseDriver</name>
+  <value>com.mysql.jdbc.Driver</value>
+</property>
+
+<property>
+  <name>dt.operator.{OperatorName}.prop.store.databaseUrl</name>
+  <value>jdbc:mysql://localhost:3306/mydb</value>
+</property>
+
+<property>
+  <name>dt.operator.{OperatorName}.prop.store.userName</name>
+  <value>myuser</value>
+</property>
+
+<property>
+  <name>dt.operator.{OperatorName}.prop.store.password</name>
+  <value>mypassword</value>
+</property>
+```
+### Abstract Methods
+These methods are defined as abstract in `AbstractJdbcTransactionableOutputOperator`:
+
+`void setStatementParameters(PreparedStatement statement, T tuple)`: Sets the parameters of the insert/update statement with values from the tuple.
+
+`String getUpdateCommand()`: Gets the statement which inserts into or updates the table in the database.
+
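A minimal sketch of how a subclass might implement these two methods follows. To stay self-contained it does not extend the Malhar class: `OutputOperatorSketch` is a hypothetical stand-in that declares only the two methods, and `UserEvent`, the `users` table, and `recordingStatement` are likewise assumptions made for illustration. The recording proxy exists only so that the sketch can run without a database.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in declaring only the two abstract methods named above;
// the real base class lives in Malhar and is not reproduced here.
abstract class OutputOperatorSketch<T> {
  protected abstract String getUpdateCommand();

  protected abstract void setStatementParameters(PreparedStatement statement, T tuple)
      throws SQLException;
}

// Hypothetical tuple type.
class UserEvent {
  final int id;
  final String name;

  UserEvent(int id, String name) {
    this.id = id;
    this.name = name;
  }
}

// Hypothetical subclass: one parameterized INSERT per tuple.
class UserEventOutputSketch extends OutputOperatorSketch<UserEvent> {
  @Override
  protected String getUpdateCommand() {
    return "INSERT INTO users (id, name) VALUES (?, ?)";
  }

  @Override
  protected void setStatementParameters(PreparedStatement statement, UserEvent tuple)
      throws SQLException {
    statement.setInt(1, tuple.id);      // first '?'
    statement.setString(2, tuple.name); // second '?'
  }

  // Builds a PreparedStatement proxy that only records which methods were
  // called and with which arguments, so no database connection is needed.
  static PreparedStatement recordingStatement(List<String> calls) {
    return (PreparedStatement) Proxy.newProxyInstance(
        UserEventOutputSketch.class.getClassLoader(),
        new Class<?>[] {PreparedStatement.class},
        (proxy, method, args) -> {
          calls.add(method.getName() + Arrays.toString(args));
          return null;
        });
  }

  public static void main(String[] args) throws SQLException {
    UserEventOutputSketch operator = new UserEventOutputSketch();
    List<String> calls = new ArrayList<>();
    operator.setStatementParameters(recordingStatement(calls), new UserEvent(7, "alice"));
    System.out.println(operator.getUpdateCommand());
    System.out.println(calls); // prints [setInt[1, 7], setString[2, alice]]
  }
}
```

The parameter indexes passed to the setters are 1-based and follow the order of the `?` placeholders in the statement returned by `getUpdateCommand()`.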
+
+## AbstractJdbcPOJOOutputOperator
+This abstract class extends `AbstractJdbcTransactionableOutputOperator` and serves as the base class for inserting rows in a table using a JDBC store. It defines the abstract methods of `AbstractJdbcTransactionableOutputOperator` and can be further extended to modify functionality or add new capabilities. The class has an input port to receive records in the form of tuples, so concrete subclasses do not need to provide one, and it inserts each input tuple as a record in the database table. You need to set the input port attribute `TUPLE_CLASS` to the class name of your [POJO](https://en.wikipedia.org/wiki/Plain_Old_Java_Object) so that the operator knows the type of the incoming objects.
+
+### <a name="AbstractJdbcPOJOOutputOperatorProps"></a>Properties of AbstractJdbcPOJOOutputOperator
+Several properties are available to configure the behavior of this operator and they are
summarized in the table below.
+
+| **Property** | **Description** | **Type** | **Mandatory** | **Default Value** |
+| -------- | ----------- | ---- | ------------------ | ------------- |
+| *tablename* | Name of the table where data is to be inserted | String | Yes | N/A |
+| *fieldInfos* | JdbcFieldInfo maps a store column to a POJO field name | List | Yes | N/A |
+
+These properties can be set like this:
+
+```xml
+<property>
+  <name>dt.operator.{OperatorName}.prop.tablename</name>
+  <value>ResultTable</value>
+</property>
+
+<property>
+  <name>dt.operator.{OperatorName}.prop.fieldInfosItem[0]</name>
+  <value>
+  {
+    "sqlType": 0,
+    "columnName":"ID",
+    "pojoFieldExpression": "id",
+    "type":"INTEGER"
+  }
+  </value>
+</property>
+
+<property>
+  <name>dt.operator.{OperatorName}.prop.fieldInfosItem[1]</name>
+  <value>
+  {
+    "sqlType": 4,
+    "columnName":"NAME",
+    "pojoFieldExpression": "name",
+    "type":"STRING"
+  }
+  </value>
+</property>
+```
+
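To show how a table name and a list of mapped columns could translate into the parameterized statement used for inserts, here is a minimal sketch. It is an assumption about the general shape of such a statement, not the operator's actual code; `InsertStatementSketch` and `buildInsert` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: derives a parameterized INSERT statement from a table
// name and the column names listed in the field mappings.
class InsertStatementSketch {
  static String buildInsert(String tableName, List<String> columnNames) {
    StringBuilder columns = new StringBuilder();
    StringBuilder placeholders = new StringBuilder();
    for (int i = 0; i < columnNames.size(); i++) {
      if (i > 0) {
        columns.append(", ");
        placeholders.append(", ");
      }
      columns.append(columnNames.get(i));
      placeholders.append("?"); // one placeholder per mapped column
    }
    return "INSERT INTO " + tableName + " (" + columns + ") VALUES (" + placeholders + ")";
  }

  public static void main(String[] args) {
    // Table and column names taken from the configuration example above.
    System.out.println(buildInsert("ResultTable", Arrays.asList("ID", "NAME")));
    // prints INSERT INTO ResultTable (ID, NAME) VALUES (?, ?)
  }
}
```

Each placeholder would then be filled from the POJO field named by the corresponding `pojoFieldExpression`.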
+
+## Platform Attributes that influence operator behavior
+
+| **Attribute** | **Description** | **Type** | **Mandatory** |
+| -------- | ----------- | ---- | ------------------ |
+| *TUPLE_CLASS* | Attribute on the input port which tells the operator the class of the POJO being received | Class | Yes |
+
+Those attributes can be set like this:
+
+```xml
+<property>
+  <name>dt.operator.{OperatorName}.port.input.attr.TUPLE_CLASS</name>
+  <value>com.example.mydtapp.PojoEvent</value>
+</property>
+```
+
+A concrete implementation is provided in Malhar as [JdbcPOJOInsertOutputOperator](https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOInsertOutputOperator.java). Incoming tuples are inserted into the table using the PreparedStatement of the base class, which is created in the `activate()` method of this operator.
+
+## Features
+The operator is **idempotent**, **fault-tolerant** and **statically partitionable**.
+
+## Partitioning of JDBC Output Operator
+#### Static Partitioning
+Only static partitioning is supported for this operator.
+
+Static partitioning can be achieved by specifying the partitioner and the number of partitions in the `populateDAG()` method:
+```java
+  // Add the operator to the DAG, then attach a partitioner that creates 2 static partitions.
+  JdbcPOJOInsertOutputOperator jdbcPOJOInsertOutputOperator =
+      dag.addOperator("jdbcPOJOInsertOutputOperator", JdbcPOJOInsertOutputOperator.class);
+  StatelessPartitioner<JdbcPOJOInsertOutputOperator> partitioner1 = new StatelessPartitioner<JdbcPOJOInsertOutputOperator>(2);
+  dag.setAttribute(jdbcPOJOInsertOutputOperator, Context.OperatorContext.PARTITIONER, partitioner1);
+```
+
+Static partitioning can also be achieved by specifying the partitioner in the properties file:
+```xml
+  <property>
+    <name>dt.operator.{OperatorName}.attr.PARTITIONER</name>
+    <value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value>
+  </property>
+```
+
+where {OperatorName} is the name of the JdbcPOJOInsertOutputOperator operator.
+The settings above create 2 static partitions of JdbcPOJOInsertOutputOperator. Change the value to change the number of static partitions.
+
+
+#### Dynamic Partitioning
+Not supported.
+
+## Example
+An example application using this operator can be found [here](https://github.com/DataTorrent/examples/tree/master/tutorials/fileToJdbc).
This example shows how to read files from HDFS, parse into POJOs and then insert into a table
in MySQL.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/80e6a084/docs/operators/images/jdbcoutput/operatorsClassDiagrams.png
----------------------------------------------------------------------
diff --git a/docs/operators/images/jdbcoutput/operatorsClassDiagrams.png b/docs/operators/images/jdbcoutput/operatorsClassDiagrams.png
new file mode 100644
index 0000000..ae7ab42
Binary files /dev/null and b/docs/operators/images/jdbcoutput/operatorsClassDiagrams.png differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/80e6a084/mkdocs.yml
----------------------------------------------------------------------
diff --git a/mkdocs.yml b/mkdocs.yml
index 643289c..28ea645 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -15,6 +15,7 @@ pages:
     - File Output: operators/file_output.md
     - File Splitter: operators/file_splitter.md
     - Filter: operators/filter.md
+    - Jdbc Output Operator: operators/AbstractJdbcTransactionableOutputOperator.md
     - JDBC Poller Input: operators/jdbcPollInputOperator.md
     - JMS Input: operators/jmsInputOperator.md
     - JSON Formatter: operators/jsonFormatter.md

