高级线程应用之栅栏、信号量、调度组以及source(四)
四、Dispatch Source
在任一线程上调用它的的一个函数 dispatch_source_merge_data
后,会执行 dispatch source
事先定义好的句柄(可以把句柄简单理解为一个 block
) 这个过程叫 用户事件(Custom event
)。是 dispatch source
支持处理的一种事件。
句柄是一种指向指针的指针,它指向的就是一个类或者结构,它和系统有很密切的关系。比如:实例句柄(HINSTANCE
),位图句柄(HBITMAP
),设备表述句柄(HDC
),图标句柄(HICON
)等。这当中还有一个通用的句柄,就是HANDLE
。
Dispatch Source
有两点:
CPU
负荷非常小,尽量不占用资源 。- 联结的优势。
dispatch source
不受runloop
的影响,底层封装的是pthread
。
相关API
:
dispatch_source_create
创建源dispatch_source_set_event_handler
设置源事件回调dispatch_source_merge_data
源事件设置数据dispatch_source_get_data
获取源事件数据dispatch_resume
继续dispatch_suspend
挂起
4.1 应用
dispatch_source_t
dispatch_source_create(dispatch_source_type_t type,
uintptr_t handle,
uintptr_t mask,
dispatch_queue_t _Nullable queue);
type
:dispatch
源可处理的事件。比如:DISPATCH_SOURCE_TYPE_TIMER
、DISPATCH_SOURCE_TYPE_DATA_ADD
。DISPATCH_SOURCE_TYPE_DATA_ADD
: 将所有触发结果相加,最后统一执行响应。间隔的时间越长,则每次触发都会响应;如果间隔的时间很短,则会将触发后的结果相加后统一触发。也就是利用CPU
空闲时间进行回调。
handle
:可以理解为句柄、索引或id
,如果要监听进程,需要传入进程的ID
。mask
:可以理解为描述,具体要监听什么。queue
:处理handle
的队列。
有如下一个进度条的案例:
self.completed = 0;
self.queue = dispatch_queue_create("HotpotCat", NULL);
self.source = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0, dispatch_get_main_queue());
//设置句柄
dispatch_source_set_event_handler(self.source, ^{
NSLog(@"%@",[NSThread currentThread]);
NSUInteger value = dispatch_source_get_data(self.source);
self.completed += value;
double progress = self.completed / 100.0;
NSLog(@"progress: %.2f",progress);
self.progressView.progress = progress;
});
self.isRunning = YES;
//创建后默认是挂起状态
dispatch_resume(self.source);
创建了一个ADD
类型的source
,在handle
获取进度增量并更新进度条。由于创建后source
处于挂起状态,需要先恢复。
可以在按钮的点击事件中进行任务的挂起和恢复:
if (self.isRunning) {
dispatch_suspend(self.source);
dispatch_suspend(self.queue);
NSLog(@"pause");
self.isRunning = NO;
[sender setTitle:@"pause" forState:UIControlStateNormal];
} else {
dispatch_resume(self.source);
dispatch_resume(self.queue);
NSLog(@"running");
self.isRunning = YES;
[sender setTitle:@"running" forState:UIControlStateNormal];
}
任务的执行是一个简单的循环:
for (NSInteger i = 0; i < 100; i++) {
dispatch_async(self.queue, ^{
NSLog(@"merge");
//加不加 sleep 影响 handler 的执行次数。
sleep(1);
dispatch_source_merge_data(self.source, 1);//+1
});
}
- 在循环中调用
dispatch_source_merge_data
触发回调。当queue
挂起后后续任务就不再执行了。 - 在不加
sleep
的情况下handler
的回调是小于100
次的,任务会被合并。
4.2 源码解析
4.2.1 dispatch_source_create
dispatch_source_t
dispatch_source_create(dispatch_source_type_t dst, uintptr_t handle,
uintptr_t mask, dispatch_queue_t dq)
{
dispatch_source_refs_t dr;
dispatch_source_t ds;
//add对应 _dispatch_source_data_create timer对应 _dispatch_source_timer_create
dr = dux_create(dst, handle, mask)._dr;
if (unlikely(!dr)) {
return DISPATCH_BAD_INPUT;
}
//创建队列
ds = _dispatch_queue_alloc(source,
dux_type(dr)->dst_strict ? DSF_STRICT : DQF_MUTABLE, 1,
DISPATCH_QUEUE_INACTIVE | DISPATCH_QUEUE_ROLE_INNER)._ds;
ds->dq_label = "source";
ds->ds_refs = dr;
dr->du_owner_wref = _dispatch_ptr2wref(ds);
//没有传队列,获取root_queues
if (unlikely(!dq)) {
dq = _dispatch_get_default_queue(true);
} else {
_dispatch_retain((dispatch_queue_t _Nonnull)dq);
}
//目标队列为传进来的dq
ds->do_targetq = dq;
//是timer 并且设置了interval则调用dispatch_source_set_timer进行设置
//也就是说type为timer的时候即使不设置timer也会默认设置。这里时间间隔设置为了handle
if (dr->du_is_timer && (dr->du_timer_flags & DISPATCH_TIMER_INTERVAL)) {
dispatch_source_set_timer(ds, DISPATCH_TIME_NOW, handle, UINT64_MAX);
}
_dispatch_object_debug(ds, "%s", __func__);
//返回自己创建的source,source本身也是队列。
return ds;
}
- 根据
type
创建对应的队列。add
对应_dispatch_source_data_create
,timer
对应_dispatch_source_timer_create
。 - 如果创建的时候没有传处理
handle
的队列,会默认获取root_queues
中的队列。 - 设置目标队列为传进来的队列。
- 如果
type
为DISPATCH_SOURCE_TYPE_INTERVAL
(应该是私有的)则主动调用一次dispatch_source_set_timer
。 - 返回自己创建的
source
,source
本身也是队列。
_dispatch_source_data_create
static dispatch_unote_t
_dispatch_source_data_create(dispatch_source_type_t dst, uintptr_t handle,
uintptr_t mask)
{
if (handle || mask) {
return DISPATCH_UNOTE_NULL;
}
// bypass _dispatch_unote_create() because this is always "direct"
// even when EV_UDATA_SPECIFIC is 0
dispatch_unote_class_t du = _dispatch_calloc(1u, dst->dst_size);
du->du_type = dst;
du->du_filter = dst->dst_filter;
du->du_is_direct = true;
return (dispatch_unote_t){ ._du = du };
}
直接调用_dispatch_calloc
创建返回。
_dispatch_source_timer_create
static dispatch_unote_t
_dispatch_source_timer_create(dispatch_source_type_t dst,
uintptr_t handle, uintptr_t mask)
{
dispatch_timer_source_refs_t dt;
......
//创建
dt = _dispatch_calloc(1u, dst->dst_size);
dt->du_type = dst;
dt->du_filter = dst->dst_filter;
dt->du_is_timer = true;
dt->du_timer_flags |= (uint8_t)(mask | dst->dst_timer_flags);
dt->du_ident = _dispatch_timer_unote_idx(dt);
dt->dt_timer.target = UINT64_MAX;
dt->dt_timer.deadline = UINT64_MAX;
dt->dt_timer.interval = UINT64_MAX;
dt->dt_heap_entry[DTH_TARGET_ID] = DTH_INVALID_ID;
dt->dt_heap_entry[DTH_DEADLINE_ID] = DTH_INVALID_ID;
return (dispatch_unote_t){ ._dt = dt };
}
内部时间给的默认值是最大值。
4.2.2 dispatch_source_set_event_handler
void
dispatch_source_set_event_handler(dispatch_source_t ds,
dispatch_block_t handler)
{
_dispatch_source_set_handler(ds, handler, DS_EVENT_HANDLER, true);
}
调用_dispatch_source_set_handler
传递的类型为DS_EVENT_HANDLER
。
DISPATCH_NOINLINE
static void
_dispatch_source_set_handler(dispatch_source_t ds, void *func,
uintptr_t kind, bool is_block)
{
dispatch_continuation_t dc;
//创建dc存储handler
dc = _dispatch_source_handler_alloc(ds, func, kind, is_block);
//挂起
if (_dispatch_lane_try_inactive_suspend(ds)) {
//替换
_dispatch_source_handler_replace(ds, kind, dc);
//恢复
return _dispatch_lane_resume(ds, DISPATCH_RESUME);
}
......
}
_dispatch_source_handler_alloc
存储handler
,内部会进行标记非DS_EVENT_HANDLER
会标记为DC_FLAG_CONSUME
。_dispatch_lane_try_inactive_suspend
挂起队列。_dispatch_source_handler_replace
替换handler
。static inline void
_dispatch_source_handler_replace(dispatch_source_t ds, uintptr_t kind,
dispatch_continuation_t dc)
{
//handler目标回调为空释放handler
if (!dc->dc_func) {
_dispatch_continuation_free(dc);
dc = NULL;
} else if (dc->dc_flags & DC_FLAG_FETCH_CONTEXT) {
dc->dc_ctxt = ds->do_ctxt;
}
//保存
dc = os_atomic_xchg(&ds->ds_refs->ds_handler[kind], dc, release);
if (dc) _dispatch_source_handler_dispose(dc);
}
_dispatch_lane_resume
恢复队列,调用队列对应的awake
。- 先调用
_dispatch_lane_resume_activate
(这也就是set
后立马调用的原因):
static void
_dispatch_lane_resume_activate(dispatch_lane_t dq)
{
if (dx_vtable(dq)->dq_activate) {
dx_vtable(dq)->dq_activate(dq);
}
_dispatch_lane_resume(dq, DISPATCH_ACTIVATION_DONE);
}
再调用_dispatch_lane_resume
。
4.2.3 dispatch_source_merge_data
void
dispatch_source_merge_data(dispatch_source_t ds, uintptr_t val)
{
dispatch_queue_flags_t dqf = _dispatch_queue_atomic_flags(ds);
dispatch_source_refs_t dr = ds->ds_refs;
if (unlikely(dqf & (DSF_CANCELED | DQF_RELEASED))) {
return;
}
//根据类型存值
switch (dr->du_filter) {
case DISPATCH_EVFILT_CUSTOM_ADD:
//有累加
os_atomic_add2o(dr, ds_pending_data, val, relaxed);
break;
case DISPATCH_EVFILT_CUSTOM_OR:
os_atomic_or2o(dr, ds_pending_data, val, relaxed);
break;
case DISPATCH_EVFILT_CUSTOM_REPLACE:
os_atomic_store2o(dr, ds_pending_data, val, relaxed);
break;
default:
DISPATCH_CLIENT_CRASH(dr->du_filter, "Invalid source type");
}
//唤醒执行回调
dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY);
}
- 根据类型对值进行处理,处理完之后唤醒队列执行。
对于主线程会执行_dispatch_main_queue_wakeup
,其中会取到dispatch_queue
获取到dc
,最后进行handler
的调用。
4.2.4 dispatch_source_get_data
uintptr_t
dispatch_source_get_data(dispatch_source_t ds)
{
dispatch_source_refs_t dr = ds->ds_refs;
#if DISPATCH_USE_MEMORYSTATUS
if (dr->du_vmpressure_override) {
return NOTE_VM_PRESSURE;
}
#if TARGET_OS_SIMULATOR
if (dr->du_memorypressure_override) {
return NOTE_MEMORYSTATUS_PRESSURE_WARN;
}
#endif
#endif // DISPATCH_USE_MEMORYSTATUS
//获取数据
uint64_t value = os_atomic_load2o(dr, ds_data, relaxed);
return (unsigned long)(dr->du_has_extended_status ?
DISPATCH_SOURCE_GET_DATA(value) : value);
}
与merge_data
相反,一个存一个取。
4.2.5 dispatch_resume
void
dispatch_resume(dispatch_object_t dou)
{
DISPATCH_OBJECT_TFB(_dispatch_objc_resume, dou);
if (unlikely(_dispatch_object_is_global(dou) ||
_dispatch_object_is_root_or_base_queue(dou))) {
return;
}
if (dx_cluster(dou._do) == _DISPATCH_QUEUE_CLUSTER) {
_dispatch_lane_resume(dou._dl, DISPATCH_RESUME);
}
}
经过调试走的是_dispatch_lane_resume
逻辑,与_dispatch_source_set_handler
中调用的一致。awake
队列。
4.2.6 dispatch_suspend
void
dispatch_suspend(dispatch_object_t dou)
{
DISPATCH_OBJECT_TFB(_dispatch_objc_suspend, dou);
if (unlikely(_dispatch_object_is_global(dou) ||
_dispatch_object_is_root_or_base_queue(dou))) {
return;
}
if (dx_cluster(dou._do) == _DISPATCH_QUEUE_CLUSTER) {
return _dispatch_lane_suspend(dou._dl);
}
}
调用_dispatch_lane_suspend
挂起队列。
4.2.7 dispatch_source_cancel
dispatch_source_cancel(dispatch_source_t ds)
{
_dispatch_object_debug(ds, "%s", __func__);
_dispatch_retain_2(ds);
if (_dispatch_queue_atomic_flags_set_orig(ds, DSF_CANCELED) & DSF_CANCELED){
_dispatch_release_2_tailcall(ds);
} else {
//_dispatch_workloop_wakeup
dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY | DISPATCH_WAKEUP_CONSUME_2);
}
}
_dispatch_workloop_wakeup
:cancel
内部会对状态进行判断,如果是挂起状态会报错。所以需要在运行状态下取消。- 调用
_dispatch_release_2_tailcall
进行释放操作。
4.2.8 dispatch_source_set_timer
void
dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
uint64_t interval, uint64_t leeway)
{
dispatch_timer_source_refs_t dt = ds->ds_timer_refs;
dispatch_timer_config_t dtc;
if (unlikely(!dt->du_is_timer)) {
DISPATCH_CLIENT_CRASH(ds, "Attempt to set timer on a non-timer source");
}
//根据type配置timer和interval
if (dt->du_timer_flags & DISPATCH_TIMER_INTERVAL) {
dtc = _dispatch_interval_config_create(start, interval, leeway, dt);
} else {
dtc = _dispatch_timer_config_create(start, interval, leeway, dt);
}
if (_dispatch_timer_flags_to_clock(dt->du_timer_flags) != dtc->dtc_clock &&
dt->du_filter == DISPATCH_EVFILT_TIMER_WITH_CLOCK) {
DISPATCH_CLIENT_CRASH(0, "Attempting to modify timer clock");
}
//跟踪配置
_dispatch_source_timer_telemetry(ds, dtc->dtc_clock, &dtc->dtc_timer);
dtc = os_atomic_xchg2o(dt, dt_pending_config, dtc, release);
if (dtc) free(dtc);
//唤醒
dx_wakeup(ds, 0, DISPATCH_WAKEUP_MAKE_DIRTY);
}
4.2.9 dispatch_source_set_registration_handler
void
dispatch_source_set_registration_handler(dispatch_source_t ds,
dispatch_block_t handler)
{
_dispatch_source_set_handler(ds, handler, DS_REGISTN_HANDLER, true);
}
也是直接调用的_dispatch_source_set_handler
,参数是DS_REGISTN_HANDLER
。
4.2.10 dispatch_source_set_cancel_handler
void
dispatch_source_set_cancel_handler(dispatch_source_t ds,
dispatch_block_t handler)
{
_dispatch_source_set_handler(ds, handler, DS_CANCEL_HANDLER, true);
}
- 直接调用的
_dispatch_source_set_handler
,参数是DS_CANCEL_HANDLER
。 - 会根据
DS_REGISTN_HANDLER、DS_CANCEL_HANDLER、DS_EVENT_HANDLER
进行handler
的获取和释放,因为这三者可能同时存在。
那么就有个问题设置timer
类型后我们没有主动调用dispatch_source_merge_data
,那么它是在什么时机调用的呢?在回调中bt
:
frame #2: 0x000000010b6a29c8 libdispatch.dylib`_dispatch_client_callout + 8
frame #3: 0x000000010b6a5316 libdispatch.dylib`_dispatch_continuation_pop + 557
frame #4: 0x000000010b6b8e8b libdispatch.dylib`_dispatch_source_invoke + 2205
frame #5: 0x000000010b6b4508 libdispatch.dylib`_dispatch_root_queue_drain + 351
frame #6: 0x000000010b6b4e6d libdispatch.dylib`_dispatch_worker_thread2 + 135
frame #7: 0x00007fff611639f7 libsystem_pthread.dylib`_pthread_wqthread + 220
frame #8: 0x00007fff61162b77 libsystem_pthread.dylib`start_wqthread + 15
搜索_dispatch_source_invoke
只找到了:
DISPATCH_VTABLE_INSTANCE(source,
.do_type = DISPATCH_SOURCE_KEVENT_TYPE,
.do_dispose = _dispatch_source_dispose,
.do_debug = _dispatch_source_debug,
.do_invoke = _dispatch_source_invoke,
.dq_activate = _dispatch_source_activate,
.dq_wakeup = _dispatch_source_wakeup,
.dq_push = _dispatch_lane_push,
);
source
的do_invoke
,调用逻辑为_dispatch_root_queue_drain -> _dispatch_continuation_pop_inline -> dx_invoke
:void
_dispatch_source_invoke(dispatch_source_t ds, dispatch_invoke_context_t dic,
dispatch_invoke_flags_t flags)
{
_dispatch_queue_class_invoke(ds, dic, flags,
DISPATCH_INVOKE_DISALLOW_SYNC_WAITERS, _dispatch_source_invoke2);
#if DISPATCH_EVENT_BACKEND_KEVENT
if (flags & DISPATCH_INVOKE_WORKLOOP_DRAIN) {
dispatch_workloop_t dwl = (dispatch_workloop_t)_dispatch_get_wlh();
dispatch_timer_heap_t dth = dwl->dwl_timer_heap;
if (dth && dth[0].dth_dirty_bits) {
//调用
_dispatch_event_loop_drain_timers(dwl->dwl_timer_heap,
DISPATCH_TIMER_WLH_COUNT);
}
}
#endif // DISPATCH_EVENT_BACKEND_KEVENT
}