Tạo connector: Type là source, Database là PostgreSQL
Pre-condition: Status CDC service Healthy.
1. pgoutput cần thay đổi cấu hình wal_level của Postgres cluster thành logical đồng thời cần thực hiện CDC trên primary, thay vì hot hoặc warm* replicas.
SHOW wal_level;
ALTER SYSTEM SET wal_level = 'logical';
2. PostgreSQL source connector yêu cầu tối thiểu REPLICATION role.
SELECT rolsuper FROM pg_roles WHERE rolname = '<USER_NAME>';
CREATE USER <USER_NAME> WITH REPLICATION LOGIN PASSWORD '<PASSWORD>';
Tạo Publication:
, FPTCloud chỉ chấp nhận chuỗi kí tự chỉ chứa chữ cái in thường.CREATE PUBLICATION <PUBLICATION_NAME> FOR ALL TABLES;
SELECT * FROM pg_publication;
CREATE PUBLICATION <PUBLICATION_NAME> FOR TABLE <SCHEMA1>.<TABLE1>, <SCHEMA2>.<TABLE2>, ...;
ALTER PUBLICATION <PUBLICATION_NAME> ADD TABLE <SCHEMA1>.<TABLE1>, <SCHEMA2>.<TABLE2>, ...;
ALTER PUBLICATION <PUBLICATION_NAME> DROP TABLE <SCHEMA1>.<TABLE1>, <SCHEMA2>.<TABLE2>, ...;
DROP PUBLICATION <PUBLICATION_NAME>;
Thêm quyền SELECT trên các bảng cho user đang được sử dụng:
GRANT SELECT ON TABLE '<SCHEMA_NAME>.<TABLE_NAME>' TO <USER_NAME>;
DO $$
DECLARE
table_record RECORD;
BEGIN
FOR table_record IN
SELECT table_name
FROM information_schema.tables
WHERE table_schema = '<SCHEMA_NAME>' AND table_type = 'BASE TABLE'
LOOP
EXECUTE 'GRANT SELECT ON TABLE <SCHEMA_NAME>."' || table_record.table_name || '" TO <USER_NAME>;';
END LOOP;
END $$;
Thay đổi REPLICA IDENTITY level của các bảng cần Capture Data Change.
ALTER TABLE your_schema_name.your_table_name REPLICA IDENTITY FULL;
DO $$
DECLARE
table_record RECORD;
BEGIN
FOR table_record IN
SELECT table_name
FROM information_schema.tables
WHERE table_schema = '<SCHEMA_NAME>' AND table_type = 'BASE TABLE'
LOOP
EXECUTE 'ALTER TABLE <SCHEMA_NAME>."' || table_record.table_name || '" REPLICA IDENTITY FULL;';
END LOOP;
END $$;
Connector sẽ tự tạo ra hoặc dùng lại một replication_slot đã có sẵn với giá trị slot.name nhập từ giao diện, để lắng nghe thay đổi từ wal_log (write-ahead log).
show max_replication_slots;
SELECT slot_name, plugin, slot_type, database, active FROM pg_replication_slots;
SELECT pg_drop_replication_slot('<REPLICATION_SLOT_NAME>');
Khi xóa bỏ một connector, cần loại bỏ replication_slot và publication của nó:
SELECT pg_drop_replication_slot('<REPLICATION_SLOT_NAME>');
DROP PUBLICATION <PUBLICATION_NAME>;
Trong trường hợp muốn thay đổi cấu hình max_replication_slots, vui lòng thay đổi cấu hình này trong file postgres.conf.
Để tạo connector, người dùng thực hiện các bước sau:
Bước 1: Tại thanh menu, chọn Data Platform > Workspace Management > Workspace name.
Bước 2: Tại phần My services chọn CDC service
Bước 3. Tại màn detail CDC service, chọn tab Connectors và nhấn Create a connector.
Bước 4 Nhập thông tin màn Connector Information:
Name (required): Tên connector. Chú ý: Tên connector có thể chứa các kí tự chữ cái thường a-z hoặc các kí tự số 0-9. Đặc biệt không dùng dấu cách có thể thay dấu cách bằng dấu “-”.
Type (required): Chọn source.
Database (required): Chọn PostgreSQL.
Bước 5: Nhấn Next để chuyển qua màn Properties và nhập các thông tin sau:
Trường hợp chọn From FPT Database Engine: - điền các thông tin sau:
Database (required): Chọn Database.
Host Name (required): Hostname hoặc IP của Postgres server.
Port (required): Postgres server port, mặc định là 5432.
Database name (required): Database mà Connector sẽ lắng nghe thay đổi dữ liệu.
Username (required): Postgres user sử dụng bởi Connector.
Password (required): Mật khẩu.
Topic prefix (required): Khi dữ liệu thay đổi, các sự kiện thay đổi sẽ được produce vào các Kafka topics, tên của các topics sẽ có dạng [topic.prefix].[tên_schema].[tên_bảng]
Ví dụ: tiền tố topic: syncdata, schema inventory, các bảng: customer, order, item. Connector sẽ thực hiện ghi lại các thay đổi của dữ liệu vào các topic của Kafka: syncdata.inventory.customer, syncdata.inventory.order, syncdata.inventory.item)
Slot (required): Replication slot sử dụng bởi connector, giá trị chỉ nhận chữ cái và in thường.
Publication (required): Publication sử dụng bởi connector, giá trị chỉ nhận chữ cái và in thường.
Trường hợp chọn Manual configuration - điền các thông tin sau:
Host Name (required): Hostname hoặc IP của Postgres server
Port (required): Postgres server port, mặc định là 5432
Database name (required): Database mà Connector sẽ lắng nghe thay đổi dữ liệu
Username (required): Postgres user sử dụng bởi Connector
Password (required): mật khẩu
Topic prefix (required): Khi dữ liệu thay đổi, các sự kiện thay đổi sẽ được produce vào các Kafka topics, tên của các topics sẽ có dạng [topic.prefix].[tên_schema].[tên_bảng] Ví dụ: tiền tố topic: syncdata, schema inventory, các bảng: customer, order, item. Connector sẽ thực hiện ghi lại các thay đổi của dữ liệu vào các topic của Kafka: syncdata.inventory.customer, syncdata.inventory.order, syncdata.inventory.item)
Slot (required): Replication slot sử dụng bởi connector, giá trị chỉ nhận chữ cái và in thường
Publication (required): Publication sử dụng bởi connector, giá trị chỉ nhận chữ cái và in thường
Nhấn Test connection để kiểm tra kết nối từ Workspace tới Database đã nhập
Bước 6: Nhấn Next để chuyển qua màn Additional Properties và nhập các thông tin:
Mode (required): Hành vi của Connector. Chọn các loại mode sau
Initial (default): Connector sẽ snapshot toàn bộ dữ liệu đã tồn tại trong các bảng, sau đó tiếp tục capture data changes trên các bảng này
Initial_only: Connector sẽ chỉ snapshot toàn bộ dữ liệu đã tồn tại trong các bảng, sau đó không lắng nghe các sự kiện thay đổi dữ liệu trên bảng
No_data: Connector sẽ không snapshot dữ liệu đã tồn tại trong bảng mà chỉ lắng nghe các sự kiện thay đổi dữ liệu trên bảng
Nhấn vào dấu ‘+’ để lấy thông tin schema, table
Chú ý: Giới hạn tối đa lựa chọn là 100 table
Bước 7: Nhấn Next để chuyển qua màn Review và kiểm tra thông tin.
Bước 8: Kiểm tra thông tin và nhấn nút Create để hoàn thành việc tạo connector.