分类: 编程技巧

  • Java 陷阱:慎用入参做返回值!!!

    快乐分享,Java干货及时送达👇

    来源:https://my.oschina.net/waylau/blog/4771348

    正常情况下,在Java中入参是不建议用做返回值的。除了造成代码不易理解、语义不清等问题外,可能还埋下了陷阱等你入坑。

    问题背景

    比如有这么一段代码:

    @Named
    public class AService {
    private SupplyAssignment localSupply = new SupplyAssignment();
        @Inject
        private BService bervice;

        public List calcSupplyAssignment()
           List supplyList 
    = bService.getLocalSupplyList(this.localSupply);
            …
           return supplyList;
        }
    }

    上面代码,服务A希望调用服务B,以获取supplyList,但同时,服务A又希望修改localSupply的状态值,未能避免修改calcSupplyAssignment接口的(不想改返回的类型),将localSupply作为了入参但同时也用作了返回值。

    服务B代码如下:

    @Named
    public class BService {

    public List getLocalSupplyList (SupplyAssignment localSupply)
        SupplyAssignment supplyAssignment 
    this.getSupplyAssignment();
            // 希望localSupply被重新赋值后返回
            localSupply = supplyAssignment;
            …
            return supplyList;

        }
    }

    在服务B代码内部,服务A的入参localSupply被传入,希望重新被supplyAssignment赋值而后返回新值。然而,这样做是无效的。

    问题原因

    先来看下编程语言中关于参数传递的类型:

    • 值传递(pass by value)是指在调用函数时将实际参数复制一份传递到函数中,这样在函数中如果对参数进行修改,将不会影响到实际参数。
    • 引用传递(pass by reference)是指在调用函数时将实际参数的地址直接传递到函数中,那么在函数中对参数所进行的修改,将影响到实际参数。

    因为Java程序设计语言是采用的值传递,因为Java没有指针的概念。也就是说方法得到的是所有参数值的一个拷贝,方法并不能修改传递给它的任何参数变量的内容。

    因此,上述代码中,服务A调用服务B时,服务B的参数localSupply实际上是服务A的localSupply的一个拷贝,当然,这两个都是指向了同一个地址对象supplyAssignment1。

    图片

    当在服务B内部对参数localSupply进行重新赋值是localSupply = supplyAssignment,实际上,只是对B的参数localSupply做了从新赋值,B的参数localSupply会指向一个新的地址对象supplyAssignment2。

    图片

    从上图可以清晰看到,因此,服务A的localSupply和B的参数localSupply已经指向了不同的对象了,对B的参数localSupply做任何的修改,都不会影响服务A的localSupply的原值。

    这就是问题的原因,你希望服务B来修改服务A入参的状态,并将改后的值返回给服务A,但并不奏效。

    包含一些大佬的学习资料,且配套了相关的实践案例的最强Java并发编程笔记详解,关注公众号SpringForAll社区,回复:Java,即可免费领取!

    解决方案

    方案1:入参不要用作返回值

    有时确实想要入参做返回值,那看方案2。

    方案2:入参不要赋值新对象

    这个方案就是直接在入参的对象上做状态的修改,而不要去赋值新对象。还是这个图:

    图片

    在这个图中,只要我们是一直在B的参数localSupply修改的是supplyAssignment1的状态值,那结果就能反馈到服务A的localSupply上。如何实现?看下下面代码:

    @Named
    public class BService {

        public List getLocalSupplyList (SupplyAssignment localSupply)

            SupplyAssignment supplyAssignment 
    this.getSupplyAssignment();

            // 针对localSupply不能新建引用,只能重新赋值属性
            BeanUtils.copyProperties(supplyAssignment, localSupply);
            …
            return supplyList;

        }

    }

    在上面的方法中,我们用到了Spring的工具类BeanUtils,该类的copyProperties方法的实质是将supplyAssignment的属性值,赋值到了localSupply的属性上。

    这意味着我们是修改的B的参数localSupply上的属性,而并未新建对象。

  • 九种分布式ID解决方案,总有一款适合你!

    快乐分享,Java干货及时送达👇

    • 1、UUID
    • 2、数据库自增ID
      • 2.1、主键表
      • 2.2、ID自增步长设置
    • 3、号段模式
    • 4、Redis INCR
    • 5、雪花算法
    • 6、美团(Leaf)
    • 7、百度(Uidgenerator)
    • 8、滴滴(TinyID)
    • 总结比较

    背景

    在复杂的分布式系统中,往往需要对大量的数据进行唯一标识,比如在对一个订单表进行了分库分表操作,这时候数据库的自增ID显然不能作为某个订单的唯一标识。除此之外还有其他分布式场景对分布式ID的一些要求:

    • 趋势递增: 由于多数RDBMS使用B-tree的数据结构来存储索引数据,在主键的选择上面我们应该尽量使用有序的主键保证写入性能。
    • 单调递增: 保证下一个ID一定大于上一个ID,例如排序需求。
    • 信息安全: 如果ID是连续的,恶意用户的扒取工作就非常容易做了;如果是订单号就更危险了,可以直接知道我们的单量。所以在一些应用场景下,会需要ID无规则、不规则。

    就不同的场景及要求,市面诞生了很多分布式ID解决方案。本文针对多个分布式ID解决方案进行介绍,包括其优缺点、使用场景及代码示例。

    1、UUID

    UUID(Universally Unique Identifier)是基于当前时间、计数器(counter)和硬件标识(通常为无线网卡的MAC地址)等数据计算生成的。包含32个16进制数字,以连字号分为五段,形式为8-4-4-4-12的36个字符,可以生成全球唯一的编码并且性能高效。

    JDK提供了UUID生成工具,代码如下:

    import java.util.UUID;

    public class Test {
        public static void main(String[] args) {
            System.out.println(UUID.randomUUID());
        }
    }

    输出如下

    b0378f6a-eeb7-4779-bffe-2a9f3bc76380

    UUID完全可以满足分布式唯一标识,但是在实际应用过程中一般不采用,有如下几个原因:

    • 存储成本高: UUID太长,16字节128位,通常以36长度的字符串表示,很多场景不适用。
    • 信息不安全: 基于MAC地址生成的UUID算法会暴露MAC地址,曾经梅丽莎病毒的制造者就是根据UUID寻找的。
    • 不符合MySQL主键要求: MySQL官方有明确的建议主键要尽量越短越好,因为太长对MySQL索引不利:如果作为数据库主键,在InnoDB引擎下,UUID的无序性可能会引起数据位置频繁变动,严重影响性能。

    2、数据库自增ID

    利用Mysql的特性ID自增,可以达到数据唯一标识,但是分库分表后只能保证一个表中的ID的唯一,而不能保证整体的ID唯一。为了避免这种情况,我们有以下两种方式解决该问题。

    2.1、主键表

    通过单独创建主键表维护唯一标识,作为ID的输出源可以保证整体ID的唯一。举个例子:

    创建一个主键表

    CREATE TABLE `unique_id`  (
      `id` bigint NOT NULL AUTO_INCREMENT,
      `biz` char(1NOT NULL,
      PRIMARY KEY (`id`),
     UNIQUE KEY `biz` (`biz`)
    ENGINE = InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET =utf8;

    业务通过更新操作来获取ID信息,然后添加到某个分表中。

    BEGIN;

    REPLACE INTO unique_id (biz) values ('o') ;
    SELECT LAST_INSERT_ID();

    COMMIT;

    2.2、ID自增步长设置

    我们可以设置Mysql主键自增步长,让分布在不同实例的表数据ID做到不重复,保证整体的唯一。

    如下,可以设置Mysql实例1步长为1,实例1步长为2。

    查看主键自增的属性

    show variables like '%increment%'

    显然,这种方式在并发量比较高的情况下,如何保证扩展性其实会是一个问题。

    3、号段模式

    号段模式是当下分布式ID生成器的主流实现方式之一。其原理如下:

    • 号段模式每次从数据库取出一个号段范围,加载到服务内存中。业务获取时ID直接在这个范围递增取值即可。
    • 等这批号段ID用完,再次向数据库申请新号段,对max_id字段做一次update操作,新的号段范围是(max_id ,max_id +step]。
    • 由于多业务端可能同时操作,所以采用版本号version乐观锁方式更新。

    例如 (1,1000] 代表1000个ID,具体的业务服务将本号段生成1~1000的自增ID。表结构如下:

    CREATE TABLE id_generator (
      id int(10NOT NULL,
      max_id bigint(20NOT NULL COMMENT '当前最大id',
      step int(20NOT NULL COMMENT '号段的长度',
      biz_type    int(20NOT NULL COMMENT '业务类型',
      version int(20NOT NULL COMMENT '版本号,是一个乐观锁,每次都更新version,保证并发时数据的正确性',
      PRIMARY KEY (`id`)

    这种分布式ID生成方式不强依赖于数据库,不会频繁的访问数据库,对数据库的压力小很多。但同样也会存在一些缺点比如:服务器重启,单点故障会造成ID不连续。

    4、Redis INCR

    基于全局唯一ID的特性,我们可以通过Redis的INCR命令来生成全局唯一ID。

    Redis分布式ID的简单案例

    /**
     *  Redis 分布式ID生成器
     */

    @Component
    public class RedisDistributedId {

        @Autowired
        private StringRedisTemplate redisTemplate;

        private static final long BEGIN_TIMESTAMP = 1659312000l;

        /**
         * 生成分布式ID
         * 符号位    时间戳[31位]  自增序号【32位】
         * @param item
         * @return
         */

        public long nextId(String item){
            // 1.生成时间戳
            LocalDateTime now = LocalDateTime.now();
            // 格林威治时间差
            long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
            // 我们需要获取的 时间戳 信息
            long timestamp = nowSecond - BEGIN_TIMESTAMP;
            // 2.生成序号 --》 从Redis中获取
            // 当前当前的日期
            String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
            // 获取对应的自增的序号
            Long increment = redisTemplate.opsForValue().increment("id:" + item + ":" + date);
            return timestamp 32 | increment;
        }

    }

    同样使用Redis也有对应的缺点:ID 生成的持久化问题,如果Redis宕机了怎么进行恢复?

    5、雪花算法

    Snowflake,雪花算法是有Twitter开源的分布式ID生成算法,以划分命名空间的方式将64bit位分割成了多个部分,每个部分都有具体的不同含义,在Java中64Bit位的整数是Long类型,所以在Java中Snowflake算法生成的ID就是long来存储的。具体如下:

    • 第一部分: 占用1bit,第一位为符号位,不适用
    • 第二部分: 41位的时间戳,41bit位可以表示241个数,每个数代表的是毫秒,那么雪花算法的时间年限是(241)/(1000×60×60×24×365)=69
    • 第三部分: 10bit表示是机器数,即 2^ 10 = 1024台机器,通常不会部署这么多机器
    • 第四部分: 12bit位是自增序列,可以表示2^12=4096个数,一秒内可以生成4096个ID,理论上snowflake方案的QPS约为409.6w/s

    雪花算法案例代码:

    public class SnowflakeIdWorker {

        // ==============================Fields===========================================
        /**
         * 开始时间截 (2020-11-03,一旦确定不可更改,否则时间被回调,或者改变,可能会造成id重复或冲突)
         */

        private final long twepoch = 1604374294980L;

        /**
         * 机器id所占的位数
         */

        private final long workerIdBits = 5L;

        /**
         * 数据标识id所占的位数
         */

        private final long datacenterIdBits = 5L;

        /**
         * 支持的最大机器id,结果是31 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数)
         */

        private final long maxWorkerId = -1L ^ (-1L 
        /**
         * 支持的最大数据标识id,结果是31
         */

        private final long maxDatacenterId = -1L ^ (-1L 
        /**
         * 序列在id中占的位数
         */

        private final long sequenceBits = 12L;

        /**
         * 机器ID向左移12位
         */

        private final long workerIdShift = sequenceBits;

        /**
         * 数据标识id向左移17位(12+5)
         */

        private final long datacenterIdShift = sequenceBits + workerIdBits;

        /**
         * 时间截向左移22位(5+5+12)
         */

        private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;

        /**
         * 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095)
         */

        private final long sequenceMask = -1L ^ (-1L 
        /**
         * 工作机器ID(0~31)
         */

        private long workerId;

        /**
         * 数据中心ID(0~31)
         */

        private long datacenterId;

        /**
         * 毫秒内序列(0~4095)
         */

        private long sequence = 0L;

        /**
         * 上次生成ID的时间截
         */

        private long lastTimestamp = -1L;

        //==============================Constructors=====================================

        /**
         * 构造函数
         *
         */

        public SnowflakeIdWorker() {
            this.workerId = 0L;
            this.datacenterId = 0L;
        }

        /**
         * 构造函数
         *
         * @param workerId     工作ID (0~31)
         * @param datacenterId 数据中心ID (0~31)
         */

        public SnowflakeIdWorker(long workerId, long datacenterId) {
            if (workerId > maxWorkerId || workerId 0) {
                throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
            }
            if (datacenterId > maxDatacenterId || datacenterId 0) {
                throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
            }
            this.workerId = workerId;
            this.datacenterId = datacenterId;
        }

        // ==============================Methods==========================================

        /**
         * 获得下一个ID (该方法是线程安全的)
         *
         * @return SnowflakeId
         */

        public synchronized long nextId() {
            long timestamp = timeGen();

            //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
            if (timestamp             throw new RuntimeException(
                        String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
            }

            //如果是同一时间生成的,则进行毫秒内序列
            if (lastTimestamp == timestamp) {
                sequence = (sequence + 1) & sequenceMask;
                //毫秒内序列溢出
                if (sequence == 0) {
                    //阻塞到下一个毫秒,获得新的时间戳
                    timestamp = tilNextMillis(lastTimestamp);
                }
            }
            //时间戳改变,毫秒内序列重置
            else {
                sequence = 0L;
            }

            //上次生成ID的时间截
            lastTimestamp = timestamp;

            //移位并通过或运算拼到一起组成64位的ID
            return ((timestamp - twepoch) //
                    | (datacenterId //
                    | (workerId //
                    | sequence;
        }

        /**
         * 阻塞到下一个毫秒,直到获得新的时间戳
         *
         * @param lastTimestamp 上次生成ID的时间截
         * @return 当前时间戳
         */

        protected long tilNextMillis(long lastTimestamp) {
            long timestamp = timeGen();
            while (timestamp             timestamp = timeGen();
            }
            return timestamp;
        }

        /**
         * 返回以毫秒为单位的当前时间
         *
         * @return 当前时间(毫秒)
         */

        protected long timeGen() {
            return System.currentTimeMillis();
        }

        /**
         * 随机id生成,使用雪花算法
         *
         * @return
         */

        public static String getSnowId() {
            SnowflakeIdWorker sf = new SnowflakeIdWorker();
            String id = String.valueOf(sf.nextId());
            return id;
        }

        //=========================================Test=========================================

        /**
         * 测试
         */

        public static void main(String[] args) {
            SnowflakeIdWorker idWorker = new SnowflakeIdWorker(00);
            for (int i = 0; i 1000; i++) {
                long id = idWorker.nextId();
                System.out.println(id);
            }
        }
    }

    雪花算法强依赖机器时钟,如果机器上时钟回拨,会导致发号重复。通常通过记录最后使用时间处理该问题。

    6、美团(Leaf)

    由美团开发,开源项目链接:

    • https://github.com/Meituan-Dianping/Leaf

    Leaf同时支持号段模式和snowflake算法模式,可以切换使用。

    snowflake模式依赖于ZooKeeper,不同于原始snowflake算法也主要是在workId的生成上,Leaf中workId是基于ZooKeeper的顺序Id来生成的,每个应用在使用Leaf-snowflake时,启动时都会都在Zookeeper中生成一个顺序Id,相当于一台机器对应一个顺序节点,也就是一个workId。

    号段模式是对直接用数据库自增ID充当分布式ID的一种优化,减少对数据库的频率操作。相当于从数据库批量的获取自增ID,每次从数据库取出一个号段范围,例如 (1,1000] 代表1000个ID,业务服务将号段在本地生成1~1000的自增ID并加载到内存。

    7、百度(Uidgenerator)

    源码地址:

    • https://github.com/baidu/uid-generator

    中文文档地址:

    • https://github.com/baidu/uid-generator/blob/master/README.zh_cn.md

    UidGenerator是百度开源的Java语言实现,基于Snowflake算法的唯一ID生成器。它是分布式的,并克服了雪花算法的并发限制。单个实例的QPS能超过6000000。需要的环境:JDK8+,MySQL(用于分配WorkerId)。

    百度的Uidgenerator对结构做了部分的调整,具体如下:

    时间部分只有28位,这就意味着UidGenerator默认只能承受8.5年(2^28-1/86400/365),不过UidGenerator可以适当调整delta seconds、worker node id和sequence占用位数。

    8、滴滴(TinyID)

    由滴滴开发,开源项目链接:

    • https://github.com/didi/tinyid

    Tinyid是在美团(Leaf)的leaf-segment算法基础上升级而来,不仅支持了数据库多主节点模式,还提供了tinyid-client客户端的接入方式,使用起来更加方便。但和美团(Leaf)不同的是,Tinyid只支持号段一种模式不支持雪花模式。Tinyid提供了两种调用方式,一种基于Tinyid-server提供的http方式,另一种Tinyid-client客户端方式。

    总结比较

    作者:叫我二蛋

    来源:wangbinguang.blog.csdn.net/article/

    details/129201971


  • 为了减少延迟和卡顿,我对 MySQL 查询做了这些优化处理….

    前言

    在程序上线运行一段时间后,一旦数据量上去了,或多或少会感觉到系统出现延迟、卡顿等现象,出现这种问题,就需要程序员或架构师进行系统调优工作了。

    其中,大量的实践经验表明,调优的手段尽管有很多,但涉及到SQL调优的内容仍然是非常重要的一环,本文将结合实例,总结一些工作中可能涉及到的SQL优化策略;

    查询优化

    可以说,对于大多数系统来说,读多写少一定是常态,这就表示涉及到查询的SQL是非常高频的操作;

    前置准备,给一张测试表添加10万条数据

    使用下面的存储过程给单表造一批数据,将表换成自己的就好了

    create procedure addMyData()
     
     begin
     
      declare num int;
      set num =1;
      
      while num do
      
       insert into XXX_table values(
        replace(uuid(),'-',''),concat('测试',num),concat('cs',num),'123456'
       );
     
       set num =num +1;
      end while;
     
     end ;

    然后调用该存储过程

    call addMyData();

    本篇准备了3张表,分别为学生(student)表,班级(class)表,账户(account)表,各自有50万,1万和10万条数据用于测试;

    1、分页查询优化

    分页查询是开发中经常会遇到的,有一种情况是,当分页的数量非常大的时候,查询的时候往往非常耗时,比如查询student表,使用下面的sql查询,耗时达到0.2秒;

    实践经验告诉我们,越往后,分页查询效率越低,这就是分页查询的问题所在, 因为,当在进行分页查询时,如果执行 limit 400000,10 ,此时需要 MySQL 排序前4000 10 记 录,仅仅返回400000 - 4 00010 的记录,其他记录丢弃,查询排序的代价非常大

    优化思路:

    一般分页查询时,通过创建 覆盖索引 能够比较好地提高性能,可以通过覆盖索引加子查询形式进行优化;

    1) 在索引上完成排序分页操作,最后根据主键关联回原表查询所需要的其他列内容

    SELECT * FROM student t1,(SELECT id FROM student ORDER BY id LIMIT 400000,10) t2 WHERE t1.id =t2.id;

    执行上面的sql,可以看到响应时间有一定的提升;

    2)对于主键自增的表,可以把Limit 查询转换成某个位置的查询

    select * from student where id > 400000 limit 10;

    执行上面的sql,可以看到响应时间有一定的提升;

    2、关联查询优化

    在实际的业务开发过程中,关联查询可以说随处可见,关联查询的优化核心思路是,最好为关联查询的字段添加索引,这是关键,具体到不同的场景,还需要具体分析,这个跟mysql的引擎在执行优化策略的方案选择时有一定关系;

    2.1 左连接或右连接

    下面是一个使用left join 的查询,可以预想到这条sql查询的结果集非常大

    select t.* from student t left join class cs on t.classId = cs.id;

    为了检查下sql的执行效率,使用explain做一下分析,可以看到,第一张表即left join左边的表student走了全表扫描,而class表走了主键索引,尽管结果集较大,还是走了索引;

    针对这种场景的查询,思路如下:

    • 让查询的字段尽量包含在主键索引或者覆盖索引中;
    • 查询的时候尽量使用分页查询;

    关于左连接(右连接)的explain结果补充说明

    • 左连接左边的表一般为驱动表,右边的表为被驱动表;
    • 尽可能让数据集小的表作为驱动表,减少mysql内部循环的次数;
    • 两表关联时,explain结果展示中,第一栏一般为驱动表;
    2.2 关联查询关联的字段建立索引

    看下面的这条sql,其关联字段非表的主键,而是普通的字段;

    explain select u.* from tenant t left join `user` u on u.account = t.tenant_name where t.removed is null and u.removed is null;

    通过explain分析可以发现,左边的表走了全表扫描,可以考虑给左边的表的tenant_nameuser表的“ 各自创建索引;

    create index idx_name on tenant(tenant_name);

    create index idx_account on `user`(account);

    再次使用explain分析结果如下

    可以看到第二行type变为ref,rows的数量优化比较明显。这是由左连接特性决定的,LEFT JOIN条件用于确定如何从右表搜索行,左边一定都有,所以右边是我们的关键点,一定需要建立索引 。

    2.3 内连接关联的字段建立索引

    我们知道,左连接和右连接查询的数据分别是完全包含左表数据,完全包含右表数据,而内连接(inner join 或join) 则是取交集(共有的部分),在这种情况下,驱动表的选择是由mysql优化器自动选择的;

    在上面的基础上,首先移除两张表的索引

    ALTER TABLE `user` DROP INDEX idx_account;
    ALTER TABLE `tenant` DROP INDEX idx_name

    使用explain语句进行分析

    然后给user表的account字段添加索引,再次执行explain我们发现,user表竟然被当作是被驱动表了;

    此时,如果我们给tenant表的tenant_name加索引,并移除user表的account索引,得出的结果竟然都没有走索引,再次说明,使用内连接的情况下,查询优化器将会根据自己的判断进行选择;

    3、子查询优化

    子查询在日常编写业务的SQL时也是使用非常频繁的做法,不是说子查询不能用,而是当数据量超出一定的范围之后,子查询的性能下降是很明显的,关于这一点,本人在日常工作中深有体会;

    比如下面这条sql,由于student表数据量较大,执行起来耗时非常长,可以看到耗费了将近3秒;

    select st.* from student st where st.classId in (
     
     select id from class where id > 100
     
    );

    通过执行explain进行分析得知,内层查询 id > 100的子查询尽管用上了主键索引,但是由于结果集太大,带入到外层查询,即作为in的条件时,查询优化器还是走了全表扫描;

    针对上面的情况,可以考虑下面的优化方式

    select st.id from student st join class cl on st.classId = cl.id where cl.id > 100;

    子查询性能低效的原因

    • 子查询时,MySQL需要为内层查询语句的查询结果建立一个临时表 ,然后外层查询语句从临时表中查询记录,查询完毕后,再撤销这些临时表 。这样会消耗过多的CPU和IO资源,产生大量的慢查询;
    • 子查询结果集存储的临时表,不论是内存临时表还是磁盘临时表都不能走索引 ,所以查询性能会受到一定的影响;
    • 对于返回结果集比较大的子查询,其对查询性能的影响也就越大;

    使用mysql查询时,可以使用连接(JOIN)查询来替代子查询。连接查询不需要建立临时表 ,其速度比子查询要快 ,如果查询中使用索引的话,性能就会更好,尽量不要使用NOT IN 或者 NOT EXISTS,用LEFT JOIN xxx ON xx WHERE xx IS NULL替代;

    一个真实的案例

    在下面的这段sql中,优化前使用的是子查询,在一次生产问题的性能分析中,发现某个tenant_id下的数据达到了35万多,这样直接导致某个列表页面的接口查询耗时达到了5秒左右;

    找到了问题的根源后,尝试使用上面的优化思路进行解决即可,优化后的sql大概如下,

    4、排序(order by)优化

    在mysql,排序主要有两种方式

    • Using filesort : 通过表索引或全表扫描,读取满足条件的数据行,然后在排序缓冲区sort。buffer中完成排序操作,所有不是通过索引直接返回排序结果的排序都叫 FileSort 排序;
    • Using index : 通过有序的索引顺序扫描直接返回有序数据,这种情况即为 using index,不需要额外排序,操作效率高;

    对于以上两种排序方式,Using index的性能高,而Using filesort的性能低,我们在优化排序操作时,尽量要优化为 Using index

    4.1 使用age字段进行排序

    由于age字段未加索引,查询结果按照age排序的时候发现使用了filesort,排序性能较低;

    给age字段添加索引,再次使用order by时就走了索引;

    4.2 使用多字段进行排序

    通常在实际业务中,参与排序的字段往往不只一个,这时候,就可以对参与排序的多个字段创建联合索引;

    如下根据stuno和age排序

    给stuno和age添加联合索引

    create index idx_stuno_age on `student`(stuno,age);

    再次分析时结果如下,此时排序走了索引

    关于多字段排序时的注意事项

    1)排序时,需要满足最左前缀法则,否则也会出现 filesort;

    在上面我们创建的联合索引顺序是stuno和age,即stuno在前面,而age在后,如果查询的时候调换排序顺序会怎样呢?通过分析结果发现,走了filesort;

    2)排序时,排序的类型保持一致

    在保持字段排序顺序不变时,默认情况下,如果都按照升序或者降序时,order by可以使用index,如果一个是升序,另一个是降序会如何呢?分析发现,这种情况下也会走filesort;

    5、分组(group by)优化

    group by 的优化策略和order by 的优化策略非常像,主要列举如下几个要点:

    • group by 即使没有过滤条件用到索引,也可以直接使用索引;
    • group by 先排序再分组,遵照索引建的最佳左前缀法则;
    • 当无法使用索引列时,增大 max_length_for_sort_datasort_buffer_size 参数的设置;
    • where效率高于having,能写在where限定的条件就不要写在having中了;
    • 减少使用order by,能不排序就不排序,或将排序放到程序去做。Order bygroupbydistinct这些语句较为耗费CPU,数据库的CPU资源是极其宝贵的;
    • 如果sql包含了order bygroup bydistinct这些查询的语句,where条件过滤出来的结果集请保持在1000行以内,否则SQL会很慢;
    5.1 给group by的字段添加索引

    如果字段未加索引,分析结果如下,这种结果性能显然很低效

    给stuno添加索引之后

    给stuno和age添加联合索引

    如果不遵循最佳左前缀,group by 性能将会比较低效

    遵循最佳左前缀的情况如下

    6、count 优化

    count() 是一个聚合函数,对于返回的结果集,一行行判断,如果 count 函数的参数不是NULL,累计值就加 1,否则不加,最后返回累计值;

    用法:count(*)count(主键)count(字段)count(数字)

    如下列举了count的几种写法的详细说明

    经验值总结

    按照效率排序来看,count(字段) ,所以尽量使用 count(*)

    作者:逆风飞翔的小叔

    来源:blog.csdn.net/congge_study/article/

    details/127712927

  • 别再分库分表了,试试TiDB!

    快乐分享,Java干货及时送达👇

    来源:www.cnblogs.com/jiagooushi/archive/2023/03/24/17251486.html

    • 什么是NewSQL
    • 传统SQL的问题
      • 升级服务器硬件
      • 数据分片
    • NoSQL 的问题
      • 优点
      • 缺点
    • NewSQL 特性
    • NewSQL 的主要特性
    • 三种SQL的对比
    • TiDB怎么来的
    • TiDB社区版和企业版
    • TIDB核心特性
      • 水平弹性扩展
      • 分布式事务支持
      • 金融级高可用
      • 实时 HTAP
      • 云原生的分布式数据库
      • 高度兼容 MySQL
    • OLTP&OLAP(自学)
      • OLTP(联机事务处理)
      • OLAP(联机分析处理)
      • 特性对比
      • 设计角度区别
    • TiDB 整体架构
    • TiDB的优势
    • TiDB的组件
      • TiDB Server
      • PD(Placement Driver)Server
      • TiKV Server
      • TiSpark
      • TiFlash
    • TiKV整体架构
      • Region分裂与合并
      • Region调度
      • 分布式事务
    • 高可用架构
      • TiDB高可用
      • PD高可用
      • TiKV高可用
    • 应用场景
      • MySQL分片与合并
      • 直接替换MySQL
      • 数据仓库
      • 作为其他系统的模块
    • 应用案例
    • TiDB与MySQL兼容性对比
    • TiDB不支持的MySql特性
    • 自增ID
    • SELECT 的限制
    • 视图
    • 默认设置差异
      • 字符集
      • 排序规则
      • 大小写敏感
      • 参数解释
      • timestamp类型字段更新
      • 参数解释
      • 外键支持

    TiDB 是一个分布式 NewSQL 数据库。它支持水平弹性扩展、ACID 事务、标准 SQL、MySQL 语法和 MySQL 协议,具有数据强一致的高可用特性,是一个不仅适合 OLTP 场景还适合 OLAP 场景的混合数据库。

    TiDB是 PingCAP公司自主设计、研发的开源分布式关系型数据库,是一款同时支持在线事务处理与在线分析处理 (Hybrid Transactional and Analytical Processing, HTAP)的融合型分布式数据库产品,具备水平扩容或者缩容、金融级高可用、实时 HTAP、云原生的分布式数据库、兼容 MySQL 5.7 协议和 MySQL 生态等重要特性。目标是为用户提供一站式 OLTP (Online Transactional Processing)、OLAP (Online Analytical Processing)、HTAP 解决方案。TiDB 适合高可用、强一致要求较高、数据规模较大等各种应用场景。

    什么是NewSQL

    数据库发展至今已经有3代了:

    1. SQL,传统关系型数据库,例如 MySQL
    2. noSQL,例如 MongoDB,Redis
    3. newSQL

    传统SQL的问题

    互联网在本世纪初开始迅速发展,互联网应用的用户规模、数据量都越来越大,并且要求7X24小时在线。

    传统关系型数据库在这种环境下成为了瓶颈,通常有2种解决方法:

    升级服务器硬件

    虽然提升了性能,但总有天花板。

    数据分片

    使用分布式集群结构

    对单点数据库进行数据分片,存放到由廉价机器组成的分布式的集群里,可扩展性更好了,但也带来了新的麻烦。

    以前在一个库里的数据,现在跨了多个库,应用系统不能自己去多个库中操作,需要使用数据库分片中间件。

    分片中间件做简单的数据操作时还好,但涉及到跨库join、跨库事务时就很头疼了,很多人干脆自己在业务层处理,复杂度较高。

    NoSQL 的问题

    后来 noSQL 出现了,放弃了传统SQL的强事务保证和关系模型,重点放在数据库的高可用性和可扩展性。

    优点

    • 高可用性和可扩展性,自动分区,轻松扩展
    • 不保证强一致性,性能大幅提升
    • 没有关系模型的限制,极其灵活

    缺点

    • 不保证强一致性,对于普通应用没问题,但还是有不少像金融一样的企业级应用有强一致性的需求。
    • 不支持 SQL 语句,兼容性是个大问题,不同的 NoSQL 数据库都有自己的 api 操作数据,比较复杂。

    NewSQL 特性

    NewSQL 提供了与 noSQL 相同的可扩展性,而且仍基于关系模型,还保留了极其成熟的 SQL 作为查询语言,保证了ACID事务特性。

    简单来讲,NewSQL 就是在传统关系型数据库上集成了 NoSQL 强大的可扩展性。

    传统的SQL架构设计基因中是没有分布式的,而 NewSQL 生于云时代,天生就是分布式架构。

    NewSQL 的主要特性

    • SQL 支持,支持复杂查询和大数据分析。
    • 支持 ACID 事务,支持隔离级别。
    • 弹性伸缩,扩容缩容对于业务层完全透明。
    • 高可用,自动容灾。

    三种SQL的对比

    图片

    TiDB怎么来的

    著名的开源分布式缓存服务 Codis 的作者,PingCAP联合创始人& CTO ,资深 infrastructure 工程师的黄东旭,擅长分布式存储系统的设计与实现,开源狂热分子的技术大神级别人物。即使在互联网如此繁荣的今天,在数据库这片边界模糊且不确定地带,他还在努力寻找确定性的实践方向。关注公z号:码猿技术专栏,回复关键词:1111 获取阿里内部Java性能优化手册!

    直到 2012 年底,他看到 Google 发布的两篇论文,如同棱镜般,折射出他自己内心微烁的光彩。这两篇论文描述了 Google 内部使用的一个海量关系型数据库 F1/Spanner ,解决了关系型数据库、弹性扩展以及全球分布的问题,并在生产中大规模使用。“如果这个能实现,对数据存储领域来说将是颠覆性的”,黄东旭为完美方案的出现而兴奋, PingCAP 的 TiDB 在此基础上诞生了。

    TiDB社区版和企业版

    TiDB分为社区版以及企业版,企业版收费提供服务以及安全性的支持

    图片

    TIDB核心特性

    水平弹性扩展

    通过简单地增加新节点即可实现 TiDB 的水平扩展,按需扩展吞吐或存储,轻松应对高并发、海量数据场景

    得益于 TiDB 存储计算分离的架构的设计,可按需对计算、存储分别进行在线扩容或者缩容,扩容或者缩容过程中对应用运维人员透明。

    分布式事务支持

    TiDB 100% 支持标准的 ACID 事务

    金融级高可用

    相比于传统主从 (M-S) 复制方案,基于 Raft 的多数派选举协议可以提供金融级的 100% 数据强一致性保证,且在不丢失大多数副本的前提下,可以实现故障的自动恢复 (auto-failover),无需人工介入

    数据采用多副本存储,数据副本通过 Multi-Raft 协议同步事务日志,多数派写入成功事务才能提交,确保数据强一致性且少数副本发生故障时不影响数据的可用性。可按需配置副本地理位置、副本数量等策略满足不同容灾级别的要求。

    实时 HTAP

    TiDB 作为典型的 OLTP 行存数据库,同时兼具强大的 OLAP 性能,配合 TiSpark,可提供一站式 HTAP 解决方案,一份存储同时处理 OLTP & OLAP 无需传统繁琐的 ETL 过程

    提供行存储引擎 TiKV、列存储引擎 TiFlash 两款存储引擎,TiFlash 通过 Multi-Raft Learner 协议实时从 TiKV 复制数据,确保行存储引擎 TiKV 和列存储引擎 TiFlash 之间的数据强一致。TiKV、TiFlash 可按需部署在不同的机器,解决 HTAP 资源隔离的问题。

    云原生的分布式数据库

    TiDB 是为云而设计的数据库,同 Kubernetes 深度耦合,支持公有云、私有云和混合云,使部署、配置和维护变得十分简单。TiDB 的设计目标是 100% 的 OLTP 场景和 80% 的 OLAP 场景,更复杂的 OLAP 分析可以通过 TiSpark 项目来完成。TiDB 对业务没有任何侵入性,能优雅的替换传统的数据库中间件、数据库分库分表等 Sharding 方案。同时它也让开发运维人员不用关注数据库 Scale 的细节问题,专注于业务开发,极大的提升研发的生产力

    高度兼容 MySQL

    兼容 MySQL 5.7 协议、MySQL 常用的功能、MySQL 生态,应用无需或者修改少量代码即可从 MySQL 迁移到 TiDB。

    提供丰富的数据迁移工具帮助应用便捷完成数据迁移,大多数情况下,无需修改代码即可从 MySQL 轻松迁移至 TiDB,分库分表后的 MySQL 集群亦可通过 TiDB 工具进行实时迁移。

    OLTP&OLAP(自学)

    OLTP(联机事务处理)

    OLTP(Online Transactional Processing) 即联机事务处理,OLTP 是传统的关系型数据库的主要应用,主要是基本的、日常的事务处理,记录即时的增、删、改、查,比如在银行存取一笔款,就是一个事务交易

    联机事务处理是事务性非常高的系统,一般都是高可用的在线系统,以小的事务以及小的查询为主,评估其系统的时候,一般看其每秒执行的Transaction以及Execute SQL的数量。在这样的系统中,单个数据库每秒处理的Transaction往往超过几百个,或者是几千个,Select 语句的执行量每秒几千甚至几万个。典型的OLTP系统有电子商务系统、银行、证券等,如美国eBay的业务数据库,就是很典型的OLTP数据库。

    OLAP(联机分析处理)

    OLAP(Online Analytical Processing) 即联机分析处理,是数据仓库的核心部心,支持复杂的分析操作,侧重决策支持,并且提供直观易懂的查询结果。典型的应用就是复杂的动态报表系统

    在这样的系统中,语句的执行量不是考核标准,因为一条语句的执行时间可能会非常长,读取的数据也非常多。所以,在这样的系统中,考核的标准往往是磁盘子系统的吞吐量(带宽),如能达到多少MB/s的流量。

    特性对比

    OLTP和OLAP的特性对比

    OLTP OLAP
    实时性 OLTP 实时性要求高,OLTP 数据库旨在使事务应用程序仅写入所需的数据,以便尽快处理单个事务 OLAP 的实时性要求不是很高,很多应用顶多是每天更新一下数据
    数据量 OLTP 数据量不是很大,一般只读 / 写数十条记录,处理简单的事务 OLAP 数据量大,因为 OLAP 支持的是动态查询,所以用户也许要通过将很多数据的统计后才能得到想要知道的信息,例如时间序列分析等等,所以处理的数据量很大
    用户和系统的面向性 OLTP 是面向顾客的,用于事务和查询处理 OLAP 是面向市场的,用于数据分析
    数据库设计 OLTP 采用实体 – 联系 ER 模型和面向应用的数据库设计 OLAP 采用星型或雪花模型和面向主题的数据库设计

    设计角度区别

    OLTP OLAP
    用户 操作人员,低层管理人员 决策人员,高级管理人员
    功能 日常操作处理 分析决策
    主要工作 增、删、改 查询
    DB 设计 面向应用 面向主题
    数据 当前的,最新的细节,二维的,分立的 历史的,聚集的,多维集成的,统一的
    存取 读/写数十条记录 读上百万条记录
    工作单位 简单的事务 复杂的查询
    用户数 上千个 上百个
    DB 大小 100MB-GB 100GB-TB

    TiDB 整体架构

    TiDB的优势

    与传统的单机数据库相比,TiDB 具有以下优势:

    • 纯分布式架构,拥有良好的扩展性,支持弹性的扩缩容
    • 支持 SQL,对外暴露 MySQL 的网络协议,并兼容大多数 MySQL 的语法,在大多数场景下可以直接替换 MySQL
    • 默认支持高可用,在少数副本失效的情况下,数据库本身能够自动进行数据修复和故障转移,对业务透明
    • 支持 ACID 事务,对于一些有强一致需求的场景友好,例如:银行转账
    • 具有丰富的工具链生态,覆盖数据迁移、同步、备份等多种场景

    TiDB的组件

    要深入了解 TiDB 的水平扩展和高可用特点,首先需要了解 TiDB 的整体架构。TiDB 集群主要包括三个核心组件:TiDB Server,PD Server 和 TiKV Server,此外,还有用于解决用户复杂 OLAP 需求的 TiSpark 组件。关注公z号:码猿技术专栏,回复关键词:1111 获取阿里内部Java性能优化手册!

    在内核设计上,TiDB 分布式数据库将整体架构拆分成了多个模块,各模块之间互相通信,组成完整的 TiDB 系统。对应的架构图如下:

    图片

    architecture

    TiDB Server

    TiDB Server 负责接收 SQL 请求,处理 SQL 相关的逻辑,并通过 PD 找到存储计算所需数据的 TiKV 地址,与 TiKV 交互获取数据,最终返回结果。TiDB Server 是无状态的,其本身并不存储数据,只负责计算,可以无限水平扩展,可以通过负载均衡组件(如 LVS、HAProxy 或 F5)对外提供统一的接入地址。

    PD (Placement Driver) Server

    Placement Driver (简称 PD) 是整个集群的管理模块,其主要工作有三个:

    • 一是存储集群的元信息(某个 Key 存储在哪个 TiKV 节点);
    • 二是对 TiKV 集群进行调度和负载均衡(如数据的迁移、Raft group leader 的迁移等);
    • 三是分配全局唯一且递增的事务 ID。

    PD 通过 Raft 协议保证数据的安全性。Raft 的 leader server 负责处理所有操作,其余的 PD server 仅用于保证高可用。建议部署奇数个 PD 节点

    TiKV Server

    TiKV Server 负责存储数据,从外部看 TiKV 是一个分布式的提供事务的 Key-Value 存储引擎。存储数据的基本单位是 Region,每个 Region 负责存储一个 Key Range(从 StartKey 到 EndKey 的左闭右开区间)的数据,每个 TiKV 节点会负责多个 Region。TiKV 使用 Raft 协议做复制,保持数据的一致性和容灾。副本以 Region 为单位进行管理,不同节点上的多个 Region 构成一个 Raft Group,互为副本。数据在多个 TiKV 之间的负载均衡由 PD 调度,这里也是以 Region 为单位进行调度。

    TiSpark

    TiSpark 作为 TiDB 中解决用户复杂 OLAP 需求的主要组件,将 Spark SQL 直接运行在 TiDB 存储层上,同时融合 TiKV 分布式集群的优势,并融入大数据社区生态。至此,TiDB 可以通过一套系统,同时支持 OLTP 与 OLAP,免除用户数据同步的烦恼。

    TiFlash

    TiFlash 是一类特殊的存储节点。和普通 TiKV 节点不一样的是,在 TiFlash 内部,数据是以列式的形式进行存储,主要的功能是为分析型的场景加速。

    TiKV整体架构

    与传统的整节点备份方式不同的,TiKV是将数据按照 key 的范围划分成大致相等的切片(下文统称为 Region),每一个切片会有多个副本(通常是 3 个),其中一个副本是 Leader,提供读写服务。TiKV 通过 PD 对这些 Region 以及副本进行调度,以保证数据和读写负载都均匀地分散在各个 TiKV 上,这样的设计保证了整个集群资源的充分利用并且可以随着机器数量的增加水平扩展。

    图片

    Region分裂与合并

    当某个 Region 的大小超过一定限制(默认是 144MB)后,TiKV 会将它分裂为两个或者更多个 Region,以保证各个 Region 的大小是大致接近的,这样更有利于 PD 进行调度决策。同样,当某个 Region 因为大量的删除请求导致 Region 的大小变得更小时,TiKV 会将比较小的两个相邻 Region 合并为一个。

    Region调度

    Region 与副本之间通过 Raft 协议来维持数据一致性,任何写请求都只能在 Leader 上写入,并且需要写入多数副本后(默认配置为 3 副本,即所有请求必须至少写入两个副本成功)才会返回客户端写入成功。

    当 PD 需要把某个 Region 的一个副本从一个 TiKV 节点调度到另一个上面时,PD 会先为这个 Raft Group 在目标节点上增加一个 Learner 副本(复制 Leader 的数据)。当这个 Learner 副本的进度大致追上 Leader 副本时,Leader 会将它变更为 Follower,之后再移除操作节点的 Follower 副本,这样就完成了 Region 副本的一次调度。

    Leader 副本的调度原理也类似,不过需要在目标节点的 Learner 副本变为 Follower 副本后,再执行一次 Leader Transfer,让该 Follower 主动发起一次选举成为新 Leader,之后新 Leader 负责删除旧 Leader 这个副本。

    分布式事务

    TiKV 支持分布式事务,用户(或者 TiDB)可以一次性写入多个 key-value 而不必关心这些 key-value 是否处于同一个数据切片 (Region) 上,TiKV 通过两阶段提交保证了这些读写请求的 ACID 约束。

    高可用架构

    高可用是 TiDB 的另一大特点,TiDB/TiKV/PD 这三个组件都能容忍部分实例失效,不影响整个集群的可用性。下面分别说明这三个组件的可用性、单个实例失效后的后果以及如何恢复。

    TiDB高可用

    TiDB 是无状态的,推荐至少部署两个实例,前端通过负载均衡组件对外提供服务。当单个实例失效时,会影响正在这个实例上进行的 Session,从应用的角度看,会出现单次请求失败的情况,重新连接后即可继续获得服务。单个实例失效后,可以重启这个实例或者部署一个新的实例。

    PD高可用

    PD 是一个集群,通过 Raft 协议保持数据的一致性,单个实例失效时,如果这个实例不是 Raft 的 leader,那么服务完全不受影响;如果这个实例是 Raft 的 leader,会重新选出新的 Raft leader,自动恢复服务。PD 在选举的过程中无法对外提供服务,这个时间大约是3秒钟。推荐至少部署三个 PD 实例,单个实例失效后,重启这个实例或者添加新的实例。

    TiKV高可用

    TiKV 是一个集群,通过 Raft 协议保持数据的一致性(副本数量可配置,默认保存三副本),并通过 PD 做负载均衡调度。单个节点失效时,会影响这个节点上存储的所有 Region。对于 Region 中的 Leader 结点,会中断服务,等待重新选举;对于 Region 中的 Follower 节点,不会影响服务。当某个 TiKV 节点失效,并且在一段时间内(默认 10 分钟)无法恢复,PD 会将其上的数据迁移到其他的 TiKV 节点上。

    应用场景

    MySQL分片与合并

    图片

    TiDB 应用的第一类场景是 MySQL 的分片与合并。对于已经在用 MySQL 的业务,分库、分表、分片、中间件是常用手段,随着分片的增多,跨分片查询是一大难题。TiDB 在业务层兼容 MySQL 的访问协议,PingCAP 做了一个数据同步的工具——Syncer,它可以把黄东旭 TiDB 作为一个 MySQL Slave,将 TiDB 作为现有数据库的从库接在主 MySQL 库的后方,在这一层将数据打通,可以直接进行复杂的跨库、跨表、跨业务的实时 SQL 查询。黄东旭提到,“过去的数据库都是一主多从,有了 TiDB 以后,可以反过来做到多主一从。”

    直接替换MySQL

    图片

    第二类场景是用 TiDB 直接去替换 MySQL。如果你的IT架构在搭建之初并未考虑分库分表的问题,全部用了 MySQL,随着业务的快速增长,海量高并发的 OLTP 场景越来越多,如何解决架构上的弊端呢?

    在一个 TiDB 的数据库上,所有业务场景不需要做分库分表,所有的分布式工作都由数据库层完成。TiDB 兼容 MySQL 协议,所以可以直接替换 MySQL,而且基本做到了开箱即用,完全不用担心传统分库分表方案带来繁重的工作负担和复杂的维护成本,友好的用户界面让常规的技术人员可以高效地进行维护和管理。另外,TiDB 具有 NoSQL 类似的扩容能力,在数据量和访问流量持续增长的情况下能够通过水平扩容提高系统的业务支撑能力,并且响应延迟稳定。

    数据仓库

    图片

    TiDB 本身是一个分布式系统,第三种使用场景是将 TiDB 当作数据仓库使用。TPC-H 是数据分析领域的一个测试集,TiDB 2.0 在 OLAP 场景下的性能有了大幅提升,原来只能在数据仓库里面跑的一些复杂的 Query,在 TiDB 2.0 里面跑,时间基本都能控制在 10 秒以内。当然,因为 OLAP 的范畴非常大,TiDB 的 SQL 也有搞不定的情况,为此 PingCAP 开源了 TiSpark,TiSpark 是一个 Spark 插件,用户可以直接用 Spark SQL 实时地在 TiKV 上做大数据分析。

    作为其他系统的模块

    图片

    TiDB 是一个传统的存储跟计算分离的项目,其底层的 Key-Value 层,可以单独作为一个 HBase 的 Replacement 来用,它同时支持跨行事务。TiDB 对外提供两个 API 接口,一个是 ACID Transaction 的 API,用于支持跨行事务;另一个是 Raw API,它可以做单行的事务,换来的是整个性能的提升,但不提供跨行事务的 ACID 支持。用户可以根据自身的需求在两个 API 之间自行选择。例如有一些用户直接在 TiKV 之上实现了 Redis 协议,将 TiKV 替换一些大容量,对延迟要求不高的 Redis 场景。

    应用案例

    图片

    TiDB与MySQL兼容性对比

    • TiDB支持MySQL 传输协议及其绝大多数的语法。这意味着您现有的MySQL连接器和客户端都可以继续使用。大多数情况下您现有的应用都可以迁移至 TiDB,无需任何代码修改。
    • 当前TiDB服务器官方支持的版本为MySQL 5.7 。大部分MySQL运维工具(如PHPMyAdmin, Navicat, MySQL Workbench等),以及备份恢复工具(如 mysqldump, Mydumper/myloader)等都可以直接使用。
    • 不过一些特性由于在分布式环境下没法很好的实现,目前暂时不支持或者是表现与MySQL有差异
    • 一些MySQL语法在TiDB中可以解析通过,但是不会做任何后续的处理 ,例如Create Table语句中Engine,是解析并忽略。

    TiDB不支持的MySql特性

    • 存储过程与函数
    • 触发器
    • 事件
    • 自定义函数
    • 外键约束
    • 临时表
    • 全文/空间函数与索引
    • ascii/latin1/binary/utf8/utf8mb4 的字符集
    • SYS schema
    • MySQL 追踪优化器
    • XML 函数
    • X-Protocol
    • Savepoints
    • 列级权限
    • XA 语法(TiDB 内部使用两阶段提交,但并没有通过 SQL 接口公开)
    • CREATE TABLE tblName AS SELECT stmt 语法
    • CHECK TABLE 语法
    • CHECKSUM TABLE 语法
    • GET_LOCKRELEASE_LOCK 函数

    自增ID

    TiDB 的自增列仅保证唯一,也能保证在单个 TiDB server 中自增,但不保证多个 TiDB server 中自增,不保证自动分配的值的连续性,建议不要将缺省值和自定义值混用,若混用可能会收 Duplicated Error 的错误信息。

    TiDB 可通过 tidb_allow_remove_auto_inc 系统变量开启或者关闭允许移除列的 AUTO_INCREMENT 属性。删除列属性的语法是:alter table modifyalter table change

    TiDB 不支持添加列的 AUTO_INCREMENT 属性,移除该属性后不可恢复。

    SELECT 的限制

    • 不支持 SELECT ... INTO @变量 语法。
    • 不支持 SELECT ... GROUP BY ... WITH ROLLUP 语法。
    • TiDB 中的 SELECT .. GROUP BY expr 的返回结果与 MySQL 5.7 并不一致。MySQL 5.7 的结果等价于 GROUP BY expr ORDER BY expr。而 TiDB 中该语法所返回的结果并不承诺任何顺序,与 MySQL 8.0 的行为一致。

    视图

    目前TiDB不支持 对视图进行UPDATE、INSERT、DELETE等写入操作

    默认设置差异

    字符集

    • TiDB 默认:utf8mb4
    • MySQL 5.7 默认:latin1
    • MySQL 8.0 默认:utf8mb4

    排序规则

    • TiDB 中 utf8mb4 字符集默认:utf8mb4_bin
    • MySQL 5.7 中 utf8mb4 字符集默认:utf8mb4_general_ci
    • MySQL 8.0 中 utf8mb4 字符集默认:utf8mb4_unicode_520_ci

    大小写敏感

    关于lower_case_table_names的配置

    • TiDB 默认:2,且仅支持设置该值为 2

    • MySQL 默认如下:

      • Linux 系统中该值为 0
    • Windows 系统中该值为 1

    • macOS 系统中该值为 2

    参数解释

    • lower_case_table_names=0 表名存储为给定的大小和比较是区分大小写的
    • lower_case_table_names = 1 表名存储在磁盘是小写的,但是比较的时候是不区分大小写
    • lower_case_table_names=2 表名存储为给定的大小写但是比较的时候是小写的

    timestamp类型字段更新

    默认情况下,timestamp类型字段所在数据行被更新时,该字段会自动更新为当前时间,而参数explicit_defaults_for_timestamp控制这一种行为。

    • TiDB 默认:ON,且仅支持设置该值为 ON
    • MySQL 5.7 默认:OFF
    • MySQL 8.0 默认:ON

    参数解释

    • explicit_defaults_for_timestamp=off,数据行更新时,timestamp类型字段更新为当前时间
    • explicit_defaults_for_timestamp=on,数据行更新时,timestamp类型字段不更新为当前时间。

    外键支持

    • TiDB 默认:OFF,且仅支持设置该值为 OFF
    • MySQL 5.7 默认:ON

  • SpringBoot整合Canal+RabbitMQ监听数据变更~

    快乐分享,Java干货及时送达👇

    来源:JAVA日知录

    • 需求
    • 步骤
    • 环境搭建
    • 整合SpringBoot Canal实现客户端
    • Canal整合RabbitMQ
    • SpringBoot整合RabbitMQ

    需求

    我想要在SpringBoot中采用一种与业务代码解耦合的方式,来实现数据的变更记录,记录的内容是新数据,如果是更新操作还得有旧数据内容。

    经过调研发现,使用Canal来监听MySQL的binlog变化可以实现这个需求,可是在监听到变化后需要马上保存变更记录,除非再做一些逻辑处理,于是我又结合了RabbitMQ来处理保存变更记录的操作。

    步骤

    • 启动MySQL环境,并开启binlog
    • 启动Canal环境,为其创建一个MySQL账号,然后以Slave的形式连接MySQL
    • Canal服务模式设为TCP,用Java编写客户端代码,监听MySQL的binlog修改
    • Canal服务模式设为RabbitMQ,启动RabbitMQ环境,配置Canal和RabbitMQ的连接,用消息队列去接收binlog修改事件

    环境搭建

    环境搭建基于docker-compose:

    version: "3"  
    services:  
        mysql:  
            network_mode: mynetwork  
            container_name: mymysql  
            ports:  
                - 3306:3306  
            restart: always  
            volumes:  
                - /etc/localtime:/etc/localtime  
                - /home/mycontainers/mymysql/data:/data  
                - /home/mycontainers/mymysql/mysql:/var/lib/mysql  
                - /home/mycontainers/mymysql/conf:/etc/mysql  
            environment:  
                - MYSQL_ROOT_PASSWORD=root  
            command:   
                --character-set-server=utf8mb4  
                --collation-server=utf8mb4_unicode_ci  
                --log-bin=/var/lib/mysql/mysql-bin  
                --server-id=1  
                --binlog-format=ROW  
                --expire_logs_days=7  
                --max_binlog_size=500M  
            image: mysql:5.7.20  
        rabbitmq:     
            container_name: myrabbit  
            ports:  
                - 15672:15672  
                - 5672:5672  
            restart: always  
            volumes:  
                - /etc/localtime:/etc/localtime  
                - /home/mycontainers/myrabbit/rabbitmq:/var/lib/rabbitmq  
            network_mode: mynetwork  
            environment:  
                - RABBITMQ_DEFAULT_USER=admin  
                - RABBITMQ_DEFAULT_PASS=123456  
            image: rabbitmq:3.8-management  
        canal-server:  
            container_name: canal-server  
            restart: always  
            ports:  
                - 11110:11110  
                - 11111:11111  
                - 11112:11112  
            volumes:  
                - /home/mycontainers/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties  
                - /home/mycontainers/canal-server/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties  
                - /home/mycontainers/canal-server/logs:/home/admin/canal-server/logs  
            network_mode: mynetwork  
            depends_on:  
                - mysql  
                - rabbitmq  
                # - canal-admin  
            image: canal/canal-server:v1.1.5  

    我们需要修改下Canal环境的配置文件:canal.propertiesinstance.properties,映射Canal中的以下两个路径:

    • /home/admin/canal-server/conf/canal.properties

    配置文件中,canal.destinations意思是server上部署的instance列表,

    • /home/admin/canal-server/conf/example/instance.properties

    这里的/example是指instance即实例名,要和上面canal.properties内instance配置对应,canal会为实例创建对应的文件夹,一个Client对应一个实例

    以下是我们需要准备的两个配置文件具体内容:

    canal.properties

    ################################################  
    ########     common argument   ############  
    ################################################  
    # tcp bind ip  
    canal.ip =  
    # register ip to zookeeper  
    canal.register.ip =  
    canal.port = 11111  
    canal.metrics.pull.port = 11112  
    # canal instance user/passwd  
    # canal.user = canal  
    # canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458  
      
    # canal admin config  
    # canal.admin.manager = canal-admin:8089  
      
    # canal.admin.port = 11110  
    # canal.admin.user = admin  
    # canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9  
      
    # admin auto register 自动注册  
    # canal.admin.register.auto = true  
    # 集群名,单机则不写  
    # canal.admin.register.cluster =  
    # Canal Server 名字  
    # canal.admin.register.name = canal-admin  
      
    canal.zkServers =  
    # flush data to zk  
    canal.zookeeper.flush.period = 1000  
    canal.withoutNetty = false  
    # tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ  
    canal.serverMode = tcp  
    # flush meta cursor/parse position to file  
    canal.file.data.dir = ${canal.conf.dir}  
    canal.file.flush.period = 1000  
    # memory store RingBuffer size, should be Math.pow(2,n)  
    canal.instance.memory.buffer.size = 16384  
    # memory store RingBuffer used memory unit size , default 1kb  
    canal.instance.memory.buffer.memunit = 1024   
    # meory store gets mode used MEMSIZE or ITEMSIZE  
    canal.instance.memory.batch.mode = MEMSIZE  
    canal.instance.memory.rawEntry = true  
      
    # detecing config  
    canal.instance.detecting.enable = false  
    #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()  
    canal.instance.detecting.sql = select 1  
    canal.instance.detecting.interval.time = 3  
    canal.instance.detecting.retry.threshold = 3  
    canal.instance.detecting.heartbeatHaEnable = false  
      
    # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery  
    canal.instance.transaction.size =  1024  
    # mysql fallback connected to new master should fallback times  
    canal.instance.fallbackIntervalInSeconds = 60  
      
    # network config  
    canal.instance.network.receiveBufferSize = 16384  
    canal.instance.network.sendBufferSize = 16384  
    canal.instance.network.soTimeout = 30  
      
    # binlog filter config  
    canal.instance.filter.druid.ddl = true  
    canal.instance.filter.query.dcl = false  
    canal.instance.filter.query.dml = false  
    canal.instance.filter.query.ddl = false  
    canal.instance.filter.table.error = false  
    canal.instance.filter.rows = false  
    canal.instance.filter.transaction.entry = false  
    canal.instance.filter.dml.insert = false  
    canal.instance.filter.dml.update = false  
    canal.instance.filter.dml.delete = false  
      
    # binlog format/image check  
    canal.instance.binlog.format = ROW,STATEMENT,MIXED   
    canal.instance.binlog.image = FULL,MINIMAL,NOBLOB  
      
    # binlog ddl isolation  
    canal.instance.get.ddl.isolation = false  
      
    # parallel parser config  
    canal.instance.parser.parallel = true  
    # concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()  
    canal.instance.parser.parallelThreadSize = 16  
    # disruptor ringbuffer size, must be power of 2  
    canal.instance.parser.parallelBufferSize = 256  
      
    # table meta tsdb info  
    canal.instance.tsdb.enable = true  
    canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}  
    canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;  
    canal.instance.tsdb.dbUsername = canal  
    canal.instance.tsdb.dbPassword = canal  
    # dump snapshot interval, default 24 hour  
    canal.instance.tsdb.snapshot.interval = 24  
    # purge snapshot expire , default 360 hour(15 days)  
    canal.instance.tsdb.snapshot.expire = 360  
      
    ################################################  
    ########     destinations    ############  
    ################################################  
    canal.destinations = canal-exchange  
    # conf root dir  
    canal.conf.dir = ../conf  
    # auto scan instance dir add/remove and start/stop instance  
    canal.auto.scan = true  
    canal.auto.scan.interval = 5  
    # set this value to 'true' means that when binlog pos not found, skip to latest.  
    # WARN: pls keep 'false' in production env, or if you know what you want.  
    canal.auto.reset.latest.pos.mode = false  
      
    canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml  
    #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml  
      
    canal.instance.global.mode = spring  
    canal.instance.global.lazy = false  
    canal.instance.global.manager.address = ${canal.admin.manager}  
    #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml  
    canal.instance.global.spring.xml = classpath:spring/file-instance.xml  
    #canal.instance.global.spring.xml = classpath:spring/default-instance.xml  
      
    #################################################  
    ########         MQ Properties      ############  
    #################################################  
    # aliyun ak/sk , support rds/mq  
    canal.aliyun.accessKey =  
    canal.aliyun.secretKey =  
    canal.aliyun.uid=  
      
    canal.mq.flatMessage = true  
    canal.mq.canalBatchSize = 50  
    canal.mq.canalGetTimeout = 100  
    # Set this value to "cloud", if you want open message trace feature in aliyun.  
    canal.mq.accessChannel = local  
      
    canal.mq.database.hash = true  
    canal.mq.send.thread.size = 30  
    canal.mq.build.thread.size = 8  
      
    #################################################  
    ########         RabbitMQ       ############  
    #################################################  
    rabbitmq.host = myrabbit  
    rabbitmq.virtual.host = /  
    rabbitmq.exchange = canal-exchange  
    rabbitmq.username = admin  
    rabbitmq.password = RabbitMQ密码  
    rabbitmq.deliveryMode =  

    此时canal.serverMode = tcp,即TCP直连,我们先开启这个服务,然后手写Java客户端代码去连接它,等下再改为RabbitMQ。

    通过注释可以看到,canal支持的服务模式有:tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ,即主流的消息队列都支持。

    instance.properties

    ################################################  
    # mysql serverId , v1.0.26+ will autoGen  
    #canal.instance.mysql.slaveId=123  
      
    # enable gtid use true/false  
    canal.instance.gtidon=false  
      
    # position info  
    canal.instance.master.address=mymysql:3306  
    canal.instance.master.journal.name=  
    canal.instance.master.position=  
    canal.instance.master.timestamp=  
    canal.instance.master.gtid=  
      
    # rds oss binlog  
    canal.instance.rds.accesskey=  
    canal.instance.rds.secretkey=  
    canal.instance.rds.instanceId=  
      
    # table meta tsdb info  
    canal.instance.tsdb.enable=true  
    #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb  
    #canal.instance.tsdb.dbUsername=canal  
    #canal.instance.tsdb.dbPassword=canal  
      
    #canal.instance.standby.address =  
    #canal.instance.standby.journal.name =  
    #canal.instance.standby.position =  
    #canal.instance.standby.timestamp =  
    #canal.instance.standby.gtid=  
      
    # username/password  
    canal.instance.dbUsername=canal  
    canal.instance.dbPassword=canal  
    canal.instance.connectionCharset = UTF-8  
    # enable druid Decrypt database password  
    canal.instance.enableDruid=false  
    #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==  
      
    # table regex  
    canal.instance.filter.regex=.*..*  
    # table black regex  
    canal.instance.filter.black.regex=mysql.slave_.*  
    # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)  
    #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch  
    # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)  
    #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch  
      
    # mq config  
    canal.mq.topic=canal-routing-key  
    # dynamic topic route by schema or table regex  
    #canal.mq.dynamicTopic=mytest1.user,topic2:mytest2..*,.*..*  
    canal.mq.partition=0  

    把这两个配置文件映射好,再次提醒,注意实例的路径名,默认是:/example/instance.properties

    修改canal配置文件

    我们需要修改这个实例配置文件,去连接MySQL,确保以下的配置正确:

    canal.instance.master.address=mymysql:3306  
    canal.instance.dbUsername=canal  
    canal.instance.dbPassword=canal  

    mymysql是同为docker容器的MySQL环境,端口3306是指内部端口。

    这里多说明一下,docker端口配置时假设为:13306:3306,那么容器对外的端口就是13306,内部是3306,在本示例中,MySQL和Canal都是容器环境,所以Canal连接MySQL需要满足以下条件:

    • 处于同一网段(docker-compose.yml中的mynetwork)
    • 访问内部端口(即3306,而非13306)

    dbUsername和dbPassword为MySQL账号密码,为了开发方便可以使用root/root,但是我仍建议自行创建用户并分配访问权限:

    # 进入docker中的mysql容器  
    docker exec -it mymysql bash  
    # 进入mysql指令模式  
    mysql -uroot -proot  
      
    # 编写MySQL语句并执行  
    > ...  
    -- 选择mysql  
    use mysql;  
    -- 创建canal用户,账密:canal/canal  
    create user 'canal'@'%' identified by 'canal';  
    -- 分配权限,以及允许所有主机登录该用户  
    grant SELECT, INSERT, UPDATE, DELETE, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%';  
      
    -- 刷新一下使其生效  
    flush privileges;  
      
    -- 附带一个删除用户指令  
    drop user 'canal'@'%';  

    用navicat或者shell去登录canal这个用户,可以访问即创建成功

    整合SpringBoot Canal实现客户端

    Maven依赖:

    1.1.5  
      
      
      
      com.alibaba.otter  
      canal.client  
      ${canal.version}  
      
      
      com.alibaba.otter  
      canal.protocol  
      ${canal.version}  
       

    新增组件并启动:

    import com.alibaba.otter.canal.client.CanalConnector;  
    import com.alibaba.otter.canal.client.CanalConnectors;  
    import com.alibaba.otter.canal.protocol.CanalEntry;  
    import com.alibaba.otter.canal.protocol.Message;  
    import org.springframework.boot.CommandLineRunner;  
    import org.springframework.stereotype.Component;  
      
    import java.net.InetSocketAddress;  
    import java.util.List;  
      
    @Component  
    public class CanalClient {  
      
        private final static int BATCH_SIZE = 1000;  
      
        public void run() {  
            // 创建链接  
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111), "canal-exchange""canal""canal");  
            try {  
                //打开连接  
                connector.connect();  
                //订阅数据库表,全部表  
                connector.subscribe(".*..*");  
                //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿  
                connector.rollback();  
                while (true) {  
                    // 获取指定数量的数据  
                    Message message = connector.getWithoutAck(BATCH_SIZE);  
                    //获取批量ID  
                    long batchId = message.getId();  
                    //获取批量的数量  
                    int size = message.getEntries().size();  
                    //如果没有数据  
                    if (batchId == -1 || size == 0) {  
                        try {  
                            //线程休眠2秒  
                            Thread.sleep(2000);  
                        } catch (InterruptedException e) {  
                            e.printStackTrace();  
                        }  
                    } else {  
                        //如果有数据,处理数据  
                        printEntry(message.getEntries());  
                    }  
                    //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。  
                    connector.ack(batchId);  
                }  
            } catch (Exception e) {  
                e.printStackTrace();  
            } finally {  
                connector.disconnect();  
            }  
        }  
      
        /**  
         * 打印canal server解析binlog获得的实体类信息  
         */  
        private static void printEntry(List entrys) {  
            for (CanalEntry.Entry entry : entrys) {  
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {  
                    //开启/关闭事务的实体类型,跳过  
                    continue;  
                }  
                //RowChange对象,包含了一行数据变化的所有特征  
                //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等  
                CanalEntry.RowChange rowChage;  
                try {  
                    rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());  
                } catch (Exception e) {  
                    throw new RuntimeException("ERROR # parser of eromanga-event has an error , data:" + entry.toString(), e);  
                }  
                //获取操作类型:insert/update/delete类型  
                CanalEntry.EventType eventType = rowChage.getEventType();  
                //打印Header信息  
                System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",  
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),  
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),  
                        eventType));  
                //判断是否是DDL语句  
                if (rowChage.getIsDdl()) {  
                    System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());  
                }  
                //获取RowChange对象里的每一行数据,打印出来  
                for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {  
                    //如果是删除语句  
                    if (eventType == CanalEntry.EventType.DELETE) {  
                        printColumn(rowData.getBeforeColumnsList());  
                        //如果是新增语句  
                    } else if (eventType == CanalEntry.EventType.INSERT) {  
                        printColumn(rowData.getAfterColumnsList());  
                        //如果是更新的语句  
                    } else {  
                        //变更前的数据  
                        System.out.println("------->; before");  
                        printColumn(rowData.getBeforeColumnsList());  
                        //变更后的数据  
                        System.out.println("------->; after");  
                        printColumn(rowData.getAfterColumnsList());  
                    }  
                }  
            }  
        }  
      
        private static void printColumn(List columns) {  
            for (CanalEntry.Column column : columns) {  
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());  
            }  
        }  
    }  

    启动类Application:

    @SpringBootApplication  
    public class BaseApplication implements CommandLineRunner {  
        @Autowired  
        private CanalClient canalClient;  
      
        @Override  
        public void run(String... args) throws Exception {  
            canalClient.run();  
        }  
    }  

    启动程序,此时新增或修改数据库中的数据,我们就能从客户端中监听到

    不过我建议监听的信息放到消息队列中,在空闲的时候去处理,所以直接配置Canal整合RabbitMQ更好。

    Canal整合RabbitMQ

    修改canal.properties中的serverMode:

    canal.serverMode = rabbitMQ  

    修改instance.properties中的topic:

    canal.mq.topic=canal-routing-key  

    然后找到关于RabbitMQ的配置:

    #################################################  
    ########         RabbitMQ       ############  
    #################################################  
    # 连接rabbit,写IP,因为同个网络下,所以可以写容器名  
    rabbitmq.host = myrabbit  
    rabbitmq.virtual.host = /  
    # 交换器名称,等等我们要去手动创建  
    rabbitmq.exchange = canal-exchange  
    # 账密  
    rabbitmq.username = admin  
    rabbitmq.password = 123456  
    # 暂不支持指定端口,使用的是默认的5762,好在在本示例中适用  

    重新启动容器,进入RabbitMQ管理页面创建exchange交换器和队列queue:

    • 新建exchange,命名为:canal-exchange
    • 新建queue,命名为:canal-queue
    • 绑定exchange和queue,routing-key设置为:canal-routing-key,这里对应上面instance.propertiescanal.mq.topic

    顺带一提,上面这段可以忽略,因为在SpringBoot的RabbitMQ配置中,会自动创建交换器exchange和队列queue,不过手动创建的话,可以在忽略SpringBoot的基础上,直接在RabbitMQ的管理页面上看到修改记录的消息。

    SpringBoot整合RabbitMQ

    依赖:

    2.3.4.RELEASE  
      
      
      
      org.springframework.boot  
      spring-boot-starter-amqp  
      ${amqp.version}  
      

    application.yml

    spring:  
      rabbitmq:  
        #    host: myserverhost  
        host: 192.168.0.108  
        port: 5672  
        username: admin  
        password: RabbitMQ密码  
        # 消息确认配置项  
        # 确认消息已发送到交换机(Exchange)  
        publisher-confirm-type: correlated  
        # 确认消息已发送到队列(Queue)  
        publisher-returns: true  

    RabbitMQ配置类:

    @Configuration  
    public class RabbitConfig {  
        @Bean  
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {  
            RabbitTemplate template = new RabbitTemplate();  
            template.setConnectionFactory(connectionFactory);  
            template.setMessageConverter(new Jackson2JsonMessageConverter());  
      
            return template;  
        }  
      
        /**  
         * template.setMessageConverter(new Jackson2JsonMessageConverter());  
         * 这段和上面这行代码解决RabbitListener循环报错的问题  
         */  
        @Bean  
        public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {  
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();  
            factory.setConnectionFactory(connectionFactory);  
            factory.setMessageConverter(new Jackson2JsonMessageConverter());  
            return factory;  
        }  
    }  

    Canal消息生产者:

    public static final String CanalQueue = "canal-queue";  
    public static final String CanalExchange = "canal-exchange";  
    public static final String CanalRouting = "canal-routing-key";  

    /**  
     * Canal消息提供者,canal-server生产的消息通过RabbitMQ消息队列发送  
     */  
    @Configuration  
    public class CanalProvider {  
        /**  
         * 队列  
         */  
        @Bean  
        public Queue canalQueue() {  
            /**  
             * durable:是否持久化,默认false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在;暂存队列:当前连接有效  
             * exclusive:默认为false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable  
             * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除  
             */  
            return new Queue(RabbitConstant.CanalQueue, true);  
        }  
      
        /**  
         * 交换机,这里使用直连交换机  
         */  
        @Bean  
        DirectExchange canalExchange() {  
            return new DirectExchange(RabbitConstant.CanalExchange, truefalse);  
        }  
      
        /**  
         * 绑定交换机和队列,并设置匹配键  
         */  
        @Bean  
        Binding bindingCanal() {  
            return BindingBuilder.bind(canalQueue()).to(canalExchange()).with(RabbitConstant.CanalRouting);  
        }  
    }  

    Canal消息消费者:

    /**  
     * Canal消息消费者  
     */  
    @Component  
    @RabbitListener(queues = RabbitConstant.CanalQueue)  
    public class CanalComsumer {  
        private final SysBackupService sysBackupService;  
      
        public CanalComsumer(SysBackupService sysBackupService) {  
            this.sysBackupService = sysBackupService;  
        }  
      
        @RabbitHandler  
        public void process(Map msg) {  
            System.out.println("收到canal消息:" + msg);  
            boolean isDdl = (boolean) msg.get("isDdl");  
      
            // 不处理DDL事件  
            if (isDdl) {  
                return;  
            }  
      
            // TiCDC的id,应该具有唯一性,先保存再说  
            int tid = (int) msg.get("id");  
            // TiCDC生成该消息的时间戳,13位毫秒级  
            long ts = (long) msg.get("ts");  
            // 数据库  
            String database = (String) msg.get("database");  
            // 表  
            String table = (String) msg.get("table");  
            // 类型:INSERT/UPDATE/DELETE  
            String type = (String) msg.get("type");  
            // 每一列的数据值  
            List> data = (List>) msg.get("data");  
            // 仅当type为UPDATE时才有值,记录每一列的名字和UPDATE之前的数据值  
            List> old = (List>) msg.get("old");  
      
            // 跳过sys_backup,防止无限循环  
            if ("sys_backup".equalsIgnoreCase(table)) {  
                return;  
            }  
      
            // 只处理指定类型  
            if (!"INSERT".equalsIgnoreCase(type)  
                    && !"UPDATE".equalsIgnoreCase(type)  
                    && !"DELETE".equalsIgnoreCase(type)) {  
                return;  
            }  
        }  
    }  

    测试一下,修改MySQL中的一条消息,Canal就会发送信息到RabbitMQ,我们就能从监听的RabbitMQ队列中得到该条消息。

  • 优雅的接口防刷处理方案

    快乐分享,Java干货及时送达👇

    来源:juejin.cn/post/7200366809407750181

    前言

    本文为描述通过Interceptor以及Redis实现接口访问防刷Demo

    这里会通过逐步找问题,逐步去完善的形式展示

    原理

    • 通过ip地址+uri拼接用以作为访问者访问接口区分
    • 通过在Interceptor中拦截请求,从Redis中统计用户访问接口次数从而达到接口防刷目的

    如下图所示

    工程

    项目地址:

    https://github.com/Tonciy/interface-brush-protection

    Apifox地址:Apifox 密码:Lyh3j2Rv

    其中,Interceptor处代码处理逻辑最为重要

    /**
     * @author: Zero
     * @time: 2023/2/14
     * @description: 接口防刷拦截处理
     */

    @Slf4j
    public class AccessLimintInterceptor  implements HandlerInterceptor {
        @Resource
        private RedisTemplate redisTemplate;

        /**
         * 多长时间内
         */

        @Value("${interfaceAccess.second}")
        private Long second = 10L;

        /**
         * 访问次数
         */

        @Value("${interfaceAccess.time}")
        private Long time = 3L;

        /**
         * 禁用时长--单位/秒
         */

        @Value("${interfaceAccess.lockTime}")
        private Long lockTime = 60L;

        /**
         * 锁住时的key前缀
         */

        public static final String LOCK_PREFIX = "LOCK";

        /**
         * 统计次数时的key前缀
         */

        public static final String COUNT_PREFIX = "COUNT";


        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {

            String uri = request.getRequestURI();
            String ip = request.getRemoteAddr(); // 这里忽略代理软件方式访问,默认直接访问,也就是获取得到的就是访问者真实ip地址
            String lockKey = LOCK_PREFIX + ip + uri;
            Object isLock = redisTemplate.opsForValue().get(lockKey);
            if(Objects.isNull(isLock)){
                // 还未被禁用
                String countKey = COUNT_PREFIX + ip + uri;
                Object count = redisTemplate.opsForValue().get(countKey);
                if(Objects.isNull(count)){
                    // 首次访问
                    log.info("首次访问");
                    redisTemplate.opsForValue().set(countKey,1,second, TimeUnit.SECONDS);
                }else{
                    // 此用户前一点时间就访问过该接口
                    if((Integer)count                     // 放行,访问次数 + 1
                        redisTemplate.opsForValue().increment(countKey);
                    }else{
                        log.info("{}禁用访问{}",ip, uri);
                        // 禁用
                        redisTemplate.opsForValue().set(lockKey, 1,lockTime, TimeUnit.SECONDS);
                        // 删除统计
                        redisTemplate.delete(countKey);
                        throw new CommonException(ResultCode.ACCESS_FREQUENT);
                    }
                }
            }else{
                // 此用户访问此接口已被禁用
                throw new CommonException(ResultCode.ACCESS_FREQUENT);
            }
            return true;
        }
    }

    在多长时间内访问接口多少次,以及禁用的时长,则是通过与配置文件配合动态设置

    当处于禁用时直接抛异常则是通过在ControllerAdvice处统一处理 (这里代码写的有点丑陋)

    下面是一些测试(可以把项目通过Git还原到“【初始化】”状态进行测试)

    • 正常访问时

    • 访问次数过于频繁时

    自我提问

    上述实现就好像就已经达到了我们的接口防刷目的了

    但是,还不够

    为方便后续描述,项目中新增补充Controller,如下所示

    简单来说就是

    • PassCotrollerRefuseController
    • 每个Controller分别有对应的get,post,put,delete类型的方法,其映射路径与方法名称一致

    接口自由

    • 对于上述实现,不知道你们有没有发现一个问题
    • 就是现在我们的接口防刷处理,针对是所有的接口(项目案例中我只是写的接口比较少)
    • 而在实际开发中,说对于所有的接口都要做防刷处理,感觉上也不太可能(写此文时目前大四,实际工作经验较少,这里不敢肯定)
    • 那么问题有了,该如何解决呢?目前来说想到两个解决方案
    拦截器映射规则

    项目通过Git还原到”【Interceptor设置映射规则实现接口自由】”版本即可得到此案例实现

    我们都知道拦截器是可以设置拦截规则的,从而达到拦截处理目的

    1.这个AccessInterfaceInterceptor是专门用来进行防刷处理的,那么实际上我们可以通过设置它的映射规则去匹配需要进行【接口防刷】的接口即可

    2.比如说下面的映射配置

    3.这样就初步达到了我们的目的,通过映射规则的配置,只针对那些需要进行【接口防刷】的接口才会进行处理

    4.至于为啥说是初步呢?下面我就说说目前我想到的使用这种方式进行【接口防刷】的不足点:

    所有要进行防刷处理的接口统一都是配置成了 x 秒内 y 次访问次数,禁用时长为 z 秒

    • 要知道就是要进行防刷处理的接口,其 x, y, z的值也是并不一定会统一的
    • 某些防刷接口处理比较消耗性能的,我就把x, y, z设置的紧一点
    • 而某些防刷接口处理相对来说比较快,我就把x, y, z 设置的松一点
    • 这没问题吧
    • 但是现在呢?x, y, z值全都一致了,这就不行了
    • 这就是其中一个不足点
    • 当然,其实针对当前这种情况也有解决方案
    • 那就是弄多个拦截器
    • 每个拦截器的【接口防刷】处理逻辑跟上述一致,并去映射对应要处理的防刷接口
    • 唯一不同的就是在每个拦截器内部,去修改对应防刷接口需要的x, y, z值
    • 这样就是感觉会比较麻烦

    防刷接口映射路径修改后维护问题

    • 虽然说防刷接口的映射路径基本上定下来后就不会改变
    • 但实际上前后端联调开发项目时,不会有那么严谨的Api文档给我们用(这个在实习中倒是碰到过,公司不是很大,开发起来也就不那么严谨,啥都要自己搞,功能能实现就好)
    • 也就是说还是会有那种要修改接口的映射路径需求
    • 当防刷接口数量特别多,后面的接手人员就很痛苦了
    • 就算是项目是自己从0到1实现的,其实有时候项目开发到后面,自己也会忘记自己前面是如何设计的
    • 而使用当前这种方式的话,谁维护谁蛋疼
    自定义注解 + 反射

    咋说呢

    • 就是通过自定义注解中定义 x 秒内 y 次访问次数,禁用时长为 z 秒
    • 自定义注解 + 在需要进行防刷处理的各个接口方法上
    • 在拦截器中通过反射获取到各个接口中的x, y, z值即可达到我们想要的接口自由目的

    下面做个实现

    声明自定义注解

    Controlller中方法中使用

    Interceptor处逻辑修改(最重要是通过反射判断此接口是否需要进行防刷处理,以及获取到x, y, z的值)

    /**
     * @author: Zero
     * @time: 2023/2/14
     * @description: 接口防刷拦截处理
     */

    @Slf4j
    public class AccessLimintInterceptor  implements HandlerInterceptor {
        @Resource
        private RedisTemplate redisTemplate;
        /**
         * 锁住时的key前缀
         */

        public static final String LOCK_PREFIX = "LOCK";

        /**
         * 统计次数时的key前缀
         */

        public static final String COUNT_PREFIX = "COUNT";


        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
    //        自定义注解 + 反射 实现
            // 判断访问的是否是接口方法
            if(handler instanceof HandlerMethod){
                // 访问的是接口方法,转化为待访问的目标方法对象
                HandlerMethod targetMethod = (HandlerMethod) handler;
                // 取出目标方法中的 AccessLimit 注解
                AccessLimit accessLimit = targetMethod.getMethodAnnotation(AccessLimit.class);
                // 判断此方法接口是否要进行防刷处理(方法上没有对应注解就代表不需要,不需要的话进行放行)
                if(!Objects.isNull(accessLimit)){
                    // 需要进行防刷处理,接下来是处理逻辑
                    String ip = request.getRemoteAddr();
                    String uri = request.getRequestURI();
                    String lockKey = LOCK_PREFIX + ip + uri;
                    Object isLock = redisTemplate.opsForValue().get(lockKey);
                    // 判断此ip用户访问此接口是否已经被禁用
                    if (Objects.isNull(isLock)) {
                        // 还未被禁用
                        String countKey = COUNT_PREFIX + ip + uri;
                        Object count = redisTemplate.opsForValue().get(countKey);
                        long second = accessLimit.second();
                        long maxTime = accessLimit.maxTime();

                        if (Objects.isNull(count)) {
                            // 首次访问
                            log.info("首次访问");
                            redisTemplate.opsForValue().set(countKey, 1, second, TimeUnit.SECONDS);
                        } else {
                            // 此用户前一点时间就访问过该接口,且频率没超过设置
                            if ((Integer) count                             redisTemplate.opsForValue().increment(countKey);
                            } else {

                                log.info("{}禁用访问{}", ip, uri);
                                long forbiddenTime = accessLimit.forbiddenTime();
                                // 禁用
                                redisTemplate.opsForValue().set(lockKey, 1, forbiddenTime, TimeUnit.SECONDS);
                                // 删除统计--已经禁用了就没必要存在了
                                redisTemplate.delete(countKey);
                                throw new CommonException(ResultCode.ACCESS_FREQUENT);
                            }
                        }
                    } else {
                        // 此用户访问此接口已被禁用
                        throw new CommonException(ResultCode.ACCESS_FREQUENT);
                    }
                }
            }
            return  true;
        }
    }

    由于不好演示效果,这里就不贴测试结果图片了

    项目通过Git还原到”【自定义主键+反射实现接口自由”版本即可得到此案例实现,后面自己可以针对接口做下测试看看是否如同我所说的那样实现自定义x, y, z 的效果

    嗯,现在看起来,可以针对每个要进行防刷处理的接口进行针对性自定义多长时间内的最大访问次数,以及禁用时长,哪个接口需要,就直接+在那个接口方法出即可

    感觉还不错的样子,现在网上挺多资料也都是这样实现的

    但是还是可以有改善的地方

    先举一个例子,以我们的PassController为例,如下是其实现

    下图是其映射路径关系

    同一个Controller的所有接口方法映射路径的前缀都包含了/pass

    我们在类上通过注解@ReqeustMapping标记映射路径/pass,这样所有的接口方法前缀都包含了/pass,并且以致于后面要修改映射路径前缀时只需改这一块地方即可

    这也是我们使用SpringMVC最常见的用法

    那么,我们的自定义注解也可不可以这样做呢?先无中生有个需求

    假设PassController中所有接口都是要进行防刷处理的,并且他们的x, y, z值就一样

    如果我们的自定义注解还是只能加载方法上的话,一个一个接口加,那么无疑这是一种很呆的做法

    要改的话,其实也很简单,首先是修改自定义注解,让其可以作用在类上

    接着就是修改AccessLimitInterceptor的处理逻辑

    AccessLimitInterceptor中代码修改的有点多,主要逻辑如下

    与之前实现比较,不同点在于x, y, z的值要首先尝试在目标类中获取

    其次,一旦类中标有此注解,即代表此类下所有接口方法都要进行防刷处理

    如果其接口方法同样也标有此注解,根据就近优先原则,以接口方法中的注解标明的值为准

    /**
     * @author: Zero
     * @time: 2023/2/14
     * @description: 接口防刷拦截处理
     */

    @Slf4j
    public class AccessLimintInterceptor implements HandlerInterceptor {
        @Resource
        private RedisTemplate redisTemplate;

        /**
         * 锁住时的key前缀
         */

        public static final String LOCK_PREFIX = "LOCK";

        /**
         * 统计次数时的key前缀
         */

        public static final String COUNT_PREFIX = "COUNT";


        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {

    //      自定义注解 + 反射 实现, 版本 2.0
            if (handler instanceof HandlerMethod) {
                // 访问的是接口方法,转化为待访问的目标方法对象
                HandlerMethod targetMethod = (HandlerMethod) handler;
                // 获取目标接口方法所在类的注解@AccessLimit
                AccessLimit targetClassAnnotation = targetMethod.getMethod().getDeclaringClass().getAnnotation(AccessLimit.class);
                // 特别注意不能采用下面这条语句来获取,因为 Spring 采用的代理方式来代理目标方法
                //  也就是说targetMethod.getClass()获得是class org.springframework.web.method.HandlerMethod ,而不知我们真正想要的 Controller
    //            AccessLimit targetClassAnnotation = targetMethod.getClass().getAnnotation(AccessLimit.class);
                // 定义标记位,标记此类是否加了@AccessLimit注解
                boolean isBrushForAllInterface = false;
                String ip = request.getRemoteAddr();
                String uri = request.getRequestURI();
                long second = 0L;
                long maxTime = 0L;
                long forbiddenTime = 0L;
                if (!Objects.isNull(targetClassAnnotation)) {
                    log.info("目标接口方法所在类上有@AccessLimit注解");
                    isBrushForAllInterface = true;
                    second = targetClassAnnotation.second();
                    maxTime = targetClassAnnotation.maxTime();
                    forbiddenTime = targetClassAnnotation.forbiddenTime();
                }
                // 取出目标方法中的 AccessLimit 注解
                AccessLimit accessLimit = targetMethod.getMethodAnnotation(AccessLimit.class);
                // 判断此方法接口是否要进行防刷处理
                if (!Objects.isNull(accessLimit)) {
                    // 需要进行防刷处理,接下来是处理逻辑
                    second = accessLimit.second();
                    maxTime = accessLimit.maxTime();
                    forbiddenTime = accessLimit.forbiddenTime();
                    if (isForbindden(second, maxTime, forbiddenTime, ip, uri)) {
                        throw new CommonException(ResultCode.ACCESS_FREQUENT);
                    }
                } else {
                    // 目标接口方法处无@AccessLimit注解,但还要看看其类上是否加了(类上有加,代表针对此类下所有接口方法都要进行防刷处理)
                    if (isBrushForAllInterface && isForbindden(second, maxTime, forbiddenTime, ip, uri)) {
                        throw new CommonException(ResultCode.ACCESS_FREQUENT);
                    }
                }
            }
            return true;
        }

        /**
         * 判断某用户访问某接口是否已经被禁用/是否需要禁用
         *
         * @param second        多长时间  单位/秒
         * @param maxTime       最大访问次数
         * @param forbiddenTime 禁用时长 单位/秒
         * @param ip            访问者ip地址
         * @param uri           访问的uri
         * @return ture为需要禁用
         */

        private boolean isForbindden(long second, long maxTime, long forbiddenTime, String ip, String uri) {
            String lockKey = LOCK_PREFIX + ip + uri; //如果此ip访问此uri被禁用时的存在Redis中的 key
            Object isLock = redisTemplate.opsForValue().get(lockKey);
            // 判断此ip用户访问此接口是否已经被禁用
            if (Objects.isNull(isLock)) {
                // 还未被禁用
                String countKey = COUNT_PREFIX + ip + uri;
                Object count = redisTemplate.opsForValue().get(countKey);
                if (Objects.isNull(count)) {
                    // 首次访问
                    log.info("首次访问");
                    redisTemplate.opsForValue().set(countKey, 1, second, TimeUnit.SECONDS);
                } else {
                    // 此用户前一点时间就访问过该接口,且频率没超过设置
                    if ((Integer) count                     redisTemplate.opsForValue().increment(countKey);
                    } else {
                        log.info("{}禁用访问{}", ip, uri);
                        // 禁用
                        redisTemplate.opsForValue().set(lockKey, 1, forbiddenTime, TimeUnit.SECONDS);
                        // 删除统计--已经禁用了就没必要存在了
                        redisTemplate.delete(countKey);
                        return true;
                    }
                }
            } else {
                // 此用户访问此接口已被禁用
                return true;
            }
            return false;
        }
    }

    好了,这样就达到我们想要的效果了

    项目通过Git还原到”【自定义注解+反射实现接口自由-版本2.0】”版本即可得到此案例实现,自己可以测试万一下

    这是目前来说比较理想的做法,至于其他做法,暂时没啥了解到

    时间逻辑漏洞

    这是我一开始都有留意到的问题

    也是一直搞不懂,就是我们现在的所有做法其实感觉都不是严格意义上的x秒内y次访问次数

    特别注意这个x秒,它是连续,任意的(代表这个x秒时间片段其实是可以发生在任意一个时间轴上)

    我下面尝试表达我的意思,但是我不知道能不能表达清楚

    假设我们固定某个接口5秒内只能访问3次,以下面例子为例

    底下的小圆圈代表此刻请求访问接口

    按照我们之前所有做法的逻辑走

    1. 第2秒请求到,为首次访问,Redis中统计次数为1(过期时间为5秒)
    2. 第7秒,此时有两个动作,一是请求到,二是刚刚第二秒Redis存的值现在过期
    3. 我们先假设这一刻,请求处理完后,Redis存的值才过期
    4. 按照这样的逻辑走
    5. 第七秒请求到,Redis存在对应key,且不大于3, 次数+1
    6. 接着这个key立马过期
    7. 再继续往后走,第8秒又当做新的一个起始,就不往下说了,反正就是不会出现禁用的情况

    按照上述逻辑走,实际上也就是说当出现首次访问时,当做这5秒时间片段的起始

    第2秒是,第8秒也是

    但是有没有想过,实际上这个5秒时间片段实际上是可以放置在时间轴上任意区域的

    上述情况我们是根据请求的到来情况人为的把它放在【2-7】,【8-13】上

    而实际上这5秒时间片段是可以放在任意区域的

    那么,这样的话,【7-12】也可以放置

    而【7-12】这段时间有4次请求,就达到了我们禁用的条件了

    是不是感觉怪怪的

    想过其他做法,但是好像严格意义上真的做不到我所说的那样(至少目前来说想不到)

    之前我们的做法,正常来说也够用,至少说有达到防刷的作用

    后面有机会的话再看看,不知道我是不是钻牛角尖了

    路径参数问题

    假设现在PassController中有如下接口方法

    也就是我们在接口方法中常用的在请求路径中获取参数的套路

    但是使用路径参数的话,就会发生问题

    那就是同一个ip地址访问此接口时,我携带的参数值不同

    按照我们之前那种前缀+ip+uri拼接的形式作为key的话,其实是区分不了的

    下图是访问此接口,携带不同参数值时获取的uri状况

    这样的话在我们之前拦截器的处理逻辑中,会认为是此ip用户访问的是不同的接口方法,而实际上访问的是同一个接口方法

    也就导致了【接口防刷】失效

    接下来就是解决它,目前来说有两种

    1. 不要使用路径参数

    这算是比较理想的做法,相当于没这个问题

    但有一定局限性,有时候接手别的项目,或者自己根本没这个权限说不能使用路径参数

    1. 替换uri
    • 我们获取uri的目的,其实就是为了区别访问接口
    • 而把uri替换成另一种可以区分访问接口方法的标识即可
    • 最容易想到的就是通过反射获取到接口方法名称,使用接口方法名称替换成uri即可
    • 当然,其实不同的Controller中,其接口方法名称也有可能是相同的
    • 实际上可以再获取接口方法所在类类名,使用类名 + 方法名称替换uri即可
    • 实际解决方案有很多,看个人需求吧

    真实ip获取

    在之前的代码中,我们获取代码都是通过request.getRemoteAddr()获取的

    但是后续有了解到,如果说通过代理软件方式访问的话,这样是获取不到来访者的真实ip的

    至于如何获取,后续我再研究下http再说,这里先提个醒

    总结

    说实话,挺有意思的,一开始自己想【接口防刷】的时候,感觉也就是转化成统计下访问次数的问题摆了。后面到网上看别人的写法,又再自己给自己找点问题出来,后面会衍生出来一推东西出来,诸如自定义注解+反射这种实现方式。

    以前其实对注解 + 反射其实有点不太懂干嘛用的,而从之前的数据报表导出,再到基本权限控制实现,最后到今天的【接口防刷】一点点来进步去补充自己的知识点,而且,感觉写博客真的是件挺有意义的事情,它会让你去更深入的了解某个点,并且知识是相关联的,探索的过程中会牵扯到其他别的知识点,就像之前的写的【单例模式】实现,一开始就了解到懒汉式,饿汉式

    后面深入的话就知道其实会还有序列化/反序列化,反射调用生成实例,对象克隆这几种方式回去破坏单例模式,又是如何解决的,这也是一个进步的点,后续为了保证线程安全问题,牵扯到的synchronized,voliate关键字,继而又关联到JVM,JUC,操作系统的东西。

  • 船新 IDEA 2023.1 正式发布,新特性真香!

    大家好,昨晚看到 IDEA 官推宣布 IntelliJ IDEA 2023.1 正式发布了。简单看了一下,发现这次的新版本包含了许多改进,进一步优化了用户体验,提高了便捷性。

    至于是否升级最新版本完全是个人意愿,如果觉得新版本没有让自己感兴趣的改进,完全就不用升级,影响不大。软件的版本迭代非常正常,正确看待即可,不持续改进就会慢慢被淘汰!

    根据官方介绍:

    IntelliJ IDEA 2023.1 针对新的用户界面进行了大量重构,这些改进都是基于收到的宝贵反馈而实现的。官方还实施了性能增强措施,使得 Maven 导入更快,并且在打开项目时 IDE 功能更早地可用。由于后台提交检查,新版本提供了简化的提交流程。IntelliJ IDEA Ultimate 现在支持 Spring Security 匹配器和请求映射导航。

    下面对这个版本的一些比较有意思的改进进行详细介绍。

    新 UI 增强(测试版)

    针对收到的有关 IDE 新用户界面的反馈,IntelliJ IDEA 官方实施了一些更新,以解决最受欢迎的请求。引入了紧凑模式,通过缩小间距和元素提供更加集中的 IDE 外观和感觉。新 UI 现在提供一个选项来垂直分割工具窗口区域,并方便地排列窗口,就像旧 UI 一样。主窗口标题栏中的运行小部件已经重新设计,使其外观不显眼且更易于查看。

    在项目打开时更早提供 IDE 功能

    IntelliJ IDEA 官方通过在智能模式下执行扫描文件以建立索引的过程来改进了 IDE 启动体验,这样即可使 IDE 的全部功能在启动过程中更早地可用。当打开一个项目时,IntelliJ IDEA 2023.1 会使用上一次与该项目的会话中存在的缓存,并同时查找要建立索引的文件。如果扫描中没有发现任何更改,则 IDE 将准备就绪,消除了之前由于启动时进行索引而导致的延迟。

    更快地导入 Maven 项目

    更快地导入 Maven 项目

    官方通过优化依赖解析以及重新设计导入和配置 facets 的过程,显著提高了 IDE 在导入 Maven 项目时的性能。

    后台提交检查

    后台提交检查

    官方重新设计了 Git 和 Mercurial 的提交检查行为,以加速整个提交过程。现在,在提交但尚未推送之前会在后台执行检查。

    Spring Security 匹配器和请求映射的导航

    Spring Security 匹配器和请求映射的导航

    为了简化查看应用安全规则,IntelliJ IDEA Ultimate 2023.1 提供了从 Spring 控制器到安全匹配器的轻松导航。该导航可以从安全匹配器到控制器以及反向工作。

    全 IDE 缩放

    全 IDE 缩放

    在 v2023.1 中,可以完全放大和缩小 IDE,同时增加或缩减所有 UI 元素的大小。从主菜单中,选择 View | Appearance(视图 | 外观),调整 IDE 的缩放比例。此外,您可以在 Settings/Preferences | Keymap | Main Menu | View | Appearance(设置/偏好设置 | 按键映射 | 主菜单 | 视图 | 外观)中指定调用这些操作的自定义快捷键。

    新的 Java 检查

    新的 Java 检查

    官方为了帮助保持代码整洁和无错误,升级了一些现有的 Java 检查,并添加了新的检查。格式不正确字符串检查现在报告不符合常见 Java 语法的非法时间转换。冗余字符串操作检查现在能够检测到多余的 StringBuilder.toString() 调用,并提供一个快速修复来将它们替换为 contentEquals(),以便您不会创建中间 String 对象。它还报告 String 构造函数调用中不必要的参数,并建议一个快速修复来删除它们。在这篇博客文章中了解更多关于 IntelliJ IDEA 2023.1 其他代码检查改进。

    Java 20 支持

    Java 20 支持

    继续减少 Java 开发人员认知负荷,IntelliJ IDEA 2023.1 支持最新更新添加到 Java 20 中,包括语言特性模式匹配和记录模式的更改。

    改进了 Extract Method(提取方法)重构

    改进了 Extract Method(提取方法)重构

    官方通过引入选项来升级提取方法重构,即使所选代码片段具有需要返回的多个变量也可以应用该选项。在这些情况下,IDE 首先建议将这些变量封装到一个新记录或 bean 类中,然后执行方法提取。

    VM Options(虚拟机选项)字段中的自动补全

    VM Options(虚拟机选项)字段中的自动补全

    自动补全功能以及集成到 Run/Debug configuration(运行/调试配置)弹出窗口的 VM Options(虚拟机选项)字段中。现在,输入标志的名称时,IDE 会建议可用命令行选项的列表。这适用于 -XX:-X 选项,以及一些未由 IntelliJ IDEA 自动配置的标准选项,如 -ea,但不适用于 -cp–release

    Spring Security 6 支持

    Spring Security 6 支持

    IntelliJ IDEA Ultimate 2023.1 提供了更新的支持,可以导航到 Spring Security 6 中引入的 API 的 URL 映射和安全角色。

    Apache Dubbo 支持

    IntelliJ IDEA 实现了一个新的专用插件,集成了 Apache Dubbo,将该框架的功能作为 IntelliJ IDEA 对 Spring 的支持的一部分。

    Structure(结构)工具窗口中的 VCS 状态颜色提示

    Structure(结构)工具窗口中的 VCS 状态颜色提示

    针对 GitHub 改进了代码审查工作流

    针对 GitHub 改进了代码审查工作流

    为了简化在 IDE 中审查代码的过程,重做了 Pull Request(拉取请求)工具窗口。它现在为您打开的每个拉取请求提供一个专用标签页。标签页会立即显示已更改文件的列表,但它提供的信息比先前更少,让您可以更好地专注于当前任务。现在,可以通过一个新增的专属按钮轻松执行拉取请求当前状态下最相关的操作。

    参考资料

    IntelliJ IDEA 2023.1 更多改进的介绍请参考官方文档:https://www.jetbrains.com/zh-cn/idea/whatsnew/

    
    

  • Java8 Lambda 表达式中的 forEach 如何提前终止?

    快乐分享,Java干货及时送达👇

    # 情景展示

    图片

    如上图所示,我们想要终止for循环,使用return。

    执行结果如下:

    图片

    我们可以看到,只有赵六没被打印出来,后续的数组元素依旧被执行了。

    也就是说,关键字”return”,在这里执行的效果相当于普通for循环里的关键词continue”。

    # 原因分析

    我们知道,在普通for循环里面,想要提前结束(终止)循环体使用”break”;

    结束本轮循环,进行下一轮循环使用”continue”;

    另外,在普通for里,如果使用”return”,不仅强制结束for循环体,还会提前结束包含这个循环体的整个方法。

    而在Java8中的forEach()中,”break”或”continue”是不被允许使用的,而return的意思也不是原来return代表的含义了。

    我们来看看源码:

    图片

    forEach(),说到底是一个方法,而不是循环体,结束一个方法的执行用什么?当然是return啦;

    java8的forEach()和JavaScript的forEach()用法是何其的相似

    Java不是万能的,不要再吐槽它垃圾了。

    # 解决方案

    方案一:使用原始的foreach循环

    图片

    使用过eclipse的老铁们应该知道,当我们输入:foreach,再按快捷键:Alt+/,就会出现foreach的代码提示。

    如上图所示,这种格式的for循环才是真正意义上的foreach循环。

    在idea中输入,按照上述操作是不会有任何代码提示的,那如何才能在idea中,调出来呢?

    图片

    for循环可以提前终止。

    方式一:break

    图片

    方式二:return(不推荐使用)

    图片

    方案二:抛出异常

    我们知道,要想结束一个方法的执行,正常的逻辑是:使用return;

    但是,在实际运行中,往往有很多不突发情况导致代码提前终止,比如:空指针异常,其实,我们也可以通过抛出假异常的方式来达到终止forEach()方法的目的。

    图片

    如果觉得这种方式不友好,可以再包装一层。

    图片

    这样,就完美了。

    这里,需要注意的一点是:要确保你forEach()方法体内不能有其它代码可能会抛出的异常与自己手动抛出并捕获的异常一样;

    否则,当真正该因异常导致代码终止的时候,因为咱们手动捕获了并且没做任何处理,岂不是搬起石头砸自己的脚吗?

    来源 | https://blog.csdn.net/weixin_39597399/article/details/114232746

    
    

  • MySQL 索引失效跑不出这 8 个场景

    快乐分享,Java干货及时送达👇

    SQL 写不好 加班少不了  日常工作中SQL 是必不可少的一项技术 但是很多人不会过多的去关注SQL问题。

    一是数据量小,二是没有意识到索引的重要性。本文主要是整理 SQL失效场景,如果里面的细节你都知道,那你一定是学习能力比较好的人,膜拜~

    写完这篇文章 我感觉自己之前知道的真的是 “目录” 没有明白其中的内容,如果你能跟着节奏看完文章,一定会有收获,至少我写完感觉思维通透很多,以后百分之九十的 SQl索引问题 和 面试这方面问题都能拿捏两。


    基础数据准备





    准备一个数据表作为 数据演示  这里面一共 创建了三个索引

    • 联合索引  snames_codeaddress

    • 主键索引  id

    • 普通索引  height


    SET NAMES utf8mb4;
    SET FOREIGN_KEY_CHECKS = 0;

    -- ----------------------------
    -- Table structure for student
    -- ----------------------------
    DROP TABLE IF EXISTS `student`;
    CREATE TABLE `student`  (
      `id` int(11NOT NULL AUTO_INCREMENT,
      `sname` varchar(20CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
      `s_code` int(100NULL DEFAULT NULL,
      `address` varchar(100CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
      `height` double NULL DEFAULT NULL,
      `classid` int(11NULL DEFAULT NULL,
      `create_time` datetime(0NOT NULL ON UPDATE CURRENT_TIMESTAMP(0),
      PRIMARY KEY (`id`USING BTREE,
      INDEX `普通索引`(`height`USING BTREE,
      INDEX `联合索引`(`sname``s_code``address`USING BTREE
    ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

    -- ----------------------------
    -- Records of student
    -- ----------------------------
    INSERT INTO `student` VALUES (1'学生1'1'上海'1701'2022-11-02 20:44:14');
    INSERT INTO `student` VALUES (2'学生2'2'北京'1802'2022-11-02 20:44:16');
    INSERT INTO `student` VALUES (3'变成派大星'3'京东'1853'2022-11-02 20:44:19');
    INSERT INTO `student` VALUES (4'学生4'4'联通'1904'2022-11-02 20:44:25');


    正文





    上面的SQL 我们已经创建好基本的数据  在验证之前 先带着几个问题

    我们先从上往下进行验证


    最左匹配原则





    写在前面:我很早之前就听说过数据库的最左匹配原则,当时是通过各大博客论坛了解的,但是这些博客的局限性在于它们对最左匹配原则的描述就像一些数学定义一样,往往都是列出123点,满足这123点就能匹配上索引,否则就不能。

    最左匹配原则就是指在联合索引中,如果你的 SQL 语句中用到了联合索引中的最左边的索引,那么这条 SQL 语句就可以利用这个联合索引去进行匹配,我们上面建立了联合索引 可以用来测试最左匹配原则 snames_codeaddress

    请看下面SQL语句 进行思考 是否会走索引

    -- 联合索引 sname,s_code,address

    1、select create_time from student where sname = "变成派大星"  -- 会走索引吗?

    2select create_time from student where s_code = 1   -- 会走索引吗?

    3select create_time from student where address = "上海"  -- 会走索引吗?

    4select create_time from student where address = "上海" and s_code = 1 -- 会走索引吗?

    5select create_time from student where address = "上海" and sname = "变成派大星"  -- 会走索引吗?

    6select create_time from student where sname = "变成派大星" and address = "上海"  -- 会走索引吗?

    7select create_time from student where sname = "变成派大星" and s_code = 1 and address = "上海"  -- 会走索引吗?

    凭你的经验 哪些会使用到索引呢 ?可以先思考一下 在心中记下数字

    走索引例子

    EXPLAIN  select create_time from student where sname = "变成派大星"  -- 会走索引吗?

    未走索引例子

    EXPLAIN select create_time from student where address = "上海" and s_code = 1 -- 会走索引吗?

    走的全表扫描 rows = 4

    如果你内心的答案没有全部说对就接着往下看

    最左匹配原则顾名思义:最左优先,以最左边的为起点任何连续的索引都能匹配上。同时遇到范围查询(>、。

    例如:s_code = 2 如果建立(snames_code)顺序的索引,是匹配不到(snames_code)索引的;

    但是如果查询条件是sname = “变成派大星” and s_code = 2或者a=1(又或者是s_code = 2 and sname = “变成派大星” )就可以,因为优化器会自动调整snames_code的顺序

    再比如sname = “变成派大星” and s_code > 1 and address = “上海”  address是用不到索引的,因为s_code字段是一个范围查询,它之后的字段会停止匹配。

    不带范围查询 索引使用类型

    带范围使用类型

    根据上一篇文章的讲解 可以明白 ref 和range的含义  级别还是相差很多的

    思考

    为什么左链接一定要遵循最左缀原则呢?

    验证

    看过一个比较好玩的回答:

    你可以认为联合索引是闯关游戏的设计
    例如你这个联合索引是state/city/zipCode
    那么state就是第一关 city是第二关, zipCode就是第三关
    你必须匹配了第一关,才能匹配第二关,匹配了第一关和第二关,才能匹配第三关

    这样描述不算完全准确 但是确实是这种思想

    要想理解联合索引的最左匹配原则,先来理解下索引的底层原理。索引的底层是一颗B+树,那么联合索引的底层也就是一颗B+树,只不过联合索引的B+树节点中存储的是键值。由于构建一棵B+树只能根据一个值来确定索引关系,所以数据库依赖联合索引最左的字段来构建 文字比较抽象 我们看一下

    加入我们建立 A,B 联合索引 他们在底层储存是什么样子呢?

    • 橙色代表字段 A

    • 浅绿色 代表字段B

    图解:

    我们可以看出几个特点

    • A 是有顺序的  1,1,2,2,3,4

    • B 是没有顺序的 1,2,1,4,1,2 这个是散列的

    • 如果A是等值的时候 B是有序的  例如 (1,1),(1,2) 这里的B有序的 (2,1),(2,4) B 也是有序的

    这里应该就能看出 如果没有A的支持 B的索引是散列的 不是连续的

    再细致一点 我们重新创建一个表

    DROP TABLE IF EXISTS `leftaffix`;

    CREATE TABLE `leftaffix`  (

      `a` int(11NOT NULL AUTO_INCREMENT,

      `b` int(11NULL DEFAULT NULL,

      `c` int(11NULL DEFAULT NULL,

      `d` int(11NULL DEFAULT NULL,

      `e` varchar(11CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,

      PRIMARY KEY (`a`USING BTREE,

      INDEX `联合索引`(`b``c``d`USING BTREE

    ENGINE = InnoDB AUTO_INCREMENT = 8 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
     
    -- ----------------------------
    -- Records of leftaffix
    -- ----------------------------
    INSERT INTO `leftaffix` VALUES (1111'1');

    INSERT INTO `leftaffix` VALUES (2222'2');

    INSERT INTO `leftaffix` VALUES (3322'3');

    INSERT INTO `leftaffix` VALUES (4311'4');

    INSERT INTO `leftaffix` VALUES (5235'5');

    INSERT INTO `leftaffix` VALUES (6644'6');

    INSERT INTO `leftaffix` VALUES (7888'7');
    SET FOREIGN_KEY_CHECKS = 1;

    在创建索引树的时候会对数据进行排序 根据最左缀原则  会先通过 B 进行排序 也就是 如果出现值相同就 根据 C 排序 如果 C相同就根据D 排序 排好顺序之后就是如下图:

    索引的生成就会根据图二的顺序进行生成  我们看一下 生成后的树状数据是什么样子

    解释一些这个树状图  首先根据图二的排序 我们知道顺序 是 1111a  2222b 所以 在第三层 我们可以看到 1111a 在第一层 2222b在第二层  因为 111

    简化一下就是这个样子

    但是这种顺序是相对的。这是因为MySQL创建联合索引的规则是首先会对联合索引的最左边第一个字段排序,在第一个字段的排序基础上,然后在对第二个字段进行排序。所以B=2这种查询条件没有办法利用索引。

    看到这里还可以明白一个道理 为什么我们建立索引的时候不推荐建立在经常改变的字段 因为这样的话我们的索引结构就要跟着你的改变而改动 所以很消耗性能


    补充

    评论区老哥的提示 最左缀原则可以通过跳跃扫描的方式打破 简单整理一下这方面的知识

    这个是在 8.0 进行的优化

    MySQL8.0版本开始增加了索引跳跃扫描的功能,当第一列索引的唯一值较少时,即使where条件没有第一列索引,查询的时候也可以用到联合索引。

    比如我们使用的联合索引是 bcd  但是b中字段比较少 我们在使用联合索引的时候没有 使用 b 但是依然可以使用联合索引MySQL联合索引有时候遵循最左前缀匹配原则,有时候不遵循。


    小总结

    前提 如果创建 b,c,d 联合索引面

    • 如果 我where 后面的条件是c = 1 and d = 1为什么不能走索引呢 如果没有b的话 你查询的值相当于 *11 我们都知道*是所有的意思也就是我能匹配到所有的数据

    • 如果 我 where 后面是 b = 1 and d =1 为什么会走索引呢?你等于查询的数据是 1*1 我可以通过前面 1 进行索引匹配 所以就可以走索引

    • 最左缀匹配原则的最重要的就是 第一个字段

    我们接着看下一个失效场景


    select *





    思考

    这里是我之前的一个思维误区 select * 不会导致索引失效 之前测试发现失效是因为where 后面的查询范围过大 导致索引失效 并不是Select * 引起的  但是为什么不推荐使用select *

    解释

    • 增加查询分析器解析成本。

    • 增减字段容易与 resultMap 配置不一致。

    • 无用字段增加网络 消耗,尤其是 text 类型的字段。


    在阿里的开发手册中,大面的概括了上面几点。

    在使用Select * 索引使用正常

    虽然走了索引但是 也不推荐这种写法 为什么呢?

    首先我们在上一个验证中创建了联合索引 我们使用B=1 会走索引  但是 与直接查询索引字段不同  使用SELECT*,获取了不需要的数据,则首先通过辅助索引过滤数据,然后再通过聚集索引获取所有的列,这就多了一次b+树查询,速度必然会慢很多,减少使用select * 就是降低回表带来的损耗。

    也就是 Select * 在一些情况下是会走索引的 如果不走索引就是 where 查询范围过大 导致MySQL 最优选择全表扫描了 并不是Select * 的问题

    上图就是索引失效的情况

    范围查找也不是一定会索引失效 下面情况就会索引生效就是 级别低 生效的原因是因为缩小了范围


    小总结

    • select * 会走索引

    • 范围查找有概率索引失效但是在特定的情况下会生效 范围小就会使用 也可以理解为 返回结果集小就会使用索引

    • mysql中连接查询的原理是先对驱动表进行查询操作,然后再用从驱动表得到的数据作为条件,逐条的到被驱动表进行查询。

    • 每次驱动表加载一条数据到内存中,然后被驱动表所有的数据都需要往内存中加载一遍进行比较。效率很低,所以mysql中可以指定一个缓冲池的大小,缓冲池大的话可以同时加载多条驱动表的数据进行比较,放的数据条数越多性能io操作就越少,性能也就越好。所以,如果此时使用select * 放一些无用的列,只会白白的占用缓冲空间。浪费本可以提高性能的机会。

    • 按照评论区老哥的说法 select * 不是造成索引失效的直接原因 大部分原因是 where 后边条件的问题 但是还是尽量少去使用select * 多少还是会有影响的



    使用函数





    使用在Select 后面使用函数可以使用索引 但是下面这种做法就不能

    因为索引保存的是索引字段的原始值,而不是经过函数计算后的值,自然就没办法走索引了。

    不过,从 MySQL 8.0 开始,索引特性增加了函数索引,即可以针对函数计算后的值建立一个索引,也就是说该索引的值是函数计算后的值,所以就可以通过扫描索引来查询数据。

    这种写法我没使用过 感觉情况比较少 也比较容易注意到这种写法


    计算操作





    这个情况和上面一样 之所以会导致索引失效是因为改变了索引原来的值 在树中找不到对应的数据只能全表扫描

    因为索引保存的是索引字段的原始值,而不是 b – 1 表达式计算后的值,所以无法走索引,只能通过把索引字段的取值都取出来,然后依次进行表达式的计算来进行条件判断,因此采用的就是全表扫描的方式。

    下面这种计算方式就会使用索引

    Java比较熟悉的可能会有点疑问,这种对索引进行简单的表达式计算,在代码特殊处理下,应该是可以做到索引扫描的,比方将 b – 1 = 6 变成 b = 6 – 1。是的,是能够实现,但是 MySQL 还是偷了这个懒,没有实现。


    小总结

    总而言之 言而总之 只要是影响到索引列的值 索引就是失效


    Like %





    1.这个真的是难受哦  因为经常使用这个 所以还是要小心点 在看为什么失效之前 我们先看一下 Like % 的解释

    • %百分号通配符: 表示任何字符出现任意次数(可以是0次).

    • _下划线通配符: 表示只能匹配单个字符,不能多也不能少,就是一个字符.

    • like操作符: LIKE作用是指示mysql后面的搜索模式是利用通配符而不是直接相等匹配进行比较.


    注意: 如果在使用like操作符时,后面的没有使用通用匹配符效果是和=一致的,

    SELECT * FROM products WHERE products.prod_name like '1000';

    2.匹配包含”Li”的记录(包括记录”Li”) :

    SELECTFROM products WHERE products.prod_name like '%Li%';

    3.匹配以”Li”结尾的记录(包括记录”Li”,不包括记录”Li “,也就是Li后面有空格的记录,这里需要注意)

    SELECT * FROM products WHERE products.prod_name like '%Li';

    在左不走 在右走

    右:虽然走 但是索引级别比较低主要是模糊查询 范围比较大 所以索引级别就比较低

    左:这个范围非常大 所以没有使用索引的必要了 这个可能不是很好优化 还好不是一直拼接上面的

    小总结

    索引的时候和查询范围关系也很大 范围过大造成索引没有意义从而失效的情况也不少


    使用Or导致索引失效





    这个原因就更简单了

    在 WHERE 子句中,如果在 OR 前的条件列是索引列,而在 OR 后的条件列不是索引列,那么索引会失效 举个例子,比如下面的查询语句,b 是主键,e 是普通列,从执行计划的结果看,是走了全表扫描。

    优化

    这个的优化方式就是 在Or的时候两边都加上索引

    就会使用索引 避免全表扫描


    in使用不当





    首先使用In 不是一定会造成全表扫描的 IN肯定会走索引,但是当IN的取值范围较大时会导致索引失效,走全表扫描

    in 在结果集 大于30%的时候索引失效

    not in 和 In的失效场景相同


    order By





    这一个主要是Mysql 自身优化的问题 我们都知道OrderBy 是排序 那就代表我需要对数据进行排序 如果我走索引 索引是排好序的 但是我需要回表 消耗时间 另一种 我直接全表扫描排序 不用回表 也就是

    • 走索引 + 回表

    • 不走索引 直接全表扫描

    Mysql 认为直接全表扫面的速度比 回表的速度快所以就直接走索引了  在Order By 的情况下 走全表扫描反而是更好的选择

    子查询会走索引吗

    答案是会 但是使用不好就不会


    大总结





    转自:进阶的派大星

    链接:https://juejin.cn/post/7161964571853815822

    
    

  • 12种接口优化的通用方案,我又偷偷学到一波~

    快乐分享,Java干货及时送达👇

    一、背景

    针对老项目,去年做了许多降本增效的事情,其中发现最多的就是接口耗时过长的问题,就集中搞了一次接口性能优化。本文将给小伙伴们分享一下接口优化的通用方案。

    图片

    二、接口优化方案总结

    1.批处理

    批量思想:批量操作数据库,这个很好理解,我们在循环插入场景的接口中,可以在批处理执行完成后一次性插入或更新数据库,避免多次IO。

    //批量入库
    batchInsert();

    2.异步处理

    异步思想:针对耗时比较长且不是结果必须的逻辑,我们可以考虑放到异步执行,这样能降低接口耗时。

    例如一个理财的申购接口,入账和写入申购文件是同步执行的,因为是T+1交易,后面这两个逻辑其实不是结果必须的,我们并不需要关注它的实时结果,所以我们考虑把入账和写入申购文件改为异步处理。如图所示:

    图片

    至于异步的实现方式,可以用线程池,也可以用消息队列,还可以用一些调度任务框架。

    3.空间换时间

    一个很好理解的空间换时间的例子是合理使用缓存,针对一些频繁使用且不频繁变更的数据,可以提前缓存起来,需要时直接查缓存,避免频繁地查询数据库或者重复计算。

    需要注意的事,这里用了合理二字,因为空间换时间也是一把双刃剑,需要综合考虑你的使用场景,毕竟缓存带来的数据一致性问题也挺令人头疼。

    这里的缓存可以是R2M,也可以是本地缓存、memcached,或者Map。

    举一个股票工具的查询例子:

    因为策略轮动的调仓信息,每周只更新一次,所以原来的调接口就去查库的逻辑并不合理,而且拿到调仓信息后,需要经过复杂计算,最终得出回测收益和跑赢沪深指数这些我们想要的结果。如果我们把查库操作和计算结果放入缓存,可以节省很多的执行时间。如图:

    图片

    4.预处理

    也就是预取思想,就是提前要把查询的数据,提前计算好,放入缓存或者表中的某个字段,用的时候会大幅提高接口性能。跟上面那个例子很像,但是关注点不同。

    举个简单的例子:理财产品,会有根据净值计算年化收益率的数据展示需求,利用净值去套用年化收益率计算公式计算的逻辑我们可以采用预处理,这样每一次接口调用直接取对应字段就可以了。

    5.池化思想

    我们都用过数据库连接池,线程池等,这就是池思想的体现,它们解决的问题就是避免重复创建对象或创建连接,可以重复利用,避免不必要的损耗,毕竟创建销毁也会占用时间。

    池化思想包含但并不局限于以上两种,总的来说池化思想的本质是预分配与循环使用,明白这个原理后,我们即使是在做一些业务场景的需求时,也可以利用起来。

    比如:对象池

    6.串行改并行

    串行就是,当前执行逻辑必须等上一个执行逻辑结束之后才执行,并行就是两个执行逻辑互不干扰,所以并行相对来说就比较节省时间,当然是建立在没有结果参数依赖的前提下。

    比如,理财的持仓信息展示接口,我们既需要查询用户的账户信息,也需要查询商品信息和banner位信息等等来渲染持仓页,如果是串行,基本上接口耗时就是累加的。如果是并行,接口耗时将大大降低。

    如图:

    图片

    7.索引

    加索引能大大提高数据查询效率,这个在接口设计之出也会考虑到,这里不再多赘述,随着需求的迭代,我们重点整理一下索引不生效的一些场景,希望对小伙伴们有所帮助。

    具体不生效场景不再一一举例,后面有时间的话,单独整理一下。

    图片

    8.避免大事务

    所谓大事务问题,就是运行时间较长的事务,由于事务一致不提交,会导致数据库连接被占用,影响到别的请求访问数据库,影响别的接口性能。

    举个例子:

    @Transactional(value = "taskTransactionManager", propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED, rollbackFor = {RuntimeException.class, Exception.class})
     public BasicResult purchaseRequest(PurchaseRecord record) {
         BasicResult result = new BasicResult();
         ...
         pushRpc.doPush(record);        
         result.setInfo(ResultInfoEnum.SUCCESS);
         return result;
     }

    所以为避免大事务问题,我们可以通过以下方案规避:

    1,RPC调用不放到事务里面

    2,查询操作尽量放到事务之外

    3,事务中避免处理太多数据

    9.优化程序结构

    程序结构问题一般出现在多次需求迭代后,代码叠加形成。会造成一些重复查询、多次创建对象等耗时问题。在多人维护一个项目时比较多见。解决起来也比较简单,我们需要针对接口整体做重构,评估每个代码块的作用和用途,调整执行顺序。

    10.深分页问题

    深分页问题比较常见,分页我们一般最先想到的就是 limit ,为什么会慢,我们可以看下这个SQL:

    select * from purchase_record where productCode = 'PA9044' and status=4 and id > 100000 limit 200

    这样优化的好处是命中了主键索引,无论多少页,性能都还不错,但是局限性是需要一个连续自增的字段

    11.SQL优化

    sql优化能大幅提高接口的查询性能,由于本文重点讲述接口优化的方案,具体sql优化不再一一列举,小伙伴们可以结合索引、分页、等关注点考虑优化方案。

    12.锁粒度避免过粗

    锁一般是为了在高并发场景下保护共享资源采用的一种手段,但是如果锁的粒度太粗,会很影响接口性能。

    关于锁粒度:就是你要锁的范围有多大,不管是synchronized还是redis分布式锁,只需要在临界资源处加锁即可,不涉及共享资源的,不必要加锁,就好比你要上卫生间,只需要把卫生间的门锁上就可以,不需要把客厅的门也锁上。

    错误的加锁方式:

    //非共享资源
    private void notShare(){
    }
    //共享资源
    private void share(){
    }
    private int right(){
        notShare();
        synchronized (this) {
            share();

        }
    }

    三、最后

    接口性能问题形成的原因思考

    我相信很多接口的效率问题不是一朝一夕形成的,在需求迭代的过程中,为了需求快速上线,采取直接累加代码的方式去实现功能,这样会造成以上这些接口性能问题。

    变换思路,更高一级思考问题,站在接口设计者的角度去开发需求,会避免很多这样的问题,也是降本增效的一种行之有效的方式。

    以上,共勉!

    作者:京东开发者

    来源:https://toutiao.io/posts/0kwkbbt