【Mysql源码分析】基于行的复制实现之“主从关系建立”

共 14172字,需浏览 29分钟

 ·

2020-10-20 17:32

作者:c rain
来源:SegmentFault 思否社区




前言


经常听到别人说Mysql的SBR、RBR、MBR,如果不清楚,那么可以跟着文章一起来学习。由于涉及到主从的内容比较多,需要拆分成多篇内容来概述,这章先从基础知识和主从关系建立开始讲起。还会出一篇文章详细讲解从主同步。


1.了解什么是SBR、RBR、MBR?

2.了解下主从配置该如何配置?

3.了解主从关系如何建立?




1.配置Mysql主从


在本文中,分为一主一从。主监听的端口为3306,从监听的端口为3309。


1.1主服务配置


master配置,配置文件my.cnf:


[mysqld]
port=3306
basedir=/usr/local/mysql8.0.20
datadir=/usr/local/mysql8.0.20/data
socket=/tmp/mysql.sock
#explicit_defaults_for_timestamp=true
lower_case_table_names=2 #表名存储为给定的大小写但是比较的时候是小写的
log_bin=mysql-bin
server_id =10


主服务启动Mysql命令:


# sudo bin/mysqld --defaults-file=/usr/local/mysql8.0.20/etc/my.cnf --user=root


客户端连接master端


# mysql --socket=/tmp/mysql.sock -u root


启动master后,需要创建一个用户,用于主从同步:


mysql> GRANT REPLICATION SLAVE ON *.* TO repl@'localhost' IDENTIFIED BY '123456';


查看主服务状态


mysql> show master status \G;


1.2 从服务配置


slave配置,配置文件salve.conf:


[mysqld]
port=3309
basedir=/usr/local/mysql8.0.20
datadir=/usr/local/mysql8.0.20/data1
socket=/tmp/mysqlslave.sock
#explicit_defaults_for_timestamp=true
lower_case_table_names=2 #表名存储为给定的大小写但是比较的时候是小写的
log_bin=mysql-bin
server_id=2
relay_log=/usr/local/mysql8.0.20/mysql-relay-bin
read_only=1     #执行模式


从服务启动Mysql命令:


# sudo bin/mysqld --defaults-file=/usr/local/mysql8.0.20/etc/salve.conf --user=root


连接Slave端:


# mysql --socket=/tmp/mysqlslave.sock -u root


Slave端主从同步配置:


mysql>
change master to master_host='localhost'
master_user='repl',
master_password='123456',
master_log_file='mysql-bin.000001',
master_log_pos=0;


Slave/Master


#查看binlog事件
mysql> SHOW BINLOG EVENTS;


如果发现主从没有同步,可以使用如下命令查看相应的状态:


#查看slave状态
mysql> show slave status;


如果遇到从库报这个错误:Got fatal error 1236 from master when reading data from binary log: 'Could not find first log file name in binary log index file'

Got fatal error 1236 from master when reading data from binary log: 'could not find next log'


此时可以操作如下三个命令进行处理:


#关闭slave
mysql> stop slave;
#重置slave
mysql> reset slave;
#启动slave
mysql> start slave;




2.MYSQL中BINLOG_FORMAT的三种模式


mysql三种复制方式:基于SQL语句的复制(statement-based replication, SBR),基于行的复制(row-based replication, RBR),混合模式复制(mixed-based replication, MBR)。对应的,binlog的格式也有三种:STATEMENT,ROW,MIXED。


2.1 STATEMENT模式(SBR)


每一条会修改数据的sql语句会记录到binlog中。优点是并不需要记录每一条sql语句和每一行的数据变化,减少了binlog日志量,节约IO,提高性能。缺点是在某些情况下会导致master-slave中的数据不一致(如sleep()函数, last_insert_id(),以及user-defined functions(udf)等会出现问题)


2.2 ROW模式(RBR)


不记录每条sql语句的上下文信息,仅需记录哪条数据被修改了,修改成什么样了。而且不会出现某些特定情况下的存储过程、或function、或trigger的调用和触发无法被正确复制的问题。缺点是会产生大量的日志,尤其是alter table的时候会让日志暴涨。


2.3 MIXED模式(MBR)


以上两种模式的混合使用,一般的复制使用STATEMENT模式保存binlog,对于STATEMENT模式无法复制的操作使用ROW模式保存binlog,MySQL会根据执行的SQL语句选择日志保存方式。


binlog复制配置在mysql的配置文件my.cnf中,可以通过一下选项配置binglog相关,配置如下:


#binlog日志格式,mysql默认采用statement,建议使用mixed
binlog_format           = MIXED    

#binlog日志文件
log-bin                 = /data/mysql/mysql-bin.log  

#binlog过期清理时间
expire_logs_days        = 7    

#binlog每个日志文件大小
max_binlog_size         = 100m                  

#binlog缓存大小
binlog_cache_size       = 4m                    

#最大binlog缓存大小
max_binlog_cache_size   = 512m


2.4 优缺点对比


对于执行的SQL语句中包含now()这样的时间函数,会在日志中产生对应的unix_timestamp()*1000的时间字符串,slave在完成同步时,取用的是sqlEvent发生的时间来保证数据的准确性。另外对于一些功能性函数slave能完成相应的数据同步,而对于上面指定的一些类似于UDF函数,导致Slave无法知晓的情况,则会采用ROW格式存储这些Binlog,以保证产生的Binlog可以供Slave完成数据同步。


现在来比较以下 SBR 和 RBR 这2种模式各自的优缺点:


SBR 的优点:


  • 技术比较成熟
  • binlog文件较小
  • binlog中包含了所有数据库更改信息,可以据此来审核数据库的安全等情况
  • binlog可以用于实时的还原,而不仅仅用于复制
  • 主从版本可以不一样,从服务器版本可以比主服务器版本高

SBR 的缺点:


  • 不是所有的UPDATE语句都能被复制,尤其是包含不确定操作的时候。
  • 调用具有不确定因素的 UDF 时复制也可能出问题
  • 使用以下函数的语句也无法被复制:
    • LOAD_FILE()
    • UUID()
    • USER()
    • FOUND_ROWS()
    • SYSDATE() (除非启动时启用了 --sysdate-is-now 选项)

  • INSERT ... SELECT 会产生比 RBR 更多的行级锁

  • 复制需要进行全表扫描(WHERE 语句中没有使用到索引)的 UPDATE 时,需要比 RBR 请求更多的行级锁

  • 对于有 AUTO_INCREMENT 字段的 InnoDB表而言,INSERT 语句会阻塞其他 INSERT 语句

  • 对于一些复杂的语句,在从服务器上的耗资源情况会更严重,而 RBR 模式下,只会对那个发生变化的记录产生影响

  • 存储函数(不是存储过程)在被调用的同时也会执行一次 NOW() 函数,这个可以说是坏事也可能是好事

  • 确定了的 UDF 也需要在从服务器上执行

  • 数据表必须几乎和主服务器保持一致才行,否则可能会导致复制出错

  • 执行复杂语句如果出错的话,会消耗更多资源


RBR 的优点:


任何情况都可以被复制,这对复制来说是最安全可靠的和其他大多数数据库系统的复制技术一样多数情况下,从服务器上的表如果有主键的话,复制就会快了很多。复制以下几种语句时的行锁更少:

  • INSERT ... SELECT
  • 包含 AUTO_INCREMENT 字段的 INSERT
  • 没有附带条件或者并没有修改很多记录的 UPDATE 或 DELETE 语句执行 INSERT,UPDATE,DELETE 语句时锁更少,从服务器上采用多线程来执行复制成为可能。

RBR 的缺点:


  • binlog 大了很多
  • 复杂的回滚时 binlog 中会包含大量的数据

主服务器上执行 UPDATE 语句时,所有发生变化的记录都会写到 binlog 中,而 SBR 只会写一次,这会导致频繁发生 binlog 的并发写问题

  • UDF 产生的大 BLOB 值会导致复制变慢

无法从 binlog 中看到都复制了写什么语句
当在非事务表上执行一段堆积的SQL语句时,最好采用 SBR 模式,否则很容易导致主从服务器的数据不一致情况发生。

另外,针对系统库 mysql 里面的表发生变化时的处理规则如下:

  • 如果是采用 INSERT,UPDATE,DELETE 直接操作表的情况,则日志格式根据 binlog_format 的设定而记录

  • 如果是采用 GRANT,REVOKE,SET PASSWORD 等管理语句来做的话,那么无论如何都采用 SBR 模式记录


    注:采用 RBR 模式后,能解决很多原先出现的主键重复问题。



2.5 如何查看复制格式


通过如下命令即可查看复制格式,如图2-4-1:

mysql> show variables like 'binlog_format';



图2-4-1 查看复制格式

默认binlog_format参数为行复制,在源码mysql-8.0.20/sql/sys_vars.cc中

static Sys_var_enum Sys_binlog_format(
    "binlog_format",
    "What form of binary logging the master will "
    "use: either ROW for row-based binary logging, STATEMENT "
    "for statement-based binary logging, or MIXED. MIXED is statement-"
    "based binary logging except for those statements where only row-"
    "based is correct: those which involve user-defined functions (i.e. "
    "UDFs) or the UUID() function; for those, row-based binary logging is "
    "automatically used. If NDBCLUSTER is enabled and binlog-format is "
    "MIXED, the format switches to row-based and back implicitly per each "
    "query accessing an NDBCLUSTER table",
    SESSION_VAR(binlog_format), CMD_LINE(REQUIRED_ARG, OPT_BINLOG_FORMAT),
    binlog_format_names, 
    DEFAULT(BINLOG_FORMAT_ROW), //默认binlog的同步为行复制
    NO_MUTEX_GUARD,
    NOT_IN_BINLOG, ON_CHECK(binlog_format_check),
    ON_UPDATE(fix_binlog_format_after_update));

通过上面代码可以看出mysql-8.0.20默认为行复制。

那么继续来看一下binlog_format都有那些模式,可以看mysql-8.0.20/sql/system_variables.h文件

// Values for binlog_format sysvar
enum enum_binlog_format {
  BINLOG_FORMAT_MIXED = 0,  ///<混合模式 statement if safe, otherwise row - autodetected
  BINLOG_FORMAT_STMT = 1,   ///  BINLOG_FORMAT_ROW = 2,    ///<行复制 row-based
  BINLOG_FORMAT_UNSPEC =
      3  ///< thd_binlog_format() returns it when binlog is closed
};

基于如下代码可以得知,binlog_format包含BINLOG_FORMAT_MIXED、BINLOG_FORMAT_STMT、BINLOG_FORMAT_ROW三种模式,也就是对应:STATEMENT模式(SBR)、ROW模式(RBR)、MIXED模式(MBR)



3.建立主从关系


通过张图来看一下主从关系建立的相关流程,如图3-1-1:


图3-1-1 主从同步建立


3.1 slave端start_salve方法


在mysql主从建立中,是由slave端先发起,当执行“start slave;” 语句时,会调用rpl_slave.cc中的start_slave方法,其中实现如下:

bool start_slave(THD *thd, LEX_SLAVE_CONNECTION *connection_param,
                 LEX_MASTER_INFO *master_param, int thread_mask_input,
                 Master_info *mi, bool set_mts_settings) {
  bool is_error = false;
  int thread_mask;

  DBUG_TRACE;
    
  lock_slave_threads(mi);  //停止运行线程
  // 获取已停止线程的掩码
  init_thread_mask(&thread_mask, mi, true /* inverse */);
  /*
    我们将停止下面的所有线程。但如果用户想只启动一个线程,
    就好像另一个线程正在运行一样(就像我们不要想碰另一根线),
    所以将位设置为0其他线程
  */
  if (thread_mask_input) {
    thread_mask &= thread_mask_input;
  }
  if (thread_mask)  // 一些线程停止,启动它们
  {
    if (load_mi_and_rli_from_repositories(mi, false, thread_mask)) {
      is_error = true;
      my_error(ER_MASTER_INFO, MYF(0));
    } else if (*mi->host || !(thread_mask & SLAVE_IO)) {
      /*
        如果我们要启动IO线程,我们需要考虑通过启动从机提供的选项。
      */
      if (thread_mask & SLAVE_IO) {
        if (connection_param->user) { //设置用户
          mi->set_start_user_configured(true);
          mi->set_user(connection_param->user);
        }
        if (connection_param->password) { //设置密码
          mi->set_start_user_configured(true);
          mi->set_password(connection_param->password);
        }
        if (connection_param->plugin_auth) //设置授权插件
          mi->set_plugin_auth(connection_param->plugin_auth);
        if (connection_param->plugin_dir) //插件目录
          mi->set_plugin_dir(connection_param->plugin_dir);
      }

        //...

        //初始化设置
        int slave_errno = mi->rli->init_until_option(thd, master_param);
        if (slave_errno) {
          my_error(slave_errno, MYF(0));
          is_error = true;
        }

        if (!is_error) is_error = check_slave_sql_config_conflict(mi->rli);
      } else if (master_param->pos || master_param->relay_log_pos ||
                 master_param->gtid)
        push_warning(thd, Sql_condition::SL_NOTE, ER_UNTIL_COND_IGNORED,
                     ER_THD(thd, ER_UNTIL_COND_IGNORED));

      if (!is_error)
        //启动slave线程
        is_error =
            start_slave_threads(false /*need_lock_slave=false*/,
                                true /*wait_for_start=true*/, mi, thread_mask);
    } else {
      is_error = true;
      my_error(ER_BAD_SLAVE, MYF(0));
    }
  } else {
    /* 如果所有线程都已启动,则没有错误,只有一个警告 */
    push_warning_printf(
        thd, Sql_condition::SL_NOTE, ER_SLAVE_CHANNEL_WAS_RUNNING,
        ER_THD(thd, ER_SLAVE_CHANNEL_WAS_RUNNING), mi->get_channel());
  }

  /*
    如果有人试图启动,请清除启动信息,IO线程以避免任何安全问题。
  */
  if (is_error && (thread_mask & SLAVE_IO) == SLAVE_IO) mi->reset_start_info();

  unlock_slave_threads(mi);

  mi->channel_unlock();

  return is_error;
}

从如上代码可以得知调用“start slave;”时,会对线程做一些停止操作。然后进行一些设置后,调用start_slave_threads方法启动slave线程。然后start_slave_threads是一个比较关键的方法。

那么接下来看一下start_slave_threads方法,实现如下:

bool start_slave_threads(bool need_lock_slave, bool wait_for_start,
                         Master_info *mi, int thread_mask) {
  mysql_mutex_t *lock_io = nullptr, *lock_sql = nullptr,
                *lock_cond_io = nullptr, *lock_cond_sql = nullptr;
  mysql_cond_t *cond_io = nullptr, *cond_sql = nullptr;
  bool is_error = false;
  DBUG_TRACE;
  DBUG_EXECUTE_IF("uninitialized_master-info_structure", mi->inited = false;);

  //...
  
  if (thread_mask & SLAVE_IO)  //判断是否支持SLAVE_IO
    is_error = start_slave_thread(
#ifdef HAVE_PSI_THREAD_INTERFACE
        key_thread_slave_io,
#endif
        handle_slave_io, lock_io, lock_cond_io, cond_io, &mi->slave_running,
        &mi->slave_run_id, mi);  //调用handle_slave_io方法
        
  if (!is_error && (thread_mask & SLAVE_SQL)) { //判断是否支持SLAVE_SQL
   
    //...
    if (!is_error)
      is_error = start_slave_thread(
#ifdef HAVE_PSI_THREAD_INTERFACE
          key_thread_slave_sql,
#endif
          handle_slave_sql, lock_sql, lock_cond_sql, cond_sql,
          &mi->rli->slave_running, &mi->rli->slave_run_id, mi); //调用handle_slave_sql方法
    if (is_error)
      terminate_slave_threads(mi, thread_mask & SLAVE_IO,
                              rpl_stop_slave_timeout, need_lock_slave);
  }
  return is_error;
}

通过如上方法可以得知thread_mask掩码是用于判断是否支持SLAVE_IO与支持SLAVE_SQL。如果支持则线程调用对应的方法。因为考虑到主题为主从关系建立,这里主要关注一下handle_slave_io方法。

图3-1-2 主从同步协议


如图3-1-2中IO Thread与SQL Thread其实就是对应handle_slave_io方法与handle_slave_sql方法。

3.2 slave端handle_slave_io方法


handle_slave_io方法为建立主从的主要方法,其中包含了初始化slave线程、发起连接master、注册slave到master,发起COM_BINLOG_DUMP或COM_BINLOG_DUMP_GTID操作等,实现如下:

extern "C" void *handle_slave_io(void *arg) {
  //...
  my_thread_init();
  {
    //初始化slave线程
    if (init_slave_thread(thd, SLAVE_THD_IO)) {
      mysql_cond_broadcast(&mi->start_cond);
      mysql_mutex_unlock(&mi->run_lock);
      mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
                 ER_THD(thd, ER_SLAVE_FATAL_ERROR),
                 "Failed during slave I/O thread initialization ");
      goto err;
    }

    //...
    
    mysql_cond_broadcast(&mi->start_cond); //调用做唤醒操作

    
    //...
    
    //发起登陆操作
    successfully_connected = !safe_connect(thd, mysql, mi);
    // we can get killed during safe_connect
#ifdef HAVE_SETNS
    if (mi->is_set_network_namespace()) {
      // Restore original network namespace used to be before connection has
      // been created
      successfully_connected =
          restore_original_network_namespace() | successfully_connected;
    }
#endif

    //...

    /*
      注册slave到master
    */
    THD_STAGE_INFO(thd, stage_registering_slave_on_master);
    if (register_slave_on_master(mysql, mi, &suppress_warnings)) {
      if (!check_io_slave_killed(thd, mi,
                                 "Slave I/O thread killed "
                                 "while registering slave on master")) {
        LogErr(ERROR_LEVEL, ER_RPL_SLAVE_IO_THREAD_CANT_REGISTER_ON_MASTER);
        if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
                             reconnect_messages[SLAVE_RECON_ACT_REG]))
          goto err;
      } else
        goto err;
      goto connected;
    }

    //...
    
    while (!io_slave_killed(thd, mi)) {
      MYSQL_RPL rpl;

      THD_STAGE_INFO(thd, stage_requesting_binlog_dump);
      if (request_dump(thd, mysql, &rpl, mi, &suppress_warnings)) { //发起dump指令
        LogErr(ERROR_LEVEL, ER_RPL_SLAVE_ERROR_REQUESTING_BINLOG_DUMP,
               mi->get_for_channel_str());
        if (check_io_slave_killed(thd, mi,
                                  "Slave I/O thread killed while \
requesting master dump"
) ||
            try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
                             reconnect_messages[SLAVE_RECON_ACT_DUMP]))
          goto err;
        goto connected;
      }
      //...
    }
  
  //...
  
  my_thread_end(); //线程结束
#if OPENSSL_VERSION_NUMBER < 0x10100000L
  ERR_remove_thread_state(0);
#endif /* OPENSSL_VERSION_NUMBER < 0x10100000L */
  my_thread_exit(nullptr); //退出线程
  return (nullptr);  // Avoid compiler warnings
}

从代码中可以得知调用safe_connect进行slave对master的登陆,具体登陆可以协议可以看一下之前写的文章:
https://blog.csdn.net/byxiaoyuonly/article/details/108212013。

然后又调用register_slave_on_master会发送COM_REGISTER_SLAVE指令进行把slave注册到master,再调用request_dump发起binlog_dump指令。

图3-2-1 发送COM_BINLOG_DUMP


在request_dump发送指令其实支持COM_BINLOG_DUMP与COM_BINLOG_DUMP_GTID两种,但是具体发什么取决于是否开启gtid设置,如图3-2-1所示。

3.4 master建立实现


通过对上面内容的了解,我们得知register_slave_on_master会发起发起COM_REGISTER_SLAVE对把slave注册到master,然后调用request_dump发起binlog_dump指令。master指令处理如下:

bool dispatch_command(THD *thd, const COM_DATA *com_data,
                      enum enum_server_command command) {
  
  //...
  
  switch (command) {
    
    case COM_REGISTER_SLAVE: { //注册slave到master
      // TODO: access of protocol_classic should be removed
      if (!register_slave(thd, thd->get_protocol_classic()->get_raw_packet(),
                          thd->get_protocol_classic()->get_packet_length()))
        my_ok(thd);
      break;
    }
    
    //...
    
    case COM_BINLOG_DUMP_GTID: //binlog_dump_gtid
      // TODO: access of protocol_classic should be removed
      error = com_binlog_dump_gtid(
          thd, (char *)thd->get_protocol_classic()->get_raw_packet(),
          thd->get_protocol_classic()->get_packet_length());
      break;
    case COM_BINLOG_DUMP: //binlog_dump
      // TODO: access of protocol_classic should be removed
      error = com_binlog_dump(
          thd, (char *)thd->get_protocol_classic()->get_raw_packet(),
          thd->get_protocol_classic()->get_packet_length());
      break;
  }

  return error;
}

注册slave到master过程可以看后续的数据包,这边可以接着看一下对应的com_binlog_dump方法实现:

bool com_binlog_dump(THD *thd, char *packet, size_t packet_length) {
  DBUG_TRACE;
  ulong pos;
  ushort flags = 0;
  const uchar *packet_position = (uchar *)packet;
  size_t packet_bytes_todo = packet_length;

  DBUG_ASSERT(!thd->status_var_aggregated);
  thd->status_var.com_other++;
  thd->enable_slow_log = opt_log_slow_admin_statements;
  if (check_global_access(thd, REPL_SLAVE_ACL)) return false;

  /*
    4 bytes is too little, but changing the protocol would break
    compatibility.  This has been fixed in the new protocol. @see
    com_binlog_dump_gtid().
  */
  READ_INT(pos, 4);
  READ_INT(flags, 2);
  READ_INT(thd->server_id, 4);

  DBUG_PRINT("info",
             ("pos=%lu flags=%d server_id=%d", pos, flags, thd->server_id));

  kill_zombie_dump_threads(thd);

  query_logger.general_log_print(thd, thd->get_command(), "Log: '%s'  Pos: %ld",
                                 packet + 10, (long)pos);
  mysql_binlog_send(thd, thd->mem_strdup(packet + 10), (my_off_t)pos, nullptr,
                    flags); //发送binlog

  unregister_slave(thd, truetrue /*need_lock_slave_list=true*/);
  /* 如果我们到了这里,线程需要终止 */
  return true;

error_malformed_packet:
  my_error(ER_MALFORMED_PACKET, MYF(0));
  return true;
}

在mysql_binlog_send中其实调用Binlog_sender的run方法,sender.run()方法中又调用Binlog_sender::init 初始化检测、Binlog_sender::check_start_file() 检查文件等。最终调用Binlog_sender::send_binlog对从服务发送binlog,如图3-4-1所示。


图3-4-1 send binlog



4.主从建立过程中数据包


图4-1 主从建立过程数据包


通过图4-1可以得知在主从关系建立会发起如下操作:

#查询当前时间戳
SELECT UNIX_TIMESTAMP()

#查询master的serverid
SELECT @@GLOBAL.SERVER_ID

#设置心跳周期,单位为纳秒,其实只有30s。初始化心跳如图4-2
SET @master_heartbeat_period= 30000001024

#设置master_binlog_checksum
SET @master_binlog_checksum= @@global.binlog_checksum

#查询master_binlog_checksum
SELECT @master_binlog_checksum

#获得是否支持gtid
SELECT @@GLOBAL.GTID_MODE

#查询server uuid
SELECT @@GLOBAL.SERVER_UUID

#设置slave uuid
SET @slave_uuid= '2dc27df4-e143-11ea-b396-cc679ee1902b'


图4-2 初始化心跳周期




总结


  1. mysql复制模式分为三种:STATEMENT模式(SBR)、ROW模式(RBR)、MIXED模式(MBR)。
  2. mysql8.0.20默认ROW模式(RBR)。
  3. 发送binlog支持两种形式:COM_BINLOG_DUMP与COM_BINLOG_DUMP_GTID。
  4. 默认情况master_heartbeat_period为30秒,单位为纳秒。
  5. IO Thread与SQL Thread其实就是对应handle_slave_io方法与handle_slave_sql方法。



- END -

浏览 13
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报