【Mysql源码分析】基于行的复制实现之“主从关系建立”
作者:c rain
来源:SegmentFault 思否社区
前言
经常听到别人说Mysql的SBR、RBR、MBR,如果不清楚,那么可以跟着文章一起来学习。由于涉及到主从的内容比较多,需要拆分成多篇内容来概述,这章先从基础知识和主从关系建立开始讲起。还会出一篇文章详细讲解从主同步。
1.了解什么是SBR、RBR、MBR?
2.了解下主从配置该如何配置?
3.了解主从关系如何建立?
1.配置Mysql主从
在本文中,分为一主一从。主监听的端口为3306,从监听的端口为3309。
1.1主服务配置
master配置,配置文件my.cnf:
[mysqld]
port=3306
basedir=/usr/local/mysql8.0.20
datadir=/usr/local/mysql8.0.20/data
socket=/tmp/mysql.sock
#explicit_defaults_for_timestamp=true
lower_case_table_names=2 #表名存储为给定的大小写但是比较的时候是小写的
log_bin=mysql-bin
server_id =10
主服务启动Mysql命令:
# sudo bin/mysqld --defaults-file=/usr/local/mysql8.0.20/etc/my.cnf --user=root
客户端连接master端
# mysql --socket=/tmp/mysql.sock -u root
启动master后,需要创建一个用户,用于主从同步:
mysql> GRANT REPLICATION SLAVE ON *.* TO repl@'localhost' IDENTIFIED BY '123456';
查看主服务状态
mysql> show master status \G;
1.2 从服务配置
slave配置,配置文件salve.conf:
[mysqld]
port=3309
basedir=/usr/local/mysql8.0.20
datadir=/usr/local/mysql8.0.20/data1
socket=/tmp/mysqlslave.sock
#explicit_defaults_for_timestamp=true
lower_case_table_names=2 #表名存储为给定的大小写但是比较的时候是小写的
log_bin=mysql-bin
server_id=2
relay_log=/usr/local/mysql8.0.20/mysql-relay-bin
read_only=1 #执行模式
从服务启动Mysql命令:
# sudo bin/mysqld --defaults-file=/usr/local/mysql8.0.20/etc/salve.conf --user=root
连接Slave端:
# mysql --socket=/tmp/mysqlslave.sock -u root
Slave端主从同步配置:
mysql>
change master to master_host='localhost',
master_user='repl',
master_password='123456',
master_log_file='mysql-bin.000001',
master_log_pos=0;
Slave/Master
#查看binlog事件
mysql> SHOW BINLOG EVENTS;
如果发现主从没有同步,可以使用如下命令查看相应的状态:
#查看slave状态
mysql> show slave status;
如果遇到从库报这个错误:Got fatal error 1236 from master when reading data from binary log: 'Could not find first log file name in binary log index file'
Got fatal error 1236 from master when reading data from binary log: 'could not find next log'
此时可以操作如下三个命令进行处理:
#关闭slave
mysql> stop slave;
#重置slave
mysql> reset slave;
#启动slave
mysql> start slave;
2.MYSQL中BINLOG_FORMAT的三种模式
mysql三种复制方式:基于SQL语句的复制(statement-based replication, SBR),基于行的复制(row-based replication, RBR),混合模式复制(mixed-based replication, MBR)。对应的,binlog的格式也有三种:STATEMENT,ROW,MIXED。
2.1 STATEMENT模式(SBR)
每一条会修改数据的sql语句会记录到binlog中。优点是并不需要记录每一条sql语句和每一行的数据变化,减少了binlog日志量,节约IO,提高性能。缺点是在某些情况下会导致master-slave中的数据不一致(如sleep()函数, last_insert_id(),以及user-defined functions(udf)等会出现问题)
2.2 ROW模式(RBR)
不记录每条sql语句的上下文信息,仅需记录哪条数据被修改了,修改成什么样了。而且不会出现某些特定情况下的存储过程、或function、或trigger的调用和触发无法被正确复制的问题。缺点是会产生大量的日志,尤其是alter table的时候会让日志暴涨。
2.3 MIXED模式(MBR)
以上两种模式的混合使用,一般的复制使用STATEMENT模式保存binlog,对于STATEMENT模式无法复制的操作使用ROW模式保存binlog,MySQL会根据执行的SQL语句选择日志保存方式。
binlog复制配置在mysql的配置文件my.cnf中,可以通过一下选项配置binglog相关,配置如下:
#binlog日志格式,mysql默认采用statement,建议使用mixed
binlog_format = MIXED
#binlog日志文件
log-bin = /data/mysql/mysql-bin.log
#binlog过期清理时间
expire_logs_days = 7
#binlog每个日志文件大小
max_binlog_size = 100m
#binlog缓存大小
binlog_cache_size = 4m
#最大binlog缓存大小
max_binlog_cache_size = 512m
2.4 优缺点对比
对于执行的SQL语句中包含now()这样的时间函数,会在日志中产生对应的unix_timestamp()*1000的时间字符串,slave在完成同步时,取用的是sqlEvent发生的时间来保证数据的准确性。另外对于一些功能性函数slave能完成相应的数据同步,而对于上面指定的一些类似于UDF函数,导致Slave无法知晓的情况,则会采用ROW格式存储这些Binlog,以保证产生的Binlog可以供Slave完成数据同步。
现在来比较以下 SBR 和 RBR 这2种模式各自的优缺点:
SBR 的优点:
技术比较成熟 binlog文件较小 binlog中包含了所有数据库更改信息,可以据此来审核数据库的安全等情况 binlog可以用于实时的还原,而不仅仅用于复制 主从版本可以不一样,从服务器版本可以比主服务器版本高
SBR 的缺点:
不是所有的UPDATE语句都能被复制,尤其是包含不确定操作的时候。 调用具有不确定因素的 UDF 时复制也可能出问题 使用以下函数的语句也无法被复制: LOAD_FILE() UUID() USER() FOUND_ROWS() SYSDATE() (除非启动时启用了 --sysdate-is-now 选项)
INSERT ... SELECT 会产生比 RBR 更多的行级锁
复制需要进行全表扫描(WHERE 语句中没有使用到索引)的 UPDATE 时,需要比 RBR 请求更多的行级锁
对于有 AUTO_INCREMENT 字段的 InnoDB表而言,INSERT 语句会阻塞其他 INSERT 语句
对于一些复杂的语句,在从服务器上的耗资源情况会更严重,而 RBR 模式下,只会对那个发生变化的记录产生影响
存储函数(不是存储过程)在被调用的同时也会执行一次 NOW() 函数,这个可以说是坏事也可能是好事
确定了的 UDF 也需要在从服务器上执行
数据表必须几乎和主服务器保持一致才行,否则可能会导致复制出错
执行复杂语句如果出错的话,会消耗更多资源
RBR 的优点:
INSERT ... SELECT 包含 AUTO_INCREMENT 字段的 INSERT 没有附带条件或者并没有修改很多记录的 UPDATE 或 DELETE 语句执行 INSERT,UPDATE,DELETE 语句时锁更少,从服务器上采用多线程来执行复制成为可能。
RBR 的缺点:
binlog 大了很多 复杂的回滚时 binlog 中会包含大量的数据
UDF 产生的大 BLOB 值会导致复制变慢
当在非事务表上执行一段堆积的SQL语句时,最好采用 SBR 模式,否则很容易导致主从服务器的数据不一致情况发生。
如果是采用 INSERT,UPDATE,DELETE 直接操作表的情况,则日志格式根据 binlog_format 的设定而记录
如果是采用 GRANT,REVOKE,SET PASSWORD 等管理语句来做的话,那么无论如何都采用 SBR 模式记录
注:采用 RBR 模式后,能解决很多原先出现的主键重复问题。
2.5 如何查看复制格式
mysql> show variables like 'binlog_format';
static Sys_var_enum Sys_binlog_format(
"binlog_format",
"What form of binary logging the master will "
"use: either ROW for row-based binary logging, STATEMENT "
"for statement-based binary logging, or MIXED. MIXED is statement-"
"based binary logging except for those statements where only row-"
"based is correct: those which involve user-defined functions (i.e. "
"UDFs) or the UUID() function; for those, row-based binary logging is "
"automatically used. If NDBCLUSTER is enabled and binlog-format is "
"MIXED, the format switches to row-based and back implicitly per each "
"query accessing an NDBCLUSTER table",
SESSION_VAR(binlog_format), CMD_LINE(REQUIRED_ARG, OPT_BINLOG_FORMAT),
binlog_format_names,
DEFAULT(BINLOG_FORMAT_ROW), //默认binlog的同步为行复制
NO_MUTEX_GUARD,
NOT_IN_BINLOG, ON_CHECK(binlog_format_check),
ON_UPDATE(fix_binlog_format_after_update));
// Values for binlog_format sysvar
enum enum_binlog_format {
BINLOG_FORMAT_MIXED = 0, ///<混合模式 statement if safe, otherwise row - autodetected
BINLOG_FORMAT_STMT = 1, ///BINLOG_FORMAT_ROW = 2, ///<行复制 row-based
BINLOG_FORMAT_UNSPEC =
3 ///< thd_binlog_format() returns it when binlog is closed
};
3.建立主从关系
3.1 slave端start_salve方法
bool start_slave(THD *thd, LEX_SLAVE_CONNECTION *connection_param,
LEX_MASTER_INFO *master_param, int thread_mask_input,
Master_info *mi, bool set_mts_settings) {
bool is_error = false;
int thread_mask;
DBUG_TRACE;
lock_slave_threads(mi); //停止运行线程
// 获取已停止线程的掩码
init_thread_mask(&thread_mask, mi, true /* inverse */);
/*
我们将停止下面的所有线程。但如果用户想只启动一个线程,
就好像另一个线程正在运行一样(就像我们不要想碰另一根线),
所以将位设置为0其他线程
*/
if (thread_mask_input) {
thread_mask &= thread_mask_input;
}
if (thread_mask) // 一些线程停止,启动它们
{
if (load_mi_and_rli_from_repositories(mi, false, thread_mask)) {
is_error = true;
my_error(ER_MASTER_INFO, MYF(0));
} else if (*mi->host || !(thread_mask & SLAVE_IO)) {
/*
如果我们要启动IO线程,我们需要考虑通过启动从机提供的选项。
*/
if (thread_mask & SLAVE_IO) {
if (connection_param->user) { //设置用户
mi->set_start_user_configured(true);
mi->set_user(connection_param->user);
}
if (connection_param->password) { //设置密码
mi->set_start_user_configured(true);
mi->set_password(connection_param->password);
}
if (connection_param->plugin_auth) //设置授权插件
mi->set_plugin_auth(connection_param->plugin_auth);
if (connection_param->plugin_dir) //插件目录
mi->set_plugin_dir(connection_param->plugin_dir);
}
//...
//初始化设置
int slave_errno = mi->rli->init_until_option(thd, master_param);
if (slave_errno) {
my_error(slave_errno, MYF(0));
is_error = true;
}
if (!is_error) is_error = check_slave_sql_config_conflict(mi->rli);
} else if (master_param->pos || master_param->relay_log_pos ||
master_param->gtid)
push_warning(thd, Sql_condition::SL_NOTE, ER_UNTIL_COND_IGNORED,
ER_THD(thd, ER_UNTIL_COND_IGNORED));
if (!is_error)
//启动slave线程
is_error =
start_slave_threads(false /*need_lock_slave=false*/,
true /*wait_for_start=true*/, mi, thread_mask);
} else {
is_error = true;
my_error(ER_BAD_SLAVE, MYF(0));
}
} else {
/* 如果所有线程都已启动,则没有错误,只有一个警告 */
push_warning_printf(
thd, Sql_condition::SL_NOTE, ER_SLAVE_CHANNEL_WAS_RUNNING,
ER_THD(thd, ER_SLAVE_CHANNEL_WAS_RUNNING), mi->get_channel());
}
/*
如果有人试图启动,请清除启动信息,IO线程以避免任何安全问题。
*/
if (is_error && (thread_mask & SLAVE_IO) == SLAVE_IO) mi->reset_start_info();
unlock_slave_threads(mi);
mi->channel_unlock();
return is_error;
}
bool start_slave_threads(bool need_lock_slave, bool wait_for_start,
Master_info *mi, int thread_mask) {
mysql_mutex_t *lock_io = nullptr, *lock_sql = nullptr,
*lock_cond_io = nullptr, *lock_cond_sql = nullptr;
mysql_cond_t *cond_io = nullptr, *cond_sql = nullptr;
bool is_error = false;
DBUG_TRACE;
DBUG_EXECUTE_IF("uninitialized_master-info_structure", mi->inited = false;);
//...
if (thread_mask & SLAVE_IO) //判断是否支持SLAVE_IO
is_error = start_slave_thread(
#ifdef HAVE_PSI_THREAD_INTERFACE
key_thread_slave_io,
#endif
handle_slave_io, lock_io, lock_cond_io, cond_io, &mi->slave_running,
&mi->slave_run_id, mi); //调用handle_slave_io方法
if (!is_error && (thread_mask & SLAVE_SQL)) { //判断是否支持SLAVE_SQL
//...
if (!is_error)
is_error = start_slave_thread(
#ifdef HAVE_PSI_THREAD_INTERFACE
key_thread_slave_sql,
#endif
handle_slave_sql, lock_sql, lock_cond_sql, cond_sql,
&mi->rli->slave_running, &mi->rli->slave_run_id, mi); //调用handle_slave_sql方法
if (is_error)
terminate_slave_threads(mi, thread_mask & SLAVE_IO,
rpl_stop_slave_timeout, need_lock_slave);
}
return is_error;
}
3.2 slave端handle_slave_io方法
extern "C" void *handle_slave_io(void *arg) {
//...
my_thread_init();
{
//初始化slave线程
if (init_slave_thread(thd, SLAVE_THD_IO)) {
mysql_cond_broadcast(&mi->start_cond);
mysql_mutex_unlock(&mi->run_lock);
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
ER_THD(thd, ER_SLAVE_FATAL_ERROR),
"Failed during slave I/O thread initialization ");
goto err;
}
//...
mysql_cond_broadcast(&mi->start_cond); //调用做唤醒操作
//...
//发起登陆操作
successfully_connected = !safe_connect(thd, mysql, mi);
// we can get killed during safe_connect
#ifdef HAVE_SETNS
if (mi->is_set_network_namespace()) {
// Restore original network namespace used to be before connection has
// been created
successfully_connected =
restore_original_network_namespace() | successfully_connected;
}
#endif
//...
/*
注册slave到master
*/
THD_STAGE_INFO(thd, stage_registering_slave_on_master);
if (register_slave_on_master(mysql, mi, &suppress_warnings)) {
if (!check_io_slave_killed(thd, mi,
"Slave I/O thread killed "
"while registering slave on master")) {
LogErr(ERROR_LEVEL, ER_RPL_SLAVE_IO_THREAD_CANT_REGISTER_ON_MASTER);
if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
reconnect_messages[SLAVE_RECON_ACT_REG]))
goto err;
} else
goto err;
goto connected;
}
//...
while (!io_slave_killed(thd, mi)) {
MYSQL_RPL rpl;
THD_STAGE_INFO(thd, stage_requesting_binlog_dump);
if (request_dump(thd, mysql, &rpl, mi, &suppress_warnings)) { //发起dump指令
LogErr(ERROR_LEVEL, ER_RPL_SLAVE_ERROR_REQUESTING_BINLOG_DUMP,
mi->get_for_channel_str());
if (check_io_slave_killed(thd, mi,
"Slave I/O thread killed while \
requesting master dump") ||
try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
goto err;
goto connected;
}
//...
}
//...
my_thread_end(); //线程结束
#if OPENSSL_VERSION_NUMBER < 0x10100000L
ERR_remove_thread_state(0);
#endif /* OPENSSL_VERSION_NUMBER < 0x10100000L */
my_thread_exit(nullptr); //退出线程
return (nullptr); // Avoid compiler warnings
}
3.4 master建立实现
bool dispatch_command(THD *thd, const COM_DATA *com_data,
enum enum_server_command command) {
//...
switch (command) {
case COM_REGISTER_SLAVE: { //注册slave到master
// TODO: access of protocol_classic should be removed
if (!register_slave(thd, thd->get_protocol_classic()->get_raw_packet(),
thd->get_protocol_classic()->get_packet_length()))
my_ok(thd);
break;
}
//...
case COM_BINLOG_DUMP_GTID: //binlog_dump_gtid
// TODO: access of protocol_classic should be removed
error = com_binlog_dump_gtid(
thd, (char *)thd->get_protocol_classic()->get_raw_packet(),
thd->get_protocol_classic()->get_packet_length());
break;
case COM_BINLOG_DUMP: //binlog_dump
// TODO: access of protocol_classic should be removed
error = com_binlog_dump(
thd, (char *)thd->get_protocol_classic()->get_raw_packet(),
thd->get_protocol_classic()->get_packet_length());
break;
}
return error;
}
bool com_binlog_dump(THD *thd, char *packet, size_t packet_length) {
DBUG_TRACE;
ulong pos;
ushort flags = 0;
const uchar *packet_position = (uchar *)packet;
size_t packet_bytes_todo = packet_length;
DBUG_ASSERT(!thd->status_var_aggregated);
thd->status_var.com_other++;
thd->enable_slow_log = opt_log_slow_admin_statements;
if (check_global_access(thd, REPL_SLAVE_ACL)) return false;
/*
4 bytes is too little, but changing the protocol would break
compatibility. This has been fixed in the new protocol. @see
com_binlog_dump_gtid().
*/
READ_INT(pos, 4);
READ_INT(flags, 2);
READ_INT(thd->server_id, 4);
DBUG_PRINT("info",
("pos=%lu flags=%d server_id=%d", pos, flags, thd->server_id));
kill_zombie_dump_threads(thd);
query_logger.general_log_print(thd, thd->get_command(), "Log: '%s' Pos: %ld",
packet + 10, (long)pos);
mysql_binlog_send(thd, thd->mem_strdup(packet + 10), (my_off_t)pos, nullptr,
flags); //发送binlog
unregister_slave(thd, true, true /*need_lock_slave_list=true*/);
/* 如果我们到了这里,线程需要终止 */
return true;
error_malformed_packet:
my_error(ER_MALFORMED_PACKET, MYF(0));
return true;
}
4.主从建立过程中数据包
#查询当前时间戳
SELECT UNIX_TIMESTAMP()
#查询master的serverid
SELECT @@GLOBAL.SERVER_ID
#设置心跳周期,单位为纳秒,其实只有30s。初始化心跳如图4-2
SET @master_heartbeat_period= 30000001024
#设置master_binlog_checksum
SET @master_binlog_checksum= @@global.binlog_checksum
#查询master_binlog_checksum
SELECT @master_binlog_checksum
#获得是否支持gtid
SELECT @@GLOBAL.GTID_MODE
#查询server uuid
SELECT @@GLOBAL.SERVER_UUID
#设置slave uuid
SET @slave_uuid= '2dc27df4-e143-11ea-b396-cc679ee1902b'
总结
mysql复制模式分为三种:STATEMENT模式(SBR)、ROW模式(RBR)、MIXED模式(MBR)。 mysql8.0.20默认ROW模式(RBR)。 发送binlog支持两种形式:COM_BINLOG_DUMP与COM_BINLOG_DUMP_GTID。 默认情况master_heartbeat_period为30秒,单位为纳秒。 IO Thread与SQL Thread其实就是对应handle_slave_io方法与handle_slave_sql方法。