从 0 到 1 为 Databend 设计实现轻量级 CDC
什么是 CDC
CDC(Change Data Capture)是一种数据同步技术,用于实时捕获和传递数据库中的数据更改。通过 CDC,我们可以将数据库中的变更事件捕获并转换成数据流,然后将其传递给其他系统或应用程序,以实现数据的实时同步和分发。常见的 CDC 格式为:
{
"op": "Update", // "Insert", "Delete",
"event_time": "2023-07-22 12:00:00",
"payload": {
"id": 123,
"author": "Franz Kafka",
"title": "ToDatabend",
"published_at": "2023-07-01"
}
}
一种生产可用的 CDC 系统架构可以是下图:
现阶段主流 CDC 方案和架构
在目前的技术发展中,有几种主流的CDC方案和架构:
1. Flink CDC
Flink CDC[1] 是基于 Apache Flink 的 CDC 方案。它可以实时捕获数据库的变更事件,并将其转换成流数据。Flink CDC 提供了非常多的连接器组件,可以在异构的数据库之间实现数据流动。
Databend 也提供了 flink-databend-connector,可以与 MySQL,PG 等 RDBMS 构建实时数据同步。
2. Kafka Connector
Kafka Connector[2] 是基于 Apache Kafka 的 CDC 方案。Kafka 作为分布式消息队列,可以用于数据传递和分发。Kafka Connector 允许将数据从数据库中捕获,并将其发布到 Kafka 主题中,供其他系统消费。
3. Canal
Canal[3] 是阿里巴巴开源的 CDC 解决方案。它可以捕获 MySQL 数据库的变更,并将其转换成消息格式输出,常用于数据同步和业务解耦。限制是只能基于 MySQL 数据库增量日志解析。
Debezium Server
Debezium Server[4] 是一个基于 Debezium Engine 的 CDC 项目。Debezium Engine 是 Debezium 项目的核心,用于捕获数据库的变更事件。Debezium Server 构建在该引擎之上,提供了一种轻量级的 CDC 解决方案,用于实时捕获数据库更改,并将其转换为事件流,最终将数据写入目标数据库。
Debezium Server Databend
Debezium Server Databend[5] 是一个基于 Debezium Engine 自研的轻量级 CDC 项目,用于实时捕获数据库更改并将其作为事件流传递最终将数据写入目标数据库 Databend。它提供了一种简单的方式来监视和捕获关系型数据库的变化,并支持将这些变化转换为可消费事件。使用 Debezium server databend 实现 CDC 无须依赖大型的 Data Infra 比如 Flink, Kafka, Spark 等,只需一个启动脚本即可开启实时数据同步。
源码分析 debezium-server-databend 实现
Debezium Server Databend 实现了轻量级CDC方案,通过 Debezium Engine 捕获数据库变更事件,将其转换成事件流,然后将事件流传递给 Databend 数据库,实现数据的实时同步。下面先从代码层面分析一下该组件的实现原理。
主要代码的结构是:
.
├── DatabendChangeConsumer.java
├── DatabendChangeEvent.java
├── DatabendTypes.java
├── DatabendUtil.java
├── DebeziumMetrics.java
├── batchsizewait
│ ├── InterfaceBatchSizeWait.java
│ ├── MaxBatchSizeWait.java
│ └── NoBatchSizeWait.java
└── tablewriter
├── AppendTableWriter.java
├── BaseTableWriter.java
├── RelationalTable.java
├── TableNotFoundException.java
├── TableWriterFactory.java
└── UpsertTableWriter.java
Debezium server 的入口逻辑在 DatabendChangeConsumer
中,继承 BaseChangeConsumer
并实现相应方法, 作用是加载配置,初始化 server, database 以及处理 batch events :
/**
* Implementation of the consumer that delivers the messages to databend database tables.
*
* @author hantmac
*/
@Named("databend")
@Dependent
public class DatabendChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
...// @ConfigProperty(name = "debezium.sink.databend.xxx", defaultValue = "")
void connect() throws Exception {
...
}
public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {...}
}
核心代码是在 handleBatch 中,在这里接收变更事件并发送到 tablewriter
中进一步处理。
@Override
public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {
Instant start = Instant.now();
//group events by destination
Map<String, List<DatabendChangeEvent>> result =
records.stream()
.map((ChangeEvent<Object, Object> e)
-> {
try {
return new DatabendChangeEvent(e.destination(),
valDeserializer.deserialize(e.destination(), getBytes(e.value())),
e.key() == null ? null : valDeserializer.deserialize(e.destination(), getBytes(e.key())),
mapper.readTree(getBytes(e.value())).get("schema"),
e.key() == null ? null : mapper.readTree(getBytes(e.key())).get("schema")
);
} catch (IOException ex) {
throw new DebeziumException(ex);
}
})
.collect(Collectors.groupingBy(DatabendChangeEvent::destination));
// consume list of events for each destination table
for (Map.Entry<String, List<DatabendChangeEvent>> tableEvents : result.entrySet()) {
RelationalTable tbl = this.getDatabendTable(mapDestination(tableEvents.getKey()), tableEvents.getValue().get(0).schema()); // 获取 tablewriter 实例
tableWriter.addToTable(tbl, tableEvents.getValue()); // 将事件推送至 tablewriter
}
for (ChangeEvent<Object, Object> record : records) {
LOGGER.trace("Processed event '{}'", record);
committer.markProcessed(record);
}
committer.markBatchFinished();
this.logConsumerProgress(records.size());
batchSizeWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis());
TableWriterFactory
中提供了 Append
和 Upsert
两种模式:
public BaseTableWriter get(final Connection connection) {
if (upsert) {
return new UpsertTableWriter(connection, identifierQuoteCharacter.orElse(""), upsertKeepDeletes);
} else {
return new AppendTableWriter(connection, identifierQuoteCharacter.orElse(""));
}
}
每种模式都实现了 addTable
方法。addTable 的主要逻辑是解析 Debezium Event 中的字段 Name,Type,Value 然后调用 databend JDBC 将数据写入到目标表。Upsert mode相对复杂,这里以 upsert 为例来看下源码:
public void addToTable(final RelationalTable table, final List<DatabendChangeEvent> events) {
final String upsertSql = table.preparedUpsertStatement(this.identifierQuoteCharacter);
int inserts = 0;
List<DatabendChangeEvent> deleteEvents = new ArrayList<>();
try (PreparedStatement statement = connection.prepareStatement(upsertSql)) {
connection.setAutoCommit(false);
for (DatabendChangeEvent event : events) {
System.out.println(event.operation());
// NOTE: if upsertKeepDeletes = true, delete event data will insert into target table
if (upsertKeepDeletes || !event.operation().equals("d")) {
Map<String, Object> values = event.valueAsMap();
addParametersToStatement(statement, values, event.keyAsMap());
statement.addBatch();
} else if (event.operation().equals("d")) {
// here use soft delete
// if true delete, we can use this condition event.keyAsMap().containsKey(deleteColumn)
deleteEvents.add(event);
}
}
int[] batchResult = statement.executeBatch();
inserts = Arrays.stream(batchResult).sum();
System.out.println(String.format("insert rows %d", inserts));
} catch (SQLException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
// handle delete event
try {
deleteFromTable(table, deleteEvents);
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}
Append mode
在 CDC 中,Append Mode 是一种数据写入模式。当数据库的一条记录发生变化时,CDC 会将该变化作为一条新的事件追加到事件流中。
Upsert mode
Upsert Mode 是另一种数据写入模式。当数据库的一条记录发生变化时,CDC 会将该变化作为一个更新操作,如果记录不存在,则作为插入操作,以实现数据的更新和插入。
Upsert mode 用到了 Databend 的 Replace into[6] 语法,所以需要用户指定一个 conflict key,这里我们提供一个配置:
debezium.sink.databend.database.primaryKey=id
如果没有提供该配置,就会退化成追加模式。
Delete
Delete操作是指数据库中的记录被删除,CDC 会将该操作作为一个事件写入事件流,以通知其他系统该记录已被删除。
Debezim Server 对 Delete 的处理比较复杂,在 DELETE 操作下会生成两条事件记录:
-
一个包含 "op": "d",其他的行数据以及字段; -
一个tombstones记录,它具有与被删除行相同的键,但值为null。
这两条事件会同时发出,在 Debezium Server Databend 中我们选择对 Delete 数据实行软删除,这就要求我们在 target table 中拥有 __deleted
字段,当 Delete 事件过来的时候我们将该字段置为 TRUE 后插入到目标表。
这样设计的好处是,有些用户最开始想要保留这些数据,但可能未来会想到将其删除,这样就为用户提供了可选的方案,未来想要删除这些数据的时候,只需要 delete from table where __deleted=true
即可。
关于 Debezium 对删除事件的说明以及处理方式,详情可参考文档[6]。
使用轻量级 CDC debezium-server-databend 构建 MySQL 到 Databend 的 实时数据同步
下面用一个实际案例展示如何基于 Debezium server databend 快速构建 MySQL 到 Databend 的实时数据同步。
准备阶段
准备一台已经安装了 Docker ,docker-compose 以及 Java 11 环境 的 Linux 或者 MacOS 。
准备教程所需要的组件
接下来的教程将以 docker-compose
的方式准备所需要的组件。
debezium-MySQL
docker-compose.yaml
version: '2.1'
services:
postgres:
image: debezium/example-postgres:1.1
ports:
- "5432:5432"
environment:
- POSTGRES_DB=postgres
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
mysql:
image: debezium/example-mysql:1.1
ports:
- "3306:3306"
environment:
- MYSQL_ROOT_PASSWORD=123456
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
Debezium Server Databend
-
Clone 项目: git clone ``https://github.com/databendcloud/debezium-server-databend.git
-
从项目根目录开始: -
构建和打包 debezium server: mvn -Passembly -Dmaven.test.skip package
-
构建完成后,解压服务器分发包: unzip debezium-server-databend-dist/target/debezium-server-databend-dist*.zip -d databendDist
-
进入解压后的文件夹: cd databendDist
-
创建 application.properties
文件并修改:nano conf/application.properties
,将下面的 application.properties 拷贝进去,根据用户实际情况修改相应的配置。 -
使用提供的脚本运行服务: bash run.sh
-
Debezium Server with Databend 将会启动
同时我们也提供了相应的 Docker image,可以在容器中一键启动:
version: '2.1'
services:
debezium:
image: ghcr.io/databendcloud/debezium-server-databend:pr-2
ports:
- "8080:8080"
- "8083:8083"
volumes:
- $PWD/conf:/app/conf
- $PWD/data:/app/data
NOTE: 在容器中启动注意所连接数据库的网络。
Debezium Server Databend Application Properties
本文章使用下面提供的配置,更多的参数说明以及配置可以参考文档[7]。
debezium.sink.type=databend
debezium.sink.databend.upsert=true
debezium.sink.databend.upsert-keep-deletes=false
debezium.sink.databend.database.databaseName=debezium
debezium.sink.databend.database.url=jdbc:databend://tnf34b0rm--xxxxxx.default.databend.cn:443
debezium.sink.databend.database.username=cloudapp
debezium.sink.databend.database.password=password
debezium.sink.databend.database.primaryKey=id
debezium.sink.databend.database.tableName=products
debezium.sink.databend.database.param.ssl=true
# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json
# mysql source
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=60000
debezium.source.database.hostname=127.0.0.1
debezium.source.database.port=3306
debezium.source.database.user=root
debezium.source.database.password=123456
debezium.source.database.dbname=mydb
debezium.source.database.server.name=from_mysql
debezium.source.include.schema.changes=false
debezium.source.table.include.list=mydb.products
# debezium.source.database.ssl.mode=required
# Run without Kafka, use local file to store checkpoints
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/status.dat
# do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true
# ############ SET LOG LEVELS ############
quarkus.log.level=INFO
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN
准备数据
在 MySQL 数据库中准备数据
进入 MySQL 容器
docker-compose exec mysql mysql -uroot -p123456
创建数据库 mydb 和表 products
,并插入数据:
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));
ALTER TABLE products AUTO_INCREMENT = 10;
INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"cloud","test for databend"),
(default,"spare tire","24 inch spare tire");
在 Databend 中创建 Database
NOTE: 用户可以不必先在 Databend 中创建表,系统检测到后会自动为用户建表。
启动 Debezium Server Databend
bash run.sh
首次启动会进入 init snapshot 模式,通过配置的 Batch Size 全量将 MySQL 中的数据同步到 Databend,所以在 Databend 中可以看到 MySQL 中的数据已经同步过来了:
同步 Insert 数据
我们继续往 MySQL 中插入 5 条数据:
INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer");
Debezium server databend 日志:
同时在 Databend 中可以查到 5 条数据已经同步过来了:
同步 Update 数据
配置文件中 debezium.sink.databend.upsert=true
,所以我们也可以处理 Update/Delete 的事件。
在 MySQL 中更新 id=10 的数据:
update products set name="from debezium" where id=10;
在 Databend 中可以查到 id 为 10 的数据已经被更新:
同步 Delete 数据
在配置文件中,有以下的配置,既可开启处理 Delete 事件的能力:
debezium.sink.databend.upsert-keep-deletes=false
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true
在 MySQL 中删除 id=12 的数据:
delete from products where id=12;
在 Databend 中可以观察到 id=12 的值的 __deleted
字段已经被置为 true
。
环境清理
操作结束后,在 docker-compose.yml
文件所在的目录下执行如下命令停止所有容器:
docker-compose down
结论
文章介绍了 databend 的轻量级 CDC 实现原理,演示了基于轻量级 CDC debezium server databend 构建 MySQL 到 Databend 的 实时数据同步的全部过程,这种方式不需要依赖 Flink, Kafka 等大型组件,启动和管理非常方便。
参考资料
Flink CDC: https://ververica.github.io/flink-cdc-connectors/release-2.1/content/about.html
[2]Kafka Connector: https://docs.confluent.io/platform/current/connect/index.html
[3]Canal: https://github.com/alibaba/canal
[4]Debezium Server: https://debezium.io/documentation/reference/1.6/overview.html
[5]Debezium Server Databend: https://github.com/databendcloud/debezium-server-databend
[6]replace into: https://databend.rs/doc/sql-commands/dml/dml-replace
[7]Debezium Event Flattening: https://debezium.io/documentation/reference/stable/transformations/event-flattening.html
[8]debezium-server-databend 配置文档: https://github.com/databendcloud/debezium-server-databend/blob/main/docs/docs.md