nifi-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <>
Subject [GitHub] [nifi] gerdansantos commented on issue #4065: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) in PostgreSQL tables via Logical Replication.
Date Tue, 25 Feb 2020 02:34:45 GMT
gerdansantos commented on issue #4065: NIFI-4239 - Adding CaptureChangePostgreSQL processor
to capture data changes (INSERT/UPDATE/DELETE) in PostgreSQL tables via Logical Replication.
   > This is going to be a great new feature! I left some initial comments, and I haven't
been able to get it to work yet on my PostgreSQL 12 server. Is there some n00b tutorial you
can point me to so I can make sure I set everything up right? I created a publication and
a replication slot, but I don't know how/if they work together, and I didn't get any events
into the processor.
   Configuration on PostgreSQL Server
   Your database should be configured to enable logical replication.
   * Property **max_wal_senders** should be at least equal to the number of replication consumers.
   * Property **wal_keep_segments** should contain count wal segments that can't be removed
from database.
   * Property **wal_level** for logical replication should be equal to logical.
   * Property **max_replication_slots** should be greater than zero for logical replication,
because logical replication can't work without replication slot.
   listen_addresses = '*'         	# what IP address(es) to listen on;
                                   # comma-separated list of addresses;
                                   # defaults to 'localhost'; use '*' for all
                                   # (change requires restart)
   max_wal_senders = 4             # max number of walsender processes
   wal_keep_segments = 4           # in logfile segments, 16MB each; 0 disables
   wal_level = logical             # minimal, replica, or logical
   max_replication_slots = 4       # max number of replication slots
   After configurations, restart/reload PostgreSQL service.
   Enable connect user with replication privileges to replication stream and make a statments
on PostgreSQL Server.
   Example, Put this lines at pg_hba.conf:
   host    all           all              all                     md5 #For SnapShot Connection
   host    replication   all              all                     md5 #For Replication Connection
   After configurations, restart PostgreSQL service.
   Example of Use
   Create a PostgreSQL table, for this example we use a postgres database; JUST FOR LEARNING
   Table struct:
                   Table "public.cidade"
     Column   |  Type   | Collation | Nullable | Default
    codigo    | integer |           | not null |
    data_fund | date    |           | not null |
    nome      | text    |           |          |
   CREATE TABLE cidade(codigo integer not null, data_fund date not null, nome text);
   **OBS**: A published table must have a “replica identity” configured in order to be
able to replicate UPDATE and DELETE operations, so that appropriate rows to update or delete
can be identified on the subscriber side. By default, this is the primary key, if there is
one. Another unique index (with certain additional requirements) can also be set to be the
replica identity. If the table does not have any suitable key, then it can be set to replica
identity “full”, which means the entire row becomes the key. This, however, is very inefficient
and should only be used as a fallback if no other solution is possible. More details:
   First, you need a Publication for the tables that you want to capture data changes:
   CREATE PUBLICATION cidade_pub FOR TABLE cidade;
   You can decide the NiFi CDC PostgreSQL processor create a slot for you...
   Or you can create manually.
   SELECT * FROM pg_create_logical_replication_slot('slt_cidade_pub', 'pgoutput');
   Link NiFi CDC PostgreSQL processor to this PostgreSQL Instance:
   PostgreSQL Host:
   PostgreSQL Driver Class Name: org.postgresql.Driver
   PostgreSQL Driver Location(s): /nifi-1.11.0/jdbc/postgresql-42.2.9.jar
   Database: postgres
   Username: postgres
   Publication: cidade_pub
   Slot Name: slt_cidade_pub
   Make Snapshot: false
   Include Begin/Commit Events: false
   Initial Log Sequence Number - LSN: [empty property]
   Drop If Exists Replication Slot: false
   Link to LogAttribute for example, so start CaptureChangePostgreSQL
   So execute at PostgreSQL Server
   INSERT INTO cidade (codigo, data_fund, nome) VALUES (4, now(), 'New York');
   UPDATE cidade SET codigo = 20 WHERE codigo = 4;
   UPDATE cidade SET nome = 'Statue of Liberty City' WHERE nome = 'New York';
   DELETE FROM cidade WHERE codigo >= 4;
   Output at NiFi Queue:
   {"insert":{"cidade":{"codigo":4,"nome":"New York","data_fund":"2020-02-24"}}}
   {"update":{"cidade":{"codigo":20,"nome":"New York","data_fund":"2020-02-24"}}}
   {"update":{"cidade":{"codigo":20,"nome":"Statue of Liberty City","data_fund":"2020-02-24"}}}
   {"delete":{"cidade":{"codigo":20,"nome":"Statue of Liberty City","data_fund":"2020-02-24"}}}

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:

With regards,
Apache Git Services

View raw message