SpringCloud微服务架构中分布式事务解决方案,一次性给你说到烂
点击上方蓝色字体,选择“标星公众号”
优质文章,第一时间送达
说明
微服务的发展
微服务落地存在的问题
单体应用拆分为分布式系统后,进程间的通讯机制和故障处理措施变的更加复杂。
系统微服务化后,一个看似简单的功能,内部可能需要调用多个服务并操作多个数据库实现,服务调用的分布式事务问题变的非常突出。
微服务数量众多,其测试、部署、监控等都变的更加困难。
ACID
原子性(Atomicity): 一个事务的所有系列操作步骤被看成是一个动作,所有的步骤要么全部完成要么一个也不会完成,如果事务过程中任何一点失败,将要被改变的数据库记录就不会被真正被改变。
一致性(Consistency): 数据库的约束 级联和触发机制Trigger都必须满足事务的一致性。也就是说,通过各种途径包括外键约束等任何写入数据库的数据都是有效的,不能发生表与表之间存在外键约束,但是有数据却违背这种约束性。所有改变数据库数据的动作事务必须完成,没有事务会创建一个无效数据状态,这是不同于CAP理论的一致性"consistency".
隔离性(Isolation): 主要用于实现并发控制, 隔离能够确保并发执行的事务能够顺序一个接一个执行,通过隔离,一个未完成事务不会影响另外一个未完成事务。
持久性(Durability): 一旦一个事务被提交,它应该持久保存,不会因为和其他操作冲突而取消这个事务。很多人认为这意味着事务是持久在磁盘上,但是规范没有特别定义这点。
一致性理论
CAP 理论
一致性:分布式环境下,多个节点的数据是否强一致。
可用性:分布式服务能一直保证可用状态。当用户发出一个请求后,服务能在有限时间内返回结果。
分区容忍性:特指对网络分区的容忍性。
BASE 理论
基本可用( Basically Available):指分布式系统在出现故障时,允许损失部分的可用性来保证核心可用;
软状态( Soft state):指允许分布式系统存在中间状态,该中间状态不会影响到系统的整体可用性;
最终一致性( Eventual consistency):指分布式系统中的所有副本数据经过一定时间后,最终能够达到一致的状态;
原子性(A)与持久性(D)必须根本保障;
为了可用性、性能与降级服务的需要,只有降低一致性( C ) 与 隔离性( I ) 的要求;
酸碱平衡(ACID-BASE Balance);
一致性模型
强一致性:数据更新成功后,任意时刻所有副本中的数据都是一致的,一般采用同步的方式实现。
弱一致性:数据更新成功后,系统不承诺立即可以读到最新写入的值,也不承诺具体多久之后可以读到。
最终一致性:弱一致性的一种形式,数据更新成功后,系统不承诺立即可以返回最新写入的值,但是保证最终会返回上一次更新操作的值。
本地事务
在单个数据库的本地并且限制在单个进程内的事务
本地事务不涉及多个数据来源
分布式事务典型方案
两阶段提交(2PC, Two Phase Commit)方案;
本地消息表 (eBay 事件队列方案);
TCC 补偿模式;
两
阶段型 补偿型
异步确保型
最大努力通知型
可查询操作
幂等操作
TCC操作
可补偿操作
两阶段提交2PC(强一致性)
第一阶段是表决阶段,所有参与者都将本事务能否成功的信息反馈发给协调者;
第二阶段是执行阶段,协调者根据所有参与者的反馈,通知所有参与者,步调一致地在所有分支上提交或者回滚;
单点问题:事务管理器在整个流程中扮演的角色很关键,如果其宕机,比如在第一阶段已经完成,在第二阶段正准备提交的时候事务管理器宕机,资源管理器就会一直阻塞,导致数据库无法使用。
同步阻塞:在准备就绪之后,资源管理器中的资源一直处于阻塞,直到提交完成,释放资源。
数据不一致:两阶段提交协议虽然为分布式数据强一致性所设计,但仍然存在数据不一致性的可能。比如:在第二阶段中,假设协调者发出了事务 Commit 的通知,但是因为网络问题该通知仅被一部分参与者所收到并执行了 Commit 操作,其余的参与者则因为没有收到通知一直处于阻塞状态,这时候就产生了数据的不一致性。
本地消息表(最终一致性)
在分布式事务操作的一方完成写业务数据的操作之后向本地消息表发送一个消息,本地事务能保证这个消息一定会被写入本地消息表中;
之后将本地消息表中的消息转发到 Kafka 等消息队列中,如果转发成功则将消息从本地消息表中删除,否则继续重新转发;
消息消费方处理这个消息,并完成自己的业务逻辑。此时如果本地事务处理成功,表明已经处理成功了,如果处理失败,那么就会重试执行。如果是业务上面的失败,可以给生产方发送一个业务补偿消息,通知生产方进行回滚等操作;
可靠消息的最终一致性代码示例
DROP TABLE IF EXISTS `rp_transaction_message`;
CREATE TABLE `rp_transaction_message` (
`id` VARCHAR (50) NOT NULL DEFAULT '' COMMENT '主键ID',
`version` INT (11) NOT NULL DEFAULT '0' COMMENT '版本号',
`editor` VARCHAR (100) DEFAULT NULL COMMENT '修改者',
`creater` VARCHAR (100) DEFAULT NULL COMMENT '创建者',
`edit_time` datetime DEFAULT NULL COMMENT '最后修改时间',
`create_time` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '创建时间',
`message_id` VARCHAR (50) NOT NULL DEFAULT '' COMMENT '消息ID',
`message_body` LONGTEXT NOT NULL COMMENT '消息内容',
`message_data_type` VARCHAR (50) DEFAULT NULL COMMENT '消息数据类型',
`consumer_queue` VARCHAR (100) NOT NULL DEFAULT '' COMMENT '消费队列',
`message_send_times` SMALLINT (6) NOT NULL DEFAULT '0' COMMENT '消息重发次数',
`areadly_dead` VARCHAR (20) NOT NULL DEFAULT '' COMMENT '是否死亡',
`status` VARCHAR (20) NOT NULL DEFAULT '' COMMENT '状态',
`remark` VARCHAR (200) DEFAULT NULL COMMENT '备注',
`field1` VARCHAR (200) DEFAULT NULL COMMENT '扩展字段1',
`field2` VARCHAR (200) DEFAULT NULL COMMENT '扩展字段2',
`field3` VARCHAR (200) DEFAULT NULL COMMENT '扩展字段3',
PRIMARY KEY (`id`),
KEY `AK_Key_2` (`message_id`)
) ENGINE = INNODB DEFAULT CHARSET = utf8;
public interface RpTransactionMessageService {
/**
* 预存储消息.
*/
public int saveMessageWaitingConfirm(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 确认并发送消息.
*/
public void confirmAndSendMessage(String messageId) throws MessageBizException;
/**
* 存储并发送消息.
*/
public int saveAndSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 直接发送消息.
*/
public void directSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 重发消息.
*/
public void reSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 根据messageId重发某条消息.
*/
public void reSendMessageByMessageId(String messageId) throws MessageBizException;
/**
* 将消息标记为死亡消息.
*/
public void setMessageToAreadlyDead(String messageId) throws MessageBizException;
/**
* 根据消息ID获取消息
*/
public RpTransactionMessage getMessageByMessageId(String messageId) throws MessageBizException;
/**
* 根据消息ID删除消息
*/
public void deleteMessageByMessageId(String messageId) throws MessageBizException;
/**
* 重发某个消息队列中的全部已死亡的消息.
*/
public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) throws MessageBizException;
/**
* 获取分页数据
*/
PageBean listPage(PageParam pageParam, Map paramMap) throws MessageBizException;
}
@Service("rpTransactionMessageService")
public class RpTransactionMessageServiceImpl implements RpTransactionMessageService {
private static final Log log = LogFactory.getLog(RpTransactionMessageServiceImpl.class);
@Autowired
private RpTransactionMessageDao rpTransactionMessageDao;
@Autowired
private JmsTemplate notifyJmsTemplate;
public int saveMessageWaitingConfirm(RpTransactionMessage message) {
if (message == null) {
throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");
}
if (StringUtil.isEmpty(message.getConsumerQueue())) {
throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 ");
}
message.setEditTime(new Date());
message.setStatus(MessageStatusEnum.WAITING_CONFIRM.name());
message.setAreadlyDead(PublicEnum.NO.name());
message.setMessageSendTimes(0);
return rpTransactionMessageDao.insert(message);
}
public void confirmAndSendMessage(String messageId) {
final RpTransactionMessage message = getMessageByMessageId(messageId);
if (message == null) {
throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根据消息id查找的消息为空");
}
message.setStatus(MessageStatusEnum.SENDING.name());
message.setEditTime(new Date());
rpTransactionMessageDao.update(message);
notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
notifyJmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message.getMessageBody());
}
});
}
public int saveAndSendMessage(final RpTransactionMessage message) {
if (message == null) {
throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");
}
if (StringUtil.isEmpty(message.getConsumerQueue())) {
throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 ");
}
message.setStatus(MessageStatusEnum.SENDING.name());
message.setAreadlyDead(PublicEnum.NO.name());
message.setMessageSendTimes(0);
message.setEditTime(new Date());
int result = rpTransactionMessageDao.insert(message);
notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
notifyJmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message.getMessageBody());
}
});
return result;
}
public void directSendMessage(final RpTransactionMessage message) {
if (message == null) {
throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");
}
if (StringUtil.isEmpty(message.getConsumerQueue())) {
throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 ");
}
notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
notifyJmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message.getMessageBody());
}
});
}
public void reSendMessage(final RpTransactionMessage message) {
if (message == null) {
throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");
}
if (StringUtil.isEmpty(message.getConsumerQueue())) {
throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 ");
}
message.addSendTimes();
message.setEditTime(new Date());
rpTransactionMessageDao.update(message);
notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
notifyJmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message.getMessageBody());
}
});
}
public void reSendMessageByMessageId(String messageId) {
final RpTransactionMessage message = getMessageByMessageId(messageId);
if (message == null) {
throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根据消息id查找的消息为空");
}
int maxTimes = Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times"));
if (message.getMessageSendTimes() >= maxTimes) {
message.setAreadlyDead(PublicEnum.YES.name());
}
message.setEditTime(new Date());
message.setMessageSendTimes(message.getMessageSendTimes() + 1);
rpTransactionMessageDao.update(message);
notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
notifyJmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message.getMessageBody());
}
});
}
public void setMessageToAreadlyDead(String messageId) {
RpTransactionMessage message = getMessageByMessageId(messageId);
if (message == null) {
throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根据消息id查找的消息为空");
}
message.setAreadlyDead(PublicEnum.YES.name());
message.setEditTime(new Date());
rpTransactionMessageDao.update(message);
}
public RpTransactionMessage getMessageByMessageId(String messageId) {
Map paramMap = new HashMap();
paramMap.put("messageId", messageId);
return rpTransactionMessageDao.getBy(paramMap);
}
public void deleteMessageByMessageId(String messageId) {
Map paramMap = new HashMap();
paramMap.put("messageId", messageId);
rpTransactionMessageDao.delete(paramMap);
}
@SuppressWarnings("unchecked")
public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) {
log.info("==>reSendAllDeadMessageByQueueName");
int numPerPage = 1000;
if (batchSize > 0 && batchSize < 100) {
numPerPage = 100;
} else if (batchSize > 100 && batchSize < 5000) {
numPerPage = batchSize;
} else if (batchSize > 5000) {
numPerPage = 5000;
} else {
numPerPage = 1000;
}
int pageNum = 1;
Map paramMap = new HashMap();
paramMap.put("consumerQueue", queueName);
paramMap.put("areadlyDead", PublicEnum.YES.name());
paramMap.put("listPageSortType", "ASC");
Map messageMap = new HashMap();
List
与常规MQ的ACK机制对比
Producer生成消息并发送给MQ(同步、异步);
MQ接收消息并将消息数据持久化到消息存储(持久化操作为可选配置);
MQ向Producer返回消息的接收结果(返回值、异常);
Consumer监听并消费MQ中的消息;
Consumer获取到消息后执行业务处理;
Consumer对已成功消费的消息向MQ进行ACK确认(确认后的消息将从MQ中删除);
public void test1(){
//1 数据库操作
//2 发送MQ消息
}
public void test1(){
//1 发送MQ消息
//2 数据库操作
}
@Transactional
public void test1(){
//1 发送MQ消息
//2 数据库操作
}
@Transactional
public void test1(){
//1 数据库操作
//2 发送MQ消息
}
常规MQ队列消息的处理流程无法实现消息发送一致性;
投递消息的流程其实就是消息的消费流程,可细化;
TCC (Try-Confirm-Cancel)补偿模式(最终一致性)
Try 阶段主要是对业务系统做检测及资源预留
Confirm 阶段主要是对业务系统做确认提交,Try阶段执行成功并开始执行 Confirm阶段时,默认 Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。
Cancel 阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。
首先在 Try 阶段,要先调用远程接口把 Smith 和 Bob 的钱
给 冻结起来。在 Confirm 阶段,执行远程调用的转账的操作,转账成功进行解冻。
如果第2步执行成功,那么转账成功,如果第二步执行失败,则调用远程冻结接口对应的解冻方法 (Cancel)。
可靠消息最终一致(常用)
A系统先发送一个prepared消息到mq,如果这个prepared消息发送失败那么就直接取消操作别执行了
如果这个消息发送成功过了,那么接着执行本地事务,如果成功就告诉mq发送确认消息,如果失败就告诉mq回滚消息
如果发送了确认消息,那么此时B系统会接收到确认消息,然后执行本地的事务
mq会自动定时
轮询 所有prepared消息回调你的接口,问你,这个消息是不是本地事务处理失败了,所有没发送确认消息?那是继续重试还是回滚?一般来说这里你就可以查下数据库看之前本地事务是否执行,如果回滚了,那么这里也回滚吧。这个就是避免可能本地事务执行成功了,别确认消息发送失败了。
最大努力通知
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:
https://blog.csdn.net/QAQFyl/article/details/113727579
锋哥最新SpringCloud分布式电商秒杀课程发布
👇👇👇
👆长按上方微信二维码 2 秒
感谢点赞支持下哈