注册

一次关键接口设计和优化带来的思考

实习时负责实现一个任务新增的接口,本来以为应该可以轻松拿捏,结果在实现过程中发现还有点小复杂,优化了很多版,并且其中涉及到了很多之前学过的知识点,故记录一下。


接口基本信息


在无人机管理系统中,对无人机执行任务时的监控是非常重要的模块,系统的用户可以为无人机创建新的飞行任务,除了任务的基本信息外,用户还需要为飞行任务分配负责人,设备,飞手(操作无人机的人),航线,栅栏(任务区域)等信息,而后端实现时需要做好各种校验,对用户数据进行整理转换并插入不同的数据库表中,考虑与系统其他模块的关系(例如航线稽查模块),在系统内通知相关用户,发送邮件给相关用户,另外还要考虑接口幂等性,数据库事务问题,接口的进一步优化。


接口实现



  1. 参数校验

    • 参数非空校验,格式校验,业务上的校验。
    • 其中业务上的校验比较复杂:要保证设备,飞手,航线都存在,且是一 一对应关系;要确保任务的负责人有权限调动相关设备和人员(认证鉴权模块);确保设备,飞手都是可用状态;要检查设备所在位置与任务区域;要检查设备在指定时间内是否已被占用。


  2. 幂等性校验

    • 新增或编辑接口都可能会产生幂等性问题,尤其这种关键的新增接口一般都要保证幂等性。
    • 这里我使用的方案是创建任务时生成一个token保存在redis中,并返回给前端,前端提交任务时在请求中携带token,后端检查到redis中有token证明是第一次访问,删除token并执行后续逻辑(去redis中查并删除token用lua脚本保证原子性),如果请求重复提交则后端查不到token直接返回。
    • 也顺便研究了一下其他幂等性方案,包括前端防重复提交,唯一id限制数据库插入,防重表,全局唯一请求id等,发现还是目前使用redis的这种方案更简单高效。


  3. 生成任务对象,设置任务基本信息,并将下列得到信息赋予任务对象

    • 从线程上下文获取到当前用户信息设为负责人
    • 用设备id,用户id去对应表批量查找对应数据(注意一个任务中设备,飞手,航线是一 一对应,为一个组合,一个任务中可能有多个这种组合)
    • 将航线转化为多个地理点,保存到列表用于后续批量插入任务航线表
    • 为每条航线创建稽查事务对象,保存到列表用于后续批量插入稽查表
    • 将任务区域转化为多个地理点,保存到列表用于后续批量插入任务区域表


  4. 批量插入数据

    • 将任务对象插入任务表,将之前保存的列表分别批量插入到航线表,区域表,稽查表。


  5. 任务创建成功

    • 更新任务状态
    • 通过Kafka异步发送邮件通知飞手和负责人



private final String LUA_SCRIPT =
"if redis.call('EXISTS', KEYS[1]) == 1 then\n" +
" redis.call('DEL', KEYS[1])\n" +
" return true\n" +
"else\n" +
" return false\n" +
"end";

DefaultRedisScript<Boolean> script = new DefaultRedisScript<>(LUA_SCRIPT, Boolean.class);
Boolean success = redisTemplate.execute(script, Collections.singletonList(token));
if (success == null || !success) {
throw new Exception(GlobalErrorCodeEnum.FAIL);
}
// 后续业务逻辑

接口优化


费尽九牛二虎之力写完接口,de完bug后,真正的挑战才开始,此时测试了一下接口的性能,好家伙,平均响应时间1000多ms,肯定是需要优化的,故开始思考优化方案以及测试方案。


压测方案



  • 先屏蔽幂等性校验,设置好接口参数(多个设备,航线长度设置为较长,区域正常设置)
  • 在三种场景下进行测试(弱压力场景:1分钟内100个用户访问。高并发场景:1秒内100个用户访问。高频率场景:2个用户以10QPS持续访问10秒)。以下图片是相关设置
  • 主要关注接口的平均响应时间,吞吐量和错误率。同时CPU使用率,磁盘IO,网络IO也要关注。




优化方案1


首先是把接口中一些不必要的操作删除;并且需要多次查询和插入的数据库操作都改为了批量操作;调整好索引,确保查询能正常走索引。代码与压测结果如下:


注意本文提供的代码仅用于展示,只展示关键步骤,不包含完整实现,若代码中有错误请忽略,理解思路即可。


弱压力和高频率下接口的平均响应时间降低为200ms左右,高并发情况下仍然需要500ms以上,没有出现错误情况,吞吐量也正常。看来数据库操作还是主要耗时的地方。


@Transactional(propagation = Propagation.REQUIRED, rollbackFor = EcpException.class)
public boolean insertTask(TaskInfoVO taskInfoVO) {
TaskInfo taskInfo = new TaskInfo();
// 基本信息查询与填充,分配负责人
// ...
// 查询并分配设备
// ...
List<Devices> devices = deviceService.selectList(new QueryWrapper<dxhpDevices>().in("identity_auth_id", identityAuthIds));
taskInfo.setDevice(getIdentityAuthId());
// 查询并分配飞手
// ...
List<User> devicePerson = userService.selectBatchIds(devicePersonIds);
taskInfo.setDevicePerson(getDevicePerson());
// 处理并分配航线
// ...
List<TaskTrajectory> trajectoryList = getTrajectoryList(taskInfoVO.getTrajectoryList(), taskId);
taskInfo.setTaskTrajectoryId(trajectorysId);
// 对每条航线创建初始稽查记录
// ...
List<Check> checkList = getCheckList(taskInfoVO, taskId, trajectorysId);
taskInfo.setCheckEventId(checkEventsId);
// 分配区域
// ...
List<Range> taskRangeList = getTaskRange(range, taskId);
taskInfo.setTaskRangeId(taskRangeId);
// 插入任务表
this.dao.insert(taskInfo);
// 批量插入任务航线表
trajectoryService.insertBatch(trajectoryList);
// 批量插入任务区域表
...
// 批量插入稽查表
...
}

优化方案2


这里发现数据库的主键使用了uuid,根据之前的学习,uuid是无序的,在插入数据库时会造成页分裂导致效率降低,故考虑把uuid改为数据库自增主键。压测结果如下:


三种情况下的接口平均响应时间都略有降低,但是我重复测试后又发现有时几乎与之前一样,效果不稳定,所以实际使用uuid插入是否真的比自增id插入效率低还不好说,要看具体业务场景。


后来问了导师为什么用uuid做主键,原因是使用uuid方便分库分表,因为不会重复,而自增id在分库分表时可能还要考虑每个表划分id起始点,比较麻烦。


另外,在分布式系统中分布式id的生成是个很重要的基础服务,除了uuid还有雪花算法,数据库唯一主键,redis递增主键,号段模式。


优化方案3


串行改为并行,开启多个线程去并行查询不同模块的数据并做数据库的插入操作,主要使用CompletableFuture类。代码和压测结果如下:


三种场景平均响应时间分别为:82ms,397ms,185ms。弱压力和高频率下性能有所提升,高并发下提升不明显,原因是高并发情况本身CPU就拉满了,再使用多线程去并行就没什么用了。


另外这里使用了自定义的线程池,实际业务中如果需要使用线程池,需要合理设置线程池的相关参数,例如核心线程池,最大线程数,线程池类型,阻塞队列,拒绝策略等,还要考虑线程池隔离。并且需要谨慎分析业务逻辑是否适合使用多线程,有时候加了多线程反而效果更差。


// 开启异步线程执行任务,指定线程池
CompletableFuture.runAsync(() -> {
// 处理航线数据
// ...
List<TaskTrajectory> trajectoryList = getTrajectoryList(taskInfoVO.getTrajectoryList(), taskId);
taskInfo.setTaskTrajectoryId(trajectorysId);
// 批量插入数据库
trajectoryService.insertBatch(trajectoryList)
}, executor);

// 其他模块的操作同理

优化方案4


开启Kafka,将插入操作都变为异步的,即任务表的数据插入后发消息到Kafka中,其他相关表的插入都通过去Kafka中读取消息后再慢慢执行。代码和压测结果如下:


弱压力和高频率下的性能差异不大,但是高并发情况下接口的响应时间又飙到了近1000ms


经过排查,在高并发时CPU和网络IO都拉满了,应该是瞬时向Kafka发送大量消息导致网卡压力比较大,接口的消息发送不出去导致响应时间飙升。如果是正常生产环境下肯定有多台机器分散请求,同时发数据到Kafka,并且有Kafka集群分担接收压力,但是目前只能在我自己机器上测,故高并发场景下将1秒100个请求降为1秒20个请求,并且前面的优化重新测试,比较性能。结果如下


批量插入:接口平均响应时间354ms


uuid改为自增id:接口平均响应时间323ms


串行改并行:接口平均响应时间331ms


用kafka做异步插入:接口平均响应时间191ms


可以看出使用了异步插入后效果还是十分明显的,且CPU和网络IO也都处于合理的范围内。至此优化基本结束,从一开始的近1000ms的响应速度优化到200ms左右,还是有一定提升的。


// 生产者代码
List<TaskTrajectory> trajectoryList = getTrajectoryList(taskInfoVO.getTrajectoryList(), taskId);
String key = IdUtils.uuid(); // 标识不同数据,方便后续Kafka消息防重
MessageVO messageVO = new MessageVO();
messageVO.setMsgID("trajectoryService"); // 告知要操作的类
messageVO.setMsgBody(JSON.toJSONString(trajectoryList)); // 要操作的数据
// 发送消息并指定主题和分区
kafkaTemplate.send("taskTopic", "Partition 1", JSON.toJSONString(messageVO));

// 消费者代码
// 使用 @KafkaListener监听并指定对应的主题和分区
@KafkaListener(id = "listener", topics = "taskTopic", topicPartitions = @TopicPartition(topic = "taskTopic", partitions = "0"))
public void recvTaskMessage(String message, Acknowledgment acknowledgment) {
// 接收消息
MessageVO messageVo = JSON.parseObject(message, MessageVO.class);
// 根据消息的唯一ID,配合redis判断消息是否重复
...
// 消费消息
List<TaskTrajectory> list = JSON.parseArray(messageVo.getMsgBody(), TaskTrajectory.class);
trajectoryService.insertBatch(list);
//手动确认消费完成,通知 Kafka 服务器该消息已经被处理。
acknowledgment.acknowledge();
}

其他问题


问题1


引入了Kafka后需要考虑的问题:消息重复消费,消息丢失,消息堆积,消息有序。


消息重复消费:生产者生成消息时带上唯一id,保存到redis中,消费者消费过后就把id从redis中删除,若有重复的消息到来,消费者去redis中找不到对应id则不处理。(与前面的接口幂等性方案类似)


消息丢失:生产者发送完消息后会回调判断消息是否发送成功,Kafka的Broker收到消息后要回复ACK给生产者,若没有发送成功要重试。Kafka自身则通过副本机制保证消息不丢失。消费者接收并处理完消息后才回复ACK,即设置手动提交offset。


消息堆积:加机器,提高配置,生产者限流。


消息有序:一个消费者消费一个partition,partition中的消息有序,消费者按顺序处理即可。若消费者开启多线程,则要考虑在内存中为每个线程开启队列,相同key的消息按顺序入队处理。


问题2


长事务问题:像新增任务这类接口肯定是需要加事务的,一开始我直接使用了spring的声明式事务,即@Transactional,并且我看其他业务接口好像也都是这样用的,后来思考了一下新增任务这个接口要先查好几个表,再批量插入好几个表,如果用@Transactional全锁住了那肯定会出问题,故后来使用TransactionTemplate编排式事务只对插入的操作加事务。


另外,远程调用的方法也不用加事务,因为无法回滚远程的数据库操作,除非加分布式事务(效率低),一般关键业务远程调用成功但是后续调用失败的话需要设计兜底方案,对远程调用操作的数据进行补偿,保证最终一致性。


// 避免长事务,不使用@Transactional,使用事务编排
transactionTemplate.execute(transactionStatus -> {
try {
this.dao.insert(taskInfo);
trajectoryService.insertBatch(trajectoryList);
...
} catch (Exception e) {
transactionStatus.setRollbackOnly(); // 异常手动设置回滚
}
return true;
});

问题3


线程池隔离:一些关键的接口使用的线程池要与普通接口使用的线程池隔离,否则一旦普通接口把线程池打满,关键接口也会不可用。例如我上面的优化有使用了多线程,可能需要单独开一个线程池或者使用与其他普通接口不同的线程池。


第三方接口异常重试:如果说需要调用第三方接口或者远程服务,需要做好调用失败的兜底方案,根据业务考虑是重试还是直接失败,重试的时间和次数限制等。


接口的权限:黑白名单设置,可用Bloom过滤器实现


日志:关键的业务代码注意打日志进行监测,方便后续排查异常。


以上是我在设计实现一个重要接口,并对其进行优化时所思考的一些问题,当然上面提到的内容不一定完全正确,可能有很多还没考虑到的地方,有些问题也可能有更成熟的解决方案,但是整个思考过程还是很有收获的,期待能够继续成长。


作者:summer哥
来源:juejin.cn/post/7410601536126795811

0 个评论

要回复文章请先登录注册