Flink新增特性 | CDC(Change Data Capture) 原理和实践应用
程序源代码
共 4698字,需浏览 10分钟
·
2020-11-24 12:27
点击上方蓝色字体,选择“设为星标”
使用flink sql进行数据同步,可以将数据从一个数据同步到其他的地方,比如mysql、elasticsearch等。
可以在源数据库上实时的物化一个聚合视图
因为只是增量同步,所以可以实时的低延迟的同步数据
使用EventTime join 一个temporal表以便可以获取准确的结果
数据库之间的增量数据同步
审计日志
数据库之上的实时物化视图
基于CDC的维表join
…
Flink CDC使用方式
目前Flink支持两种内置的connector,PostgreSQL和mysql,接下来我们以mysql为例。
使用这种架构是好处有:
减少canal和kafka的维护成本,链路更短,延迟更低
flink提供了exactly once语义
可以从指定position读取
去掉了kafka,减少了消息的存储成本
<dependency>
<groupId>com.alibaba.ververicagroupId>
<artifactId>flink-connector-mysql-cdcartifactId>
<version>1.1.0version>
dependency>
如果是sql客户端使用,需要下载 flink-sql-connector-mysql-cdc-1.1.0.jar 并且放到
连接mysql数据库的示例sql如下:
-- creates a mysql cdc table source
CREATE TABLE mysql_binlog (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'inventory',
'table-name' = 'products'
);
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
public class MySqlBinlogSourceExample {
public static void main(String[] args) throws Exception {
SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("inventory") // monitor all tables under inventory database
.username("flinkuser")
.password("flinkpw")
.deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.addSource(sourceFunction)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute();
}
}
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-connector-kafka_2.11artifactId>
<version>1.11.0version>
dependency>
CREATE TABLE topic_products (
id BIGINT,
name STRING,
description STRING,
weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'products_binlog',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' -- using canal-json as the format
)
<dependency>
<groupId>com.alibaba.ververicagroupId>
<artifactId>flink-format-changelog-jsonartifactId>
<version>1.0.0version>
dependency>
-- assuming we have a user_behavior logs
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka', -- using kafka connector
'topic' = 'user_behavior', -- kafka topic
'scan.startup.mode' = 'earliest-offset', -- reading from the beginning
'properties.bootstrap.servers' = 'localhost:9092', -- kafka broker address
'format' = 'json' -- the data format is json
);
-- we want to store the the UV aggregation result in kafka using changelog-json format
create table day_uv (
day_str STRING,
uv BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'day_uv',
'scan.startup.mode' = 'earliest-offset', -- reading from the beginning
'properties.bootstrap.servers' = 'localhost:9092', -- kafka broker address
'format' = 'changelog-json' -- the data format is json
);
-- write the UV results into kafka using changelog-json format
INSERT INTO day_uv
SELECT DATE_FORMAT(ts, 'yyyy-MM-dd') as date_str, count(distinct user_id) as uv
FROM user_behavior
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd');
-- reading the changelog back again
SELECT * FROM day_uv;
版权声明:
文章不错?点个【在看】吧! ?
评论