牛逼!用 MySQL 实现一个分布式锁,这也太强了。。。

共 18389字,需浏览 37分钟

 ·

2021-09-30 23:01

上一篇:再见!LayUI !

以前参加过一个库存系统,由于其业务复杂性,搞了很多个应用来支撑。这样的话一份库存数据就有可能同时有多个应用来修改库存数据。

比如说,有定时任务域xx.cron,和SystemA域和SystemB域这几个JAVA应用,可能同时修改同一份库存数据。如果不做协调的话,就会有脏数据出现。

对于跨JAVA进程的线程协调,可以借助外部环境,例如DB或者Redis。

下文介绍一下如何使用DB来实现分布式锁。

设计

本文设计的分布式锁的交互方式如下:

1、根据业务字段生成transaction_id,并线程安全的创建锁资源

2、根据transaction_id申请锁

3、释放锁

动态创建锁资源

在使用synchronized关键字的时候,必须指定一个锁对象。

synchronized(obj) {
...

进程内的线程可以基于obj来实现同步。obj在这里可以理解为一个锁对象。如果线程要进入synchronized代码块里,必须先持有obj对象上的锁。这种锁是JAVA里面的内置锁,创建的过程是线程安全的。

那么借助DB,如何保证创建锁的过程是线程安全的呢?可以利用DB中的UNIQUE KEY特性,一旦出现了重复的key,由于UNIQUE KEY的唯一性,会抛出异常的。

在JAVA里面,是SQLIntegrityConstraintViolationException异常。

create table distributed_lock
(
 id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT COMMENT '自增主键',
 transaction_id varchar(128) NOT NULL DEFAULT '' COMMENT '事务id',
 last_update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP NOT NULL COMMENT '最后更新时间',
 create_time TIMESTAMP DEFAULT '0000-00-00 00:00:00' NOT NULL COMMENT '创建时间',
 UNIQUE KEY `idx_transaction_id` (`transaction_id`)
)

transaction_id是事务Id,比如说,可以用

仓库 + 条码 + 销售模式

来组装一个transaction_id,表示某仓库某销售模式下的某个条码资源。不同条码,当然就有不同的transaction_id。如果有两个应用,拿着相同的transaction_id来创建锁资源的时候,只能有一个应用创建成功。

一条distributed_lock记录插入成功了,就表示一份锁资源创建成功了。另外,关注公众号互联网架构师,在后台回复:2,可以获取我整理的 Java 系列面试题和答案,非常齐全。

DB连接池列表设计

在写操作频繁的业务系统中,通常会进行分库,以降低单数据库写入的压力,并提高写操作的吞吐量。最新 Java 核心技术教程,都在这了

如果使用了分库,那么业务数据自然也都分配到各个数据库上了。在这种水平切分的多数据库上使用DB分布式锁,可以自定义一个DataSouce列表。并暴露一个getConnection(String transactionId)方法,按照transactionId找到对应的Connection

实现代码如下:

package dlock;

import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.FileInputStream;
import java.io.IOException;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

@Component
public class DataSourcePool {
    private List<DruidDataSource> dlockDataSources = new ArrayList<>();

    @PostConstruct
    private void initDataSourceList() throws IOException {
        Properties properties = new Properties();
        FileInputStream fis = new FileInputStream("db.properties");
        properties.load(fis);

        Integer lockNum = Integer.valueOf(properties.getProperty("DLOCK_NUM"));
        for (int i = 0; i < lockNum; i++) {
            String user = properties.getProperty("DLOCK_USER_" + i);
            String password = properties.getProperty("DLOCK_PASS_" + i);
            Integer initSize = Integer.valueOf(properties.getProperty("DLOCK_INIT_SIZE_" + i));
            Integer maxSize = Integer.valueOf(properties.getProperty("DLOCK_MAX_SIZE_" + i));
            String url = properties.getProperty("DLOCK_URL_" + i);

            DruidDataSource dataSource = createDataSource(user,password,initSize,maxSize,url);
            dlockDataSources.add(dataSource);
        }
    }

    private DruidDataSource createDataSource(String user, String password, Integer initSize, Integer maxSize, String url) {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource.setUsername(user);
        dataSource.setPassword(password);
        dataSource.setUrl(url);
        dataSource.setInitialSize(initSize);
        dataSource.setMaxActive(maxSize);

        return dataSource;
    }

    public Connection getConnection(String transactionId) throws Exception {
        if (dlockDataSources.size() <= 0) {
            return null;
        }

        if (transactionId == null || "".equals(transactionId)) {
            throw new RuntimeException("transactionId是必须的");
        }

        int hascode = transactionId.hashCode();
        if (hascode < 0) {
            hascode = - hascode;
        }

        return dlockDataSources.get(hascode % dlockDataSources.size()).getConnection();
    }
}

首先编写一个initDataSourceList方法,并利用Spring的PostConstruct注解初始化一个DataSource 列表。相关的DB配置从db.properties读取。

DLOCK_NUM=2

DLOCK_USER_0="user1"
DLOCK_PASS_0="pass1"
DLOCK_INIT_SIZE_0=2
DLOCK_MAX_SIZE_0=10
DLOCK_URL_0="jdbc:mysql://localhost:3306/test1"

DLOCK_USER_1="user1"
DLOCK_PASS_1="pass1"
DLOCK_INIT_SIZE_1=2
DLOCK_MAX_SIZE_1=10
DLOCK_URL_1="jdbc:mysql://localhost:3306/test2"

DataSource使用阿里的DruidDataSource

接着最重要的一个实现getConnection(String transactionId)方法。实现原理很简单,获取transactionId的hashcode,并对DataSource的长度取模即可。

连接池列表设计好后,就可以实现往distributed_lock表插入数据了。
package dlock;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.sql.*;

@Component
public class DistributedLock {

    @Autowired
    private DataSourcePool dataSourcePool;

    /**
     * 根据transactionId创建锁资源
     */
    public String createLock(String transactionId) throws Exception{
        if (transactionId == null) {
            throw new RuntimeException("transactionId是必须的");
        }
        Connection connection = null;
        Statement statement = null;
        try {
            connection = dataSourcePool.getConnection(transactionId);
            connection.setAutoCommit(false);
            statement = connection.createStatement();
            statement.executeUpdate("INSERT INTO distributed_lock(transaction_id) VALUES ('" + transactionId + "')");
            connection.commit();
            return transactionId;
        }
        catch (SQLIntegrityConstraintViolationException icv) {
            //说明已经生成过了。
            if (connection != null) {
                connection.rollback();
            }
            return transactionId;
        }
        catch (Exception e) {
            if (connection != null) {
                connection.rollback();
            }
            throw  e;
        }
        finally {
            if (statement != null) {
                statement.close();
            }

            if (connection != null) {
                connection.close();
            }
        }
    }
}

根据transactionId锁住线程

接下来利用DB的select for update特性来锁住线程。当多个线程根据相同的transactionId并发同时操作select for update的时候,只有一个线程能成功,其他线程都block住,直到select for update成功的线程使用commit操作后,block住的所有线程的其中一个线程才能开始干活。我们在上面的DistributedLock类中创建一个lock法。

public boolean lock(String transactionId) throws Exception {
    Connection connection = null;
    PreparedStatement preparedStatement = null;
    ResultSet resultSet = null;
    try {
        connection = dataSourcePool.getConnection(transactionId);
        preparedStatement = connection.prepareStatement("SELECT * FROM distributed_lock WHERE transaction_id = ? FOR UPDATE ");
        preparedStatement.setString(1,transactionId);
        resultSet = preparedStatement.executeQuery();
        if (!resultSet.next()) {
            connection.rollback();
            return false;
        }
        return true;
    } catch (Exception e) {
        if (connection != null) {
            connection.rollback();
        }
        throw  e;
    }
    finally {
        if (preparedStatement != null) {
            preparedStatement.close();
        }

        if (resultSet != null) {
            resultSet.close();
        }

        if (connection != null) {
            connection.close();
        }
    }
}

实现解锁操作

当线程执行完任务后,必须手动的执行解锁操作,之前被锁住的线程才能继续干活。在我们上面的实现中,其实就是获取到当时select for update成功的线程对应的Connection,并实行commit操作即可。

那么如何获取到呢?我们可以利用ThreadLocal。首先在DistributedLock类中定义

private ThreadLocal<Connection> threadLocalConn = new ThreadLocal<>();

每次调用lock方法的时候,把Connection放置到ThreadLocal里面。我们修改lock方法。

public boolean lock(String transactionId) throws Exception {
    Connection connection = null;
    PreparedStatement preparedStatement = null;
    ResultSet resultSet = null;
    try {
        connection = dataSourcePool.getConnection(transactionId);
        threadLocalConn.set(connection);
        preparedStatement = connection.prepareStatement("SELECT * FROM distributed_lock WHERE transaction_id = ? FOR UPDATE ");
        preparedStatement.setString(1,transactionId);
        resultSet = preparedStatement.executeQuery();
        if (!resultSet.next()) {
            connection.rollback();
            threadLocalConn.remove();
            return false;
        }
        return true;
    } catch (Exception e) {
        if (connection != null) {
            connection.rollback();
            threadLocalConn.remove();
        }
        throw  e;
    }
    finally {
        if (preparedStatement != null) {
            preparedStatement.close();
        }

        if (resultSet != null) {
            resultSet.close();
        }

        if (connection != null) {
            connection.close();
        }
    }
}

这样子当获取到Connection后,将其设置到ThreadLocal中,如果lock方法出现异常,则将其从ThreadLocal中移除掉。

有了这几步后,我们可以来实现解锁操作了。我们在DistributedLock添加一个unlock方法。

public void unlock() throws Exception {
    Connection connection = null;
    try {
        connection = threadLocalConn.get();
        if (!connection.isClosed()) {
            connection.commit();
            connection.close();
            threadLocalConn.remove();
        }
    } catch (Exception e) {
        if (connection != null) {
            connection.rollback();
            connection.close();
        }
        threadLocalConn.remove();
        throw e;
    }
}

缺点

毕竟是利用DB来实现分布式锁,对DB还是造成一定的压力。当时考虑使用DB做分布式的一个重要原因是,我们的应用是后端应用,平时流量不大的,反而关键的是要保证库存数据的正确性。

对于像前端库存系统,比如添加购物车占用库存等操作,最好别使用DB来实现分布式锁了。

进一步思考

如果想锁住多份数据该怎么实现?

比如说,某个库存操作,既要修改物理库存,又要修改虚拟库存,想锁住物理库存的同时,又锁住虚拟库存。

其实也不是很难,参考lock方法,写一个multiLock方法,提供多个transactionId的入参,for循环处理就可以了。

来源:https://blog.csdn.net/linsongbin1/article/details/79444274


感谢您的阅读,也欢迎您发表关于这篇文章的任何建议,关注我,技术不迷茫!小编到你上高速。

    · END ·
最后,关注公众号互联网架构师,在后台回复:2T,可以获取我整理的 Java 系列面试题和答案,非常齐全


正文结束


推荐阅读 ↓↓↓

1.不认命,从10年流水线工人,到谷歌上班的程序媛,一位湖南妹子的励志故事

2.如何才能成为优秀的架构师?

3.从零开始搭建创业公司后台技术栈

4.程序员一般可以从什么平台接私活?

5.37岁程序员被裁,120天没找到工作,无奈去小公司,结果懵了...

6.IntelliJ IDEA 2019.3 首个最新访问版本发布,新特性抢先看

7.这封“领导痛批95后下属”的邮件,句句扎心!

8.15张图看懂瞎忙和高效的区别!

一个人学习、工作很迷茫?


点击「阅读原文」加入我们的小圈子!

浏览 32
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报