注册

客户端日志&埋点&上报的线程安全问题

引子

如果设计一个客户端埋点上报库,日志的完整性、高效传输、日志的及时性都是需要考量的点。

其中“高效传输”除了采用更高效的序列化方案、压缩日志、还包含减少通信次数。若每产生一条日志就上报一次就浪费流量了。通常的做法是“批量上报”,即先将日志堆积在内存中,数量达到阈值时才触发一次上报。

批量上传 V1.0

假设埋点上报的实现如下:

object EasyLog {
var maxSize = 50
// 用于堆积日志的列表
private val logs = mutableListOf<Any>()
fun log(any: Any){
logs.add(any)
if(logs.size() >= maxSize) {
uploadLogs(logs)
logs.clear()
}
}
}

这样实现存在多线程安全问题,当log()被多线程并发访问时,共享变量logs并不是线程安全的。在多线程环境下调用 ArrayList.add() 会发生数据损坏,因为 ArrayList.add() 实现如下:

public boolean add(E e) {
ensureCapacityInternal(size + 1);
elementData[size++] = e;
return true;
}

其中的前两句都不是线程安全的。

第一句是扩容,重新申请内存并将原数组拷贝至新数组。如果两个线程发现容量不足同时进行扩容,因为拷贝数组过程不是原子的,若被打断,则已复制的内容可能会被覆盖。

第二句是索引自增,++ 操作也不是原子的,多线程环境下可能发生现有数据被覆盖。

使用@Synchronized注解可以解决该问题:

object EasyLog {
@Synchronized
fun log(any: Any){}
}

相当于为整段代码套上 synchronized,同一瞬间只有一个线程可以输出日志。

性能更好的做法是使用线程安全的容器,比如ConcurrentLinkedQueue,它使用无锁机制实现线程安全的并发读写,关于它源码级别的分析可以点击面试题 | 徒手写一个非阻塞线程安全队列

批量上传 V2.0 —— 调控日志生产消费速度

埋点上报场景中,日志的生产的速度远大于消费速度(上传是耗时操作)。这样用于堆积日志的容器可能无限增长,内存有爆炸的风险。

得使用一种机制调控日志生产和消费速度。比如,丢弃新/旧日志、暂停生产。

Kotlin 中的 Channel 提供了需要的所有功能,包括线程安全以及调控生产消费速度。

使用 Channel 重构如下:

object EasyLog {
// 容量为50的 Channel
private val channel = Channel<Any>(50)
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private var maxSize = 50
private val logs = mutableListOf<Any>()
init {
//新起协程作为日志消费者
scope.launch { channel.consumeEach { innerLog(it) } }
}

fun log(any: Any){
// 新起协程作为日志生产者
scope.launch { channel.send(any) }
}

private fun innerLog(any: Any){
logs.add(any)// 堆积日志
if(logs.size() >= maxSize) {// 日志数量超阈值后上传
uploadLogs(logs)
logs.clear()
}
}
}

每一条新日志都会转发到 Channel 上,在 Channel 另一头有一个单独的协程消费日志。

Channel 就像一个队列,生产者队尾插入,消费者队头取出。它是一个线程安全的容器,多线程并发写没有问题,而且现在只有一个消费者,所以消费日志的代码不会有线程安全问题。就好比四面八方涌入的购票者只能在一个窗口前排队。将并行问题串行化是实现线程安全的一种方法。

Channel 的构造方法可传入三个参数:

public fun <E> Channel(
// 缓存大小
capacity: Int = RENDEZVOUS,
// 溢出策略
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
// 如何处理未传递元素
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>

缓冲大小表示生产速度大于消费速度时最多缓存的元素数。当缓存满后继续产生的元素会触发溢出策略,默认策略是BufferOverflow.SUSPEND,表示挂起生产者,而不是阻塞,生产者线程可以继续运行。这是 Channel 相较于 Java 阻塞队列的优势。

批量上传 V3.0 —— 延迟去小尾巴

若客户端产生了49条日志,应用被杀,那这49条日志就丢了,因为还未达到50条上传阈值。为了确保日志的完整性,不得不对每一条日志进行持久化。然后在下一次应用启动时从磁盘读取并上传之。

这样依然不能满足日志的及时性,比如该用户一周之后才启动应用。

需要一种机制及时处理日志的小尾巴(未达批量阈值的日志):当每一条日志到达时,开启倒计时,如果倒计时归零前无新日志请求,则将已堆积日志批量上传,否则关闭前一个倒计时,开启新的倒计时。

日志&埋点&上报(一)中使用 Handler 实现了这套机制。

这次换用协程实现:

object EasyLog {
// 容量为50的 Channel
private val channel = Channel<Any>(50)
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private var maxSize = 50
private val logs = mutableListOf<Any>()
// 冲刷job
private var flushJob: Job? = null
init {
scope.launch { channel.consumeEach { innerLog(it) } }
}

fun log(any: Any){
scope.launch { channel.send(any) }
}

private fun innerLog(any: Any){
logs.add(log)
flushJob?.cancel() // 取消上一次倒计时
// 若日志数量达到阈值,则直接冲刷,否则延迟冲刷
if (logs.size() >= maxSize) {
flush()
} else {
flushJob = delayFlush()
}
}

// 冲刷:上传内存中堆积的批量日志
private fun flush() {
uploadLogs(logs)
logs.clear()
}

// 延迟冲刷
private fun delayFlush() = scope.launch {
delay(5000)// 延迟5秒,如果没有新日志产生,则冲刷
flush()
}
}

批量上报 V4.0 —— 并行问题串行化

上述代码存在多线程安全问题。

因为延迟冲刷时新起一个协程,导致 logs.clear() 和 logs.add() 可能并发执行。

为了复现这个多线程问题,写了一个压测demo:

// 线程安全的自增 Int 值
val count = AtomicInteger(0)
// 用于验证多线程问题的 Int 数组,初始值都是 -1
private val testArray = Array(100) { -1 }
val list = mutableListOf<Int>()
var flushJob: Job? = null

// 模拟并发生产日志
repeat(100) {
scope.launch(Dispatchers.Default) {
delay((50L..10_00L).random())
channel.send(count.getAndIncrement())
}
}

// 模拟串行消费日志
scope.launch {
channel.consumeEach {
delay((50L..200L).random())
log(it)
}
}
//模拟日志库入口方法
private fun log(value: Int) {
list.add(value)
flushJob?.cancel()
if (list.size >= 5) {// 5条为一批
flush()
} else {
flushJob = delayFlush()
}
}

// 延迟冲刷
fun delayFlush() = scope.launch {
delay(100)
flush()
}

// 冲刷
private fun flush() {
// 以批量日志的值赋值测试数组
list.forEach { testArray[it.value] = it.value }
list.clear()
}

检测方法是并发输出100条日志(内容为一个自增序号0-99),同时声明一个大小为100的数组,所有元素初始化为-1,每当日志被批量处理时将序号对应的数组索引值置为序号值。所有日志处理完毕后,检测数组中是否有-1,若有则表示复现多线程问题,检测代码如下:

val isOk = testArray.any { it == -1 }

若数组中所有的值都不是 -1,则表示 0-100 的日志都被正确地处理了。

运行 demo,isOk 是 false。。。。继续运行了 n 次,居然全都是 false。。。。开始怀疑我对多线程的认知,然后就问 ChatGPT:“理论上这段代码会发生线程安全问题,但是我的测试代码没有复现这个问题,如何才能复现?”

这段代码中的线程安全问题可能不容易复现,因为涉及到多个线程和协程同时运行,而且延迟时间是随机的。要复现这个问题,可以尝试以下方法:

  1. 增加重复次数:增加 repeat(100) 中的数字,这将增加线程安全问题发生的可能性。
  2. 减小延迟时间范围:将 delay((50L..10_00L).random()) 和 delay((50L..200L).random()) 中的延迟时间范围减小,以便让线程和协程更密集地运行。

尽管这些方法可以增加线程安全问题发生的可能性,但并不能保证一定能复现问题。线程安全问题的复现通常取决于操作系统、硬件和运行时环境等多种因素。

听了他的建议调整了 delay 的时间间隔,并增加次数到1000次,多线程问题浮现了:

java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.next(ArrayList.java:860)
at test.taylor.com.taylorcode.concurrent.ConcurrentInitActivity.flush(ConcurrentInitActivity.kt:134)
at test.taylor.com.taylorcode.concurrent.ConcurrentInitActivity.log(ConcurrentInitActivity.kt:66)

在遍历列表的同时增删列表元素就会发生这个问题。即使没有发生上述 crash,也可能会打印出 isOk=true,表示array中有元素为-1。

使用线程安全的ConcurrentLinkedQueue作为容器,仍然无法解决问题,因为它的clear()方法是弱一致性的,它需要遍历整个队列,但遍历是基于快照机制的。

最后还是运用 “将并行问题串行化” 来解决这个多线程问题:

// 单线程 Dispatcher
val logDispatcher = Executors.newFixedThreadPool(1).asCoroutineDispatcher()

// 在单线程上延迟冲刷
fun delayFlush() = scope.launch(logDispatcher) {
delay(50)
flush()
}

// 在单线程上消费日志
scope.launch(logDispatcher) {
channel.consumeEach {
delay((25L..100L).random())
log(it)
}
}

构建一个单独的线程,使得日志的消费和冲刷都在该线程进行。

单线程会降低性能吗? 不会,因为延迟冲刷是挂起剩余的代码,而不会阻塞线程。在单线程上延迟冲刷就好比使用 Handler.postDelay() 将冲刷逻辑排到主线程消息队列的末尾。


作者:唐子玄
链接:https://juejin.cn/post/7256408176412442683
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

0 个评论

要回复文章请先登录注册