Streaming Data từ PostgreSQL sang Elasticsearch với Debezium + Kafka
Ứng dụng của Change Data Capture trong thực tế
Mở đầu
Ở bài trước, mình đã giới thiệu kỹ thuật Change Data Capture (CDC). Tiếp nối, bài này sẽ minh họa một ứng dụng thực tế: đồng bộ dữ liệu từ PostgreSQL sang Elasticsearch (ES) để hỗ trợ full-text search và phân tích dữ liệu. Chúng ta sẽ sử dụng Debezium và Kafka, với Postgres làm nguồn và ES là nơi lưu trữ kết quả.
Mục đích
Trong nhiều hệ thống, PostgreSQL thường được dùng làm main database cho các giao dịch (transactional DB). Nó mạnh về ACID, đảm bảo tính toàn vẹn dữ liệu. Nhưng khi ứng dụng phát triển, sẽ có những nhu cầu mới:
Full-text search: tìm kiếm nhanh theo từ khóa, hỗ trợ stemming, highlight, ranking. PostgreSQL có
pg_trgm
haytsvector
, nhưng khi khối lượng dữ liệu lớn thì không thể so sánh với Elasticsearch.Analytics / Dashboard real-time: PostgreSQL không được tối ưu cho query dạng aggregate phức tạp trên dữ liệu lớn. Elasticsearch + Kibana sẽ đáp ứng tốt hơn.
Offload workload: tách tải tìm kiếm/analytics khỏi database chính, giúp Postgres tập trung cho transaction.
Vì vậy, một giải pháp phổ biến là sync dữ liệu từ Postgres sang Elasticsearch. Các hành động search, phân tích hay dashboard sẽ query data từ ES, không làm tăng workload cho main database.
Tổng quan kiến trúc
Kiến trúc của project khá đơn giản, mỗi khi có update gì từ source database, Debezium sẽ lắng nghe và gửi message tới Kafka topic, data sau đó được đồng bộ hoá vào target database (ES).
Các thành phần chi tiết
PostgreSQL: Nguồn dữ liệu gốc, source of truth.
Debezium: Công cụ Change Data Capture (CDC), lắng nghe Write Ahead Log (WAL) của Postgres để phát hiện insert/update/delete.
Kafka: Hệ thống streaming, lưu trữ event và đảm bảo tính bền vững.
Kafka Connect + Elasticsearch Sink: Tiêu thụ event từ Kafka, ghi vào Elasticsearch.
Elasticsearch: Đích đến, nơi dữ liệu sẵn sàng cho tìm kiếm và phân tích.
Docker Compose Setup
Chúng ta đã tìm hiểu về Docker và Docker compose ở bài trước, ở bài này chúng ta có thể nhanh chóng tạo những services cần thiết cho demo.
Giả sử chúng ta có file doceker-compose với những services sau: PostgreSQL, Zookeeper, Kafka, Debezium, Elasticsearch và Kibana ( Kibana dùng để xem data trong ES).
services:
postgres:
image: bitnami/postgresql:15
container_name: postgres
environment:
- POSTGRESQL_USERNAME=postgres
- POSTGRESQL_PASSWORD=postgres
- POSTGRESQL_DATABASE=mydb
ports:
- "5432:5432"
volumes:
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
zookeeper:
image: bitnami/zookeeper:3.9
container_name: zookeeper
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
ports:
- "2181:2181"
kafka:
image: bitnami/kafka:3.7
container_name: kafka
ports:
- "9092:9092"
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
connect:
image: confluentinc/cp-kafka-connect:7.6.1
container_name: connect
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: "compose-connect-group"
CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_REST_ADVERTISED_HOST_NAME: connect
volumes:
- ./plugins:/usr/share/confluent-hub-components
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.13.4
container_name: elasticsearch
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- ES_JAVA_OPTS=-Xms512m -Xmx512m
ports:
- "9200:9200"
kibana:
image: docker.elastic.co/kibana/kibana:8.13.4
container_name: kibana
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
ports:
- "5601:5601"
depends_on:
- elasticsearch
Sau đó start toàn bộ tất cả services bằng lệnh
docker-compose up
Đợi docker pull images nếu chưa có sẵn, sau khi xong chúng ta có thể verify bằng cách mở một terminal mới và chạy `docker ps`
Tạo Connector từ Debezium tới PostgreSQL
Sau khi các services start thành công, chúng ta tạo 2 connector từ Debezium tới PostgreSQL và ES bằng cách gửi 2 requests tưng ứng.
Mở một terminal mới và send một POST request để đăng ký một connector tới Postgresql
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "mydb",
"topic.prefix": "dbserver1",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"publication.name": "debezium_pub",
"tombstones.on.delete": "false"
}
}'
Ý nghĩa
connector.class
: chỉ định loại connector (ở đây là Debezium PostgreSQL).database.hostname
,database.port
,database.user
,database.password
,database.dbname
: thông tin kết nối đến Postgres.database.server.name
: tên logical server, cũng là prefix cho các Kafka topic (ví dụ:dbserver1.public.users
).plugin.name
: chế độ xuất WAL log, Postgres thường dùngpgoutput
.slot.name
&publication.name
: cơ chế logical replication trong Postgres (Debezium sẽ tự tạo nếu chưa tồn tại).tombstones.on.delete
: cấu hình cách xử lý khi có bản ghi bị xóa.
Output khi chay curl:
{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "mydb",
"topic.prefix": "dbserver1",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"publication.name": "debezium_pub",
"tombstones.on.delete": "false",
"name": "postgres-connector"
},
"tasks": [],
"type": "source"
}
Sau khi đăng ký thành công, Debezium sẽ lắng nghe các sự kiện từ PostgreSQL.
Khai báo Elasticsearch Sink Connector
Sau khi Debezium lắng nghe các sự kiện từ PostgreSQL, bước tiếp theo là đăng ký một sink connector đổ data từ Debezium vào ES. Chúng ta làm cách đó bằng cách send một POST request như sau:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "es-sink-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "dbserver1.public.users",
"connection.url": "http://elasticsearch:9200",
"type.name": "_doc",
"key.ignore": "true",
"schema.ignore": "true"
}
}'
Ý nghĩa:
connector.class
: loại connector, ở đây là Elasticsearch sink (cần Confluent Connector).topics
: Kafka topic mà connector sẽ lắng nghe (ở đây là bảngusers
trong schemapublic
của Postgres).connection.url
: URL của Elasticsearch cluster.type.name
: loại document trong Elasticsearch (mặc định_doc
).key.ignore
: nếutrue
, connector sẽ không dùng key từ Kafka message, mà để Elasticsearch tự sinh_id
.schema.ignore
: bỏ qua schema Avro/JSON, phù hợp khi không cần validate schema.
Output:
{
"name": "es-sink-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "dbserver1.public.users",
"connection.url": "http://elasticsearch:9200",
"type.name": "_doc",
"key.ignore": "true",
"schema.ignore": "true",
"name": "es-sink-connector"
},
"tasks": [],
"type": "sink"
}
Demo
Vào Postgres (
psql
) và insert một bản ghi mới vào bảngusers
.docker exec -it postgres psql -U postgres -d mydb
Sau đó chạy lệnh sau để insert 1 record
INSERT INTO customers (id, name, email) VALUES (10, 'Nguyen Van A', 'a@example.com');
Rồi view data:
SELECT * FROM customers;
Debezium sẽ capture thay đổi → Kafka.
Kafka Connect sẽ gửi sang Elasticsearch.
Mở Kibana tại http://localhost:5601 và search thử dữ liệu trong index.
Kết luận
Với stack Debezium + Kafka + Elasticsearch, bạn có thể:
Đồng bộ dữ liệu real-time từ Postgres sang ES.
Xử lý thay đổi mà không cần viết cron job hoặc ETL phức tạp.
Kết hợp Kibana để vừa tìm kiếm, vừa phân tích dữ liệu.
Đây là một kiến trúc đơn giản nhưng mạnh mẽ, phù hợp cho các hệ thống cần full-text search và data analytics mà vẫn giữ được dữ liệu gốc trong Postgres.
Đọc thêm
https://docs.docker.com/get-started/introduction/get-docker-desktop/
https://aws.amazon.com/vi/what-is/elasticsearch/
https://estuary.dev/blog/postgres-to-elasticsearch-streaming/
Cảm ơn bạn đã đọc bài viết này, nếu bạn có bất kỳ câu hỏi nào, đừng ngần ngại để lại comment ở bên dưới. Và đừng quên subscribe để luôn nhận được các bài viết mới nhất
Cảm ơn anh vì bài viết. Mình có thể theo dõi sự thay đổi bảng hoặc theo một vài cột không anh nhỉ? Hi vọng anh có thêm một bài viết về triển khai và vận hành trong thực tế, những lỗi có thể gặp cũng như cách khắc phục.