高级线程应用之栅栏、信号量、调度组以及source(三)
二、信号量(dispatch_semaphore_t
)
相关函数:
dispatch_semaphore_create
:创建信号量dispatch_semaphore_wait
:信号量等待dispatch_semaphore_signal
:信号量释放
信号量有两个效果:同步作为锁 与 控制GCD
最大并发数。
二元信号量是最简单的一种锁,只有两种状态:占用与非占用。适合只能被唯一一个线程独占访问资源。当二元信号量处于非占用状态时,第一个试图获取该二元信号量的线程会获得该锁,并将二元信号置为占用状态,此后其他的所有视图获取该二元信号量的线程将会等待,直到该锁被释放。
对于允许多个线程并发访问的资源,多元信号量简称信号量,它是一个很好的选择。一个初始值为 N
的信号量允许 N
个线程并发访问。线程访问资源的时候首先获取信号量,进行如下操作:
- 将信号量的值减
1
。 - 如果信号量的值小于
0
,则进入等待状态,否则继续执行。
访问完资源之后,线程释放信号量,进行如下操作:
- 将信号量的值
+1
。 - 如果信号量的值
< 1
,唤醒一个等待中的线程。
2.1 应用
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_semaphore_t sem = dispatch_semaphore_create(1);
dispatch_queue_t queue1 = dispatch_queue_create("HotpotCat", NULL);
dispatch_async(queue, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
NSLog(@"1 start");
NSLog(@"1 end");
dispatch_semaphore_signal(sem);
});
dispatch_async(queue1, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
NSLog(@"2 start");
NSLog(@"2 end");
dispatch_semaphore_signal(sem);
});
dispatch_async(queue, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
NSLog(@"3 start");
NSLog(@"3 end");
dispatch_semaphore_signal(sem);
});
dispatch_async(queue1, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
NSLog(@"4 start");
NSLog(@"4 end");
dispatch_semaphore_signal(sem);
});
对于上面的例子输出:
1 start
1 end
2 start
2 end
3 start
3 end
4 start
4 end
1
,全局队列与自定义串行队列中的任务按顺序依次执行。当将信号量改为
2
后输出:1 start
2 start
2 end
1 end
3 start
4 start
3 end
4 end
这个时候1、2
先执行无序,3、4
后执行无序。这样就控制了GCD
任务的最大并发数。
修改代码如下:
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_semaphore_t sem = dispatch_semaphore_create(0);
dispatch_async(queue, ^{
dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
NSLog(@"1 start");
NSLog(@"1 end");
});
dispatch_async(queue, ^{
sleep(2);
NSLog(@"2 start");
NSLog(@"2 end");
dispatch_semaphore_signal(sem);
});
信号量初始值修改为0
,在任务1
中wait
,在任务2
中signal
,这个时候输出如下:
2 start
2 end
1 start
1 end
任务2
比任务1
先执行了。由于信号量初始化为0
,wait
函数后面任务就执行不了一直等待;等到signal
执行后发送信号wait
就可以执行了。这样就达到了控制流程。任务2
中的信号控制了任务1
的执行。
2.2 源码分析
2.2.1 dispatch_semaphore_create
/*
* @param dsema
* The semaphore. The result of passing NULL in this parameter is undefined.
*/
dispatch_semaphore_t
dispatch_semaphore_create(intptr_t value)
{
dispatch_semaphore_t dsema;
// If the internal value is negative, then the absolute of the value is
// equal to the number of waiting threads. Therefore it is bogus to
// initialize the semaphore with a negative value.
if (value < 0) { //>=0 才有用,否则直接返回
return DISPATCH_BAD_INPUT;// 0
}
dsema = _dispatch_object_alloc(DISPATCH_VTABLE(semaphore),
sizeof(struct dispatch_semaphore_s));
dsema->do_next = DISPATCH_OBJECT_LISTLESS;
dsema->do_targetq = _dispatch_get_default_queue(false);
dsema->dsema_value = value;
_dispatch_sema4_init(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
dsema->dsema_orig = value;
return dsema;
}
- 当
value < 0
的时候无效,只有>= 0
才有效,才会执行后续流程。
2.2.2 dispatch_semaphore_wait
intptr_t
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
//--
long value = os_atomic_dec2o(dsema, dsema_value, acquire);
if (likely(value >= 0)) { //>=0 返回
return 0;
}
return _dispatch_semaphore_wait_slow(dsema, timeout);
}
--
后value
大于等于0
直接返回0
。执行dispatch_semaphore_wait
后续的代码。- 否则执行
_dispatch_semaphore_wait_slow
(相当于do-while
循环)。
_dispatch_semaphore_wait_slow
当信号量为0
的时候调用wait
后(< 0
)就走_dispatch_semaphore_wait_slow
逻辑了:
DISPATCH_NOINLINE
static intptr_t
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
dispatch_time_t timeout)
{
long orig;
_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
//超时直接break
switch (timeout) {
default:
if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
break;
}
// Fall through and try to undo what the fast path did to
// dsema->dsema_value
//NOW的情况下进行超时处理
case DISPATCH_TIME_NOW:
orig = dsema->dsema_value;
while (orig < 0) {
if (os_atomic_cmpxchgv2o(dsema, dsema_value, orig, orig + 1,
&orig, relaxed)) {
return _DSEMA4_TIMEOUT();
}
}
// Another thread called semaphore_signal().
// Fall through and drain the wakeup.
//FOREVER则进入wait逻辑。
case DISPATCH_TIME_FOREVER:
_dispatch_sema4_wait(&dsema->dsema_sema);
break;
}
return 0;
}
- 当值为
timeout
的时候直接break
。 - 当值为
DISPATCH_TIME_NOW
的时候循环调用_DSEMA4_TIMEOUT()
。
#define _DSEMA4_TIMEOUT() KERN_OPERATION_TIMED_OUT
- 当值为
DISPATCH_TIME_FOREVER
的时候调用_dispatch_sema4_wait
。
_dispatch_sema4_wait
// void
// _dispatch_sema4_wait(_dispatch_sema4_t *sema)
// {
// int ret = 0;
// do {
// ret = sem_wait(sema);
// } while (ret == -1 && errno == EINTR);
// DISPATCH_SEMAPHORE_VERIFY_RET(ret);
// }
void
_dispatch_sema4_wait(_dispatch_sema4_t *sema)
{
kern_return_t kr;
do {
kr = semaphore_wait(*sema);
} while (kr == KERN_ABORTED);
DISPATCH_SEMAPHORE_VERIFY_KR(kr);
}
semaphore_wait
并没有搜到实现,这是pthread
内核封装的实现。_dispatch_sema4_wait
本质上是一个do-while
循环,相当于在这里直接卡住执行不到后面的逻辑了。相当于:dispatch_async(queue, ^{
// dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
do {
//循环
} while (signal <= 0);
NSLog(@"1 start");
NSLog(@"1 end");
});
结论:当value >= 0
的时候执行后续的代码,否则do-while
循环卡住后续逻辑。
2.2.3 dispatch_semaphore_signal
/*!
* @function dispatch_semaphore_signal
*
* @abstract
* Signal (increment) a semaphore.
*
* @discussion
* Increment the counting semaphore. If the previous value was less than zero,
* this function wakes a waiting thread before returning.
*
* @param dsema The counting semaphore.
* The result of passing NULL in this parameter is undefined.
*
* @result
* This function returns non-zero if a thread is woken. Otherwise, zero is
* returned.
*/
intptr_t
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
//++操作
long value = os_atomic_inc2o(dsema, dsema_value, release);
if (likely(value > 0)) {
return 0;
}
//++ 后还 < 0,则表示做wait操作(--)过多。报错。
if (unlikely(value == LONG_MIN)) {
DISPATCH_CLIENT_CRASH(value,
"Unbalanced call to dispatch_semaphore_signal()");
}
//发送信号量逻辑,恢复wait等待的操作。
return _dispatch_semaphore_signal_slow(dsema);
}
os_atomic_inc2o
执行++
后值大于0
直接返回能够执行。- 只有
<= 0
的时候才执行后续流程,调用_dispatch_semaphore_signal_slow
进行异常处理。 - 注释说明了当值
< 0
的时候在return
之前唤醒一个等待线程。
_dispatch_semaphore_signal_slow
intptr_t
_dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema)
{
_dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
_dispatch_sema4_signal(&dsema->dsema_sema, 1);
return 1;
}
直接调用_dispatch_sema4_signal
。
_dispatch_sema4_signal
#define DISPATCH_SEMAPHORE_VERIFY_KR(x) do { \
DISPATCH_VERIFY_MIG(x); \
if (unlikely((x) == KERN_INVALID_NAME)) { \
DISPATCH_CLIENT_CRASH((x), \
"Use-after-free of dispatch_semaphore_t or dispatch_group_t"); \
} else if (unlikely(x)) { \
DISPATCH_INTERNAL_CRASH((x), "mach semaphore API failure"); \
} \
} while (0)
//经过调试走的是这个逻辑
void
_dispatch_sema4_signal(_dispatch_sema4_t *sema, long count)
{
do {
kern_return_t kr = semaphore_signal(*sema);//+1
DISPATCH_SEMAPHORE_VERIFY_KR(kr);// == -1 报错
} while (--count);//do-while(0) 只执行一次
}
相当于内部做了+1
操作。这也是当信号量初始值为0
的时候dispatch_semaphore_signal
执行完毕后dispatch_semaphore_wait
能够执行的原因。
小结:
dispatch_semaphore_wait
进行--
操作,减完是负值进入do-while
循环,阻塞后续流程。dispatch_semaphore_signal
进行++
操作,加完值不大于0
进入后续报错流程。semaphore_signal
与semaphore_wait
才是信号量能控制最大并发数的根本原因,否则dispatch_semaphore_signal
与dispatch_semaphore_signal
都是判断后直接返回,相当于什么都没做。
三、调度组
最直接的作用: 控制任务执行顺序。
相关API
:
dispatch_group_create
创建组dispatch_group_async
进组任务 (与dispatch_group_enter
和dispatch_group_leave
搭配使用效果相同)dispatch_group_enter
进组dispatch_group_leave
出组
dispatch_group_notify
进组任务执行完毕通知dispatch_group_wait
进组任务执行等待时间
3.1 应用
dispatch_group_t group = dispatch_group_create();
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_queue_t queue1 = dispatch_queue_create("test", DISPATCH_QUEUE_CONCURRENT);
dispatch_group_async(group, queue, ^{
sleep(3);
NSLog(@"1");
});
dispatch_group_async(group, queue1, ^{
sleep(2);
NSLog(@"2");
});
dispatch_group_async(group, queue1, ^{
sleep(1);
NSLog(@"3");
});
dispatch_group_async(group, queue, ^{
NSLog(@"4");
});
dispatch_group_notify(group, dispatch_get_global_queue(0, 0), ^{
NSLog(@"5");
});
有如上案例,任务5
永远在任务1、2、3、4
之后执行。
当然也可以使用enter
与leave
配合dispatch_async
使用:
dispatch_group_t group = dispatch_group_create();
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_queue_t queue1 = dispatch_queue_create("test", DISPATCH_QUEUE_CONCURRENT);
//先 enter 再 leave
dispatch_group_enter(group);
dispatch_async(queue, ^{
sleep(3);
NSLog(@"1");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(queue1, ^{
sleep(2);
NSLog(@"2");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(queue1, ^{
sleep(1);
NSLog(@"3");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(queue, ^{
NSLog(@"4");
dispatch_group_leave(group);
});
dispatch_group_notify(group, dispatch_get_global_queue(0, 0), ^{
NSLog(@"5");
});
效果相同,需要注意的是dispatch_group_enter
要比dispatch_group_leave
先调用,并且必须成对出现,否则会崩溃。当然两种形式也可以混着用。
3.2 源码分析
根据上面的分析有3
个问题:
- 1.
dispatch_group_enter
为什么要比dispatch_group_leave
先调用,否则崩溃? - 2.能够实现同步的原理是什么?
- 3.
dispatch_group_async
为什么等价于dispatch_group_enter + dispatch_group_leave
?
之前的版本调度组是封装了信号量,目前新版本的是调度组自己写了一套逻辑。
3.2.1 dispatch_group_create
dispatch_group_t
dispatch_group_create(void)
{
return _dispatch_group_create_with_count(0);
}
//creat & enter 写在一起的写法,信号量标记位1
dispatch_group_t
_dispatch_group_create_and_enter(void)
{
return _dispatch_group_create_with_count(1);
}
是对_dispatch_group_create_with_count
的调用:
static inline dispatch_group_t
_dispatch_group_create_with_count(uint32_t n)
{
dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
sizeof(struct dispatch_group_s));
dg->do_next = DISPATCH_OBJECT_LISTLESS;
dg->do_targetq = _dispatch_get_default_queue(false);
if (n) {
os_atomic_store2o(dg, dg_bits,
(uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
}
return dg;
}
调用_dispatch_object_alloc
创建group
,与信号量写法相似
3.2.2 dispatch_group_enter
void
dispatch_group_enter(dispatch_group_t dg)
{
// The value is decremented on a 32bits wide atomic so that the carry
// for the 0 -> -1 transition is not propagated to the upper 32bits.
//0-- -> -1,与信号量不同的是没有wait
uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
DISPATCH_GROUP_VALUE_INTERVAL, acquire);
uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
if (unlikely(old_value == 0)) {
_dispatch_retain(dg); // <rdar://problem/22318411>
}
if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
DISPATCH_CLIENT_CRASH(old_bits,
"Too many nested calls to dispatch_group_enter()");
}
}
0--
变为-1
,与信号量不同的是没有wait
操作。
3.2.3 dispatch_group_leave
void
dispatch_group_leave(dispatch_group_t dg)
{
// The value is incremented on a 64bits wide atomic so that the carry for
// the -1 -> 0 transition increments the generation atomically.
//-1++ -> 0
uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
DISPATCH_GROUP_VALUE_INTERVAL, release);
//#define DISPATCH_GROUP_VALUE_MASK 0x00000000fffffffcULL
// old_state & DISPATCH_GROUP_VALUE_MASK 是一个很大的值
uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);
//-1 & DISPATCH_GROUP_VALUE_MASK == DISPATCH_GROUP_VALUE_1,old_value = -1
if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {//old_value == -1
old_state += DISPATCH_GROUP_VALUE_INTERVAL;
do {
new_state = old_state;
if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
} else {
// If the group was entered again since the atomic_add above,
// we can't clear the waiters bit anymore as we don't know for
// which generation the waiters are for
new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
}
if (old_state == new_state) break;
} while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
old_state, new_state, &old_state, relaxed)));
//调用 _dispatch_group_wake,唤醒 dispatch_group_notify
return _dispatch_group_wake(dg, old_state, true);
}
//old_value 为0的情况下直接报错,也就是先leave的情况下直接报错
if (unlikely(old_value == 0)) {
DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
"Unbalanced call to dispatch_group_leave()");
}
}
-1++
变为0
,当old_value == -1
的时候调用_dispatch_group_wake
唤醒dispatch_group_notify
。- 既然
old_value == -1
的时候才唤醒,那么多次enter
只有最后一次leave
的时候才能唤醒。 - 当
old_value == 0
的时候直接报错,这也就是为什么先调用leave
直接发生了crash
。
3.2.4 dispatch_group_notify
void
dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_block_t db)
{
dispatch_continuation_t dsn = _dispatch_continuation_alloc();
_dispatch_continuation_init(dsn, dq, db, 0, DC_FLAG_CONSUME);
_dispatch_group_notify(dg, dq, dsn);
}
调用_dispatch_group_notify
:
static inline void
_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_continuation_t dsn)
{
uint64_t old_state, new_state;
dispatch_continuation_t prev;
dsn->dc_data = dq;
_dispatch_retain(dq);
prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
if (os_mpsc_push_was_empty(prev)) {
os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
if ((uint32_t)old_state == 0) {//循环判断 old_state == 0 的时候 wake
os_atomic_rmw_loop_give_up({
return _dispatch_group_wake(dg, new_state, false);
});
}
});
}
}
- 当
old_state == 0
的时候调用_dispatch_group_wake
,也就是调用block
的callout
。与leave
调用了同一个方法。
为什么两个地方都调用了?
因为在leave
的时候dispatch_group_notify
可能已经执行过了,任务已经保存在了group
中,leave
的时候本身尝试调用一次。
当然leave
中也可能是一个延时任务,当调用leave
的时候notify
可能还没有执行,就导致notify
任务还不存在。所以需要在notify
中也调用。
_dispatch_group_wake
static void
_dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
{
uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>
if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
dispatch_continuation_t dc, next_dc, tail;
// Snapshot before anything is notified/woken <rdar://problem/8554546>
dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
do {
dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
//异步回调,执行block callout
_dispatch_continuation_async(dsn_queue, dc,
_dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
_dispatch_release(dsn_queue);
} while ((dc = next_dc));
refs++;
}
if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
_dispatch_wake_by_address(&dg->dg_gen);
}
if (refs) _dispatch_release_n(dg, refs);
}
- 调用
_dispatch_continuation_async
相当于调用的是dispatch_async
执行notify
的任务。 - 任务先保存在在
group
中,唤醒notify
的时候才将任务加入队列。
3.2.5 dispatch_group_async
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_block_t db)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
//标记 DC_FLAG_GROUP_ASYNC
uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
dispatch_qos_t qos;
qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
_dispatch_continuation_group_async(dg, dq, dc, qos);
}
调用_dispatch_continuation_group_async
:
static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_continuation_t dc, dispatch_qos_t qos)
{
//调用enter
dispatch_group_enter(dg);
dc->dc_data = dg;
//dispatch_async
_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
- 内部先调用
dispatch_group_enter
,在这里就等待wakeup
的调用了 - 再调用
_dispatch_continuation_async
,相当于dispatch_async
。
那么leave
在什么时候调用呢?
肯定要在callout
执行完毕后调用。_dispatch_continuation_async
的调用以全局队列为例调用_dispatch_root_queue_push
,最终会调用到_dispatch_continuation_invoke_inline
:
group
的情况下(dispatch_group_async
的时候dc_flags
进行了标记)调用的是_dispatch_continuation_with_group_invoke
:static inline void
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
struct dispatch_object_s *dou = dc->dc_data;
unsigned long type = dx_type(dou);
if (type == DISPATCH_GROUP_TYPE) {
//callout
_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
_dispatch_trace_item_complete(dc);
//leave
dispatch_group_leave((dispatch_group_t)dou);
} else {
DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
}
}
- 在
callout
后调用了dispatch_group_leave
。
dispatch_group_async 底层是对 dispatch_group_enter + dispatch_group_leave 的封装
- 在
dispatch_group_async
中先进行dispatch_group_enter
,然后执行dispatch_async
。 - 在回调中先
_dispatch_client_callout
然后dispatch_group_leave
。