当今互联网应用中,随着业务的发展,数据量越来越大,查询效率越来越高,对于时序数据的存储、查询和分析需求也越来越强烈,这时候 Redis 就成为了首选的方案之一。
Redis 提供了多种数据结构,如字符串、哈希表、列表、集合、有序集合等,每种数据结构都具备不同的特性,可以满足不同的业务需求。其中,有序集合的 score 可以存储时间戳,非常适合用于存储时序数据,例如监控指标、日志、统计数据、报表等。下面举几个时序数据场景例子:
- 监控指标:
假设我们有一个服务,名为 my_service,需要监控它的请求响应时间。我们可以使用 Redis 有序集合来存储数据,每个请求的响应时间作为 value,请求的时间戳作为 score。示例如下:
> ZADD requests:my_service 1613115560 350
(integer) 1
> ZADD requests:my_service 1613115570 450
(integer) 1
> ZADD requests:my_service 1613115580 550
(integer) 1
这些命令向名为 requests:my_service 的有序集合中添加了 3 条数据,分别是 2021 年 2 月 12 日 10:19:20 的请求响应时间为 350ms,10:19:30 的请求响应时间为 450ms,10:19:40 的请求响应时间为 550ms。
接下来,我们来看一下如何使用 Redis 命令查询这些监控指标的数据。下面的命令会返回 requests:my_service 有序集合内所有数据:
> ZRANGE requests:my_service 0 -1 WITHSCORES
1) "350"
2) "1613115560"
3) "450"
4) "1613115570"
5) "550"
6) "1613115580"
命令执行结果表示,数据按照 score 排序,其中 score 是时间戳(单位为秒),value 是请求响应时间(单位为毫秒)。同时,使用 ZRANGEBYSCORE 命令可以获取一段时间范围内的监控数据,例如:
> ZRANGEBYSCORE requests:my_service 1613115570 1613115580 WITHSCORES
1) "450"
2) "1613115570"
3) "550"
4) "1613115580"
这条命令返回了 requests:my_service 有序集合中在时间戳 1613115570 到 1613115580 之间的所有数据。
- 日志:
假设我们要存储的日志是一条指定格式的字符串,包含时间戳和日志内容。使用 Redis 列表存储日志数据,每次写入新日志时可以使用 Redis 列表的 rpush 命令将数据写入列表的尾部。示例如下:
> RPUSH logs:my_logs 2021-02-12 10:30:00 INFO message 1
(integer) 1
> RPUSH logs:my_logs 2021-02-12 10:30:01 ERROR message 2
(integer) 2
> RPUSH logs:my_logs 2021-02-12 10:30:02 WARN message 3
(integer) 3
这些命令向名为 logs:my_logs 的列表尾部添加 3 条数据,分别是 2021 年 2 月 12 日 10:30:00 的 INFO 级别消息,10:30:01 的 ERROR 级别消息和 10:30:02 的 WARN 级别消息。
接下来,我们来看一下如何使用 Redis 命令查询这些日志数据。下面的命令会返回 logs:my_logs 列表内所有数据:
> LRANGE logs:my_logs 0 -1
1) "2021-02-12 10:30:00 INFO message 1"
2) "2021-02-12 10:30:01 ERROR message 2"
3) "2021-02-12 10:30:02 WARN message 3"
命令执行结果表示,数据按照插入顺序排序,从列表头部开始遍历。使用 ZRANGEBYSCORE 命令可以获取一段时间范围内的日志数据,例如:
> ZRANGEBYSCORE logs:my_logs 1613115570 1613115580
1) "2021-02-12 10:30:01 ERROR message 2"
这条命令返回了 logs:my_logs 列表中在时间戳 1613115570 到 1613115580 之间的日志数据,但因为日志数据并没有具体的 time stamp 做 score,所以这个例子只是演示这个命令的用法,实际上应该使用有序集合去查询时间区间内的日志数据。
- 统计数据:
假设我们要存储的统计数据是一些具体业务相关的计数器,例如每分钟用户访问量。我们可以使用 Redis 有序集合来存储统计数据,key 是计数器名称,score 是时间戳,value 是具体的计数值(例如访问次数)。示例如下:
> ZADD visits 1613167800 100
(integer) 1
> ZADD visits 1613167860 120
(integer) 1
> ZADD visits 1613167920 150
(integer) 1
这些命令向名为 visits 的有序集合中添加了 3 条数据,分别是 2021 年 2 月 12 日 23:30:00 的访问次数为 100,23:31:00 的访问次数为 120,23:32:00 的访问次数为 150。
接下来,我们来看一下如何使用 Redis 命令查询这些统计数据。下面的命令会返回 visits 有序集合内所有数据:
> ZRANGE visits 0 -1 WITHSCORES
1) "100"
2) "1613167800"
3) "120"
4) "1613167860"
5) "150"
6) "1613167920"
命令执行结果表示,数据按照 score 排序,其中 score 是时间戳(单位为秒),value 是访问次数。使用 ZRANGEBYSCORE 命令可以获取一段时间范围内的统计数据,例如:
> ZRANGEBYSCORE visits 1613167860 1613167920 WITHSCORES
1) "120"
2) "1613167860"
3) "150"
4) "1613167920"
这条命令返回了 visits 有序集合中在时间戳 1613167860 到 1613167920 之间的所有数据。
使用 Redis 有序集合中的另一个常见场景是计算 TopN,例如找出访问次数最多的前 10 个计数器,可以使用命令 ZREVRANGE visits 0 9 WITHSCORES,它返回 visits 有序集合中前 10 个元素,按照 value 从大到小排列,并且返回每个元素的 score。
需求实践:
这是一个实时监控系统,主要用于记录和统计服务发生的错误情况,以便在错误数量超过预设阈值时发出警告信息。
系统每秒钟生成随机错误数据,并将它们存储到 Redis 数据库中。每隔 10 秒钟,系统会从 Redis 数据库中聚合最近一分钟内的错误数据,并按照服务名和错误类型进行统计计算。如果某个服务的错误数量超过预设阈值,系统会输出一条警告信息提示用户。
整个系统的目标是帮助用户及时了解每个服务的错误情况,以便及时采取相应的措施,保障服务的稳定性和可靠性。
代码示例:
模拟接口服务异常数据
package com.example.demo.redis;
import redis.clients.jedis.Jedis;
import java.util.*;
public class DataGenerator {
// 定义服务列表
private static final List SERVICES = Arrays.asList("service1", "service2", "service3");
// 定义错误列表
public static final List ERRORS = Arrays.asList("invalid_param", "timeout", "unknown_error");
/**
* 生成数据
*
* @param total 数据总数
* @param jedis Redis 客户端连接
*/
public static void generateData(int total, Jedis jedis) {
Random rand = new Random(); // 初始化随机数生成器
long currentTimestamp = System.currentTimeMillis() / 1000; // 获取当前时间戳,精确到秒
long startTimestamp = currentTimestamp - 60; // 计算起始时间戳,为当前时间戳减去 60 秒
for (int i = 0; i < total; i++) { // 循环 total 次,生成 total 条数据
String service = SERVICES.get(rand.nextInt(SERVICES.size())); // 随机选择一个服务
String error = ERRORS.get(rand.nextInt(ERRORS.size())); // 随机选择一个错误
long timestamp = startTimestamp + rand.nextInt(60); // 生成一个随机时间戳,精确到秒,范围为起始时间戳到当前时间戳
int count = 1;
String item = String.format("%s:%s:%d:%d", service, error, timestamp, count);
jedis.zadd("error_data", timestamp, item); // 将错误数据存储到 Redis 数据库中
}
}
}
聚合异常数据,达到阈值告警
package com.example.demo.redis;
import redis.clients.jedis.Jedis;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class DataAggregator {
private static final String REDIS_HOST = "localhost"; // Redis 主机名
private static final int REDIS_PORT = 6379; // Redis 端口号
private static final int THRESHOLD = 100; // 预设阈值,当错误数量超过该阈值时触发警告
public static void main(String[] args) {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2); // 创建一个只有一个线程的定时任务执行程序
Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT); // 创建 Redis 客户端连接
scheduler.scheduleAtFixedRate(() -> {
// 并发情况下,线程会阻塞
synchronized (jedis) {
DataGenerator.generateData(20, jedis); // 生成随机错误数据,并将其存储到 Redis 数据库中
}
}, 0, 1, TimeUnit.SECONDS); // 定时任务间隔为 1 秒钟
scheduler.scheduleAtFixedRate(() -> { // 定时任务逻辑
synchronized (jedis) {
long currentTimestamp = System.currentTimeMillis() / 1000; // 获取当前时间戳,精确到秒
long startTimestamp = currentTimestamp - 60; // 计算起始时间戳,为当前时间戳减去 60 秒
Set data = jedis.zrangeByScore("error_data", startTimestamp, currentTimestamp); // 使用 zrange 命令获取指定时间范围内的数据
Map> countMap = new HashMap<>(); // 用于记录聚合后的服务和错误数量信息
for (String item : data) { // 遍历所有错误数据
String[] parts = item.split(":"); // 以冒号为分隔符,将错误数据分割为部分
String service = parts[0]; // 获取服务名
String error = parts[1]; // 获取错误类型
long timestamp = Long.parseLong(parts[2]); // 获取时间戳
int count = Integer.parseInt(parts[3]); // 获取错误数量
if (timestamp < startTimestamp) { // 如果时间戳早于起始时间戳,则跳过该数据
continue;
}
Map serviceCountMap = countMap.computeIfAbsent(service, k -> new HashMap<>()); // 获取指定服务的错误数量信息
serviceCountMap.put(error, serviceCountMap.getOrDefault(error, 0) + count); // 更新指定服务和错误类型的错误数量信息
}
List alerts = new ArrayList<>(); // 用于存储警告信息
for (String service : countMap.keySet()) { // 遍历服务名列表
Map serviceCountMap = countMap.get(service); // 获取服务和错误数量信息
int totalErrors = 0;
for (String error : serviceCountMap.keySet()) { // 遍历错误列表
int count = serviceCountMap.get(error); // 获取错误数量
totalErrors += count;
}
if (totalErrors > THRESHOLD) { // 如果错误数量超过预设阈值
alerts.add(service + " has too many errors: " + serviceCountMap.keySet() + ", count: " + totalErrors); // 将该服务名添加到警告信息列表中
}
}
if (!alerts.isEmpty()) { // 如果警告信息列表不为空
System.out.println(String.join("\n", alerts)); // 打印警告信息
}
}
}, 0, 10, TimeUnit.SECONDS); // 定时任务间隔为 10 秒
// 关闭 Redis 连接
jedis.close();
}
}
以上代码可正常运行,有疑问可以交流~~
来源:juejin.cn/post/7219669309537484837