注册

CocoaAsyncSocket源码分析---Connect (七)

接着我们来到流处理的第三步:addStreamsToRunLoop-添加到runloop上。

Stream相关方法三 -- 加到当前线程的runloop上:

//把stream添加到runloop上
- (BOOL)addStreamsToRunLoop
{
LogTrace();

NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
NSAssert((readStream != NULL && writeStream != NULL), @"Read/Write stream is null");

//判断flag里是否包含kAddedStreamsToRunLoop,没添加过则添加。
if (!(flags & kAddedStreamsToRunLoop))
{
LogVerbose(@"Adding streams to runloop...");

[[self class] startCFStreamThreadIfNeeded];
//在开启的线程中去执行,阻塞式的
[[self class] performSelector:@selector(scheduleCFStreams:)
onThread:cfstreamThread
withObject:self
waitUntilDone:YES];

//添加标识
flags |= kAddedStreamsToRunLoop;
}

return YES;
}


这里方法做了两件事:

  1. 开启了一条用于CFStream读写回调的常驻线程,其中调用了好几个函数:
 + (void)startCFStreamThreadIfNeeded;
+ (void)cfstreamThread;

在这两个函数中,添加了一个runloop,并且绑定了一个定时器事件,让它run起来,使得线程常驻。

  1. 在这个常驻线程中去调用注册方法:
//注册CFStream
+ (void)scheduleCFStreams:(GCDAsyncSocket *)asyncSocket
{
LogTrace();

//断言当前线程是cfstreamThread,不是则报错
NSAssert([NSThread currentThread] == cfstreamThread, @"Invoked on wrong thread");

//获取到runloop
CFRunLoopRef runLoop = CFRunLoopGetCurrent();
//如果有readStream
if (asyncSocket->readStream)
//注册readStream在runloop的kCFRunLoopDefaultMode上
CFReadStreamScheduleWithRunLoop(asyncSocket->readStream, runLoop, kCFRunLoopDefaultMode);

//一样
if (asyncSocket->writeStream)
CFWriteStreamScheduleWithRunLoop(asyncSocket->writeStream, runLoop, kCFRunLoopDefaultMode);
}

这里可以看到,我们流的回调都是在这条流的常驻线程中,至于为什么要这么做,相信大家楼主看过AFNetworking系列文章的会明白。我们之后文章也会就这个框架线程的问题详细讨论的,这里就暂时不详细说明了。
这里主要用了CFReadStreamScheduleWithRunLoop函数完成了runloop的注册:

CFReadStreamScheduleWithRunLoop(asyncSocket->readStream, runLoop, kCFRunLoopDefaultMode);
CFWriteStreamScheduleWithRunLoop(asyncSocket->writeStream, runLoop, kCFRunLoopDefaultMode);

这样,如果stream中有我们监听的事件发生了,就会在这个runloop中触发我们之前设置的读写回调函数。

我们完成了注册,接下来我们就需要打开stream了:

Stream相关方法四 -- 打开stream:

//打开stream
- (BOOL)openStreams
{
LogTrace();

NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
//断言读写stream都不会空
NSAssert((readStream != NULL && writeStream != NULL), @"Read/Write stream is null");

//返回stream的状态

CFStreamStatus readStatus = CFReadStreamGetStatus(readStream);
CFStreamStatus writeStatus = CFWriteStreamGetStatus(writeStream);

//如果有任意一个没有开启
if ((readStatus == kCFStreamStatusNotOpen) || (writeStatus == kCFStreamStatusNotOpen))
{
LogVerbose(@"Opening read and write stream...");

//开启
BOOL r1 = CFReadStreamOpen(readStream);
BOOL r2 = CFWriteStreamOpen(writeStream);

//有一个开启失败
if (!r1 || !r2)
{
LogError(@"Error in CFStreamOpen");
return NO;
}
}

return YES;
}

方法也很简单,通过CFReadStreamGetStatus函数,获取到当前stream的状态,判断没开启则调用CFReadStreamOpen函数去开启,如果开启失败,错误返回。

到这里stream初始化相关的工作就做完了,接着我们还是回到本文方法十一 -- 连接成功后的初始化中

其中第5条,我们谈到了设置socket的I/O模式为非阻塞,相信很多朋友对socket的I/O:同步、异步、阻塞、非阻塞。这四个概念有所混淆。
简单的来说,同步、异步是对于客户端而言的。比如我发起一个调用一个函数,我如果直接去调用,那么就是同步的,否则新开辟一个线程去做,那么对于当前线程而言就是异步的。
阻塞和非阻塞是对于服务端而言。当服务端被客户端调用后,我如果立刻返回调用的结果(无论数据是否处理完)那么就是非阻塞的,又或者等待数据拿到并且处理完(总之一系列逻辑)再返回,那么这种情况就是阻塞的。

好了,有了这个概念,我们接下来看看Linux下的5种I/O模型:
1)阻塞I/O(blocking I/O)
2)非阻塞I/O (nonblocking I/O)

  1. I/O复用(select 和poll) (I/O multiplexing)
    4)信号驱动I/O (signal driven I/O (SIGIO))
    5)异步I/O (asynchronous I/O (the POSIX aio_functions))

我们来简单谈谈这5种模型:
1)阻塞I/O:
简单举个例子,比如我们调用read()去读取消息,如果是在阻塞模式下,我们会一直等待,直到有消息到来为止。
很多小伙伴可能又要说了,这有什么不可以,我们新开辟一条线程,让它等着不就行了,看起来确实没什么不可以。
那是因为你仅仅是站在客户端的角度上来看。试想如果我们服务端也这么做,那岂不是有多少个socket连接,我们得开辟多少个线程去做阻塞IO?
2)非阻塞I/O
于是就有了非阻塞的概念,当我们去read()的时候,直接返回结果,这样在很大概率下,是并没有消息给我们读的。这时候函数就会错误返回-1,并将errno设置为 EWOULDBLOCK,意为IO并没有数据。
这时候就需要我们自己有一个机制,能知道什么时候有数据,在去调用read()。有一个很傻的方式就是不停的循环去调用这个函数,这样有数据来,我们第一时间就读到了。
3)I/O复用模式
I/O复用模式阻塞I/O的改进版,它在read之前,会先去调用select去遍历所有的socket,看哪一个有消息。当然这个过程是阻塞的,直到有消息返回为止。然后在去调用read,阻塞的方式去读取从系统内核中去读取这条消息到进程中来。
4)信号驱动I/O
信号驱动I/O是一个半异步的I/O模式,它首先会调用一个系统sginal相关的函数,把socket和信号绑定起来,然后不管有没有消息直接返回(这一步非阻塞)。这时候系统内核会去检查socket是否有可用数据。有的话则发送该信号给进程,然后进程在去调用read阻塞式的从系统内核读取数据到进程中来(这一步阻塞)。
5)可能聪明的你已经想到了更好的解决方式,这就对了,这就是我们第5种IO模式:异步I/O ,它和第4步一样,也是调用sginal相关函数,把socket和信号绑定起来,同时绑定起来的还有一块数据缓冲区buffer。然后无论有没有数据直接返回(非阻塞)。而系统内核会去检查是否有可用数据,一旦有可用数据,则触发信号,并且把数据填充到我们之前提供的数据缓冲区buffer中。这样我们进程被信号触发,并且直接能从buffer中读取到数据,整个过程没有任何阻塞。
很显然,我们CocoaAyncSocket框架用的就是第5种I/O模式。

接着我们继续看本文方法十一 -- 连接成功后的初始化中第6条,读写source的初始化方法:

本文方法十二 -- 初始化读写source:
//初始化读写source
- (void)setupReadAndWriteSourcesForNewlyConnectedSocket:(int)socketFD
{
//GCD source DISPATCH_SOURCE_TYPE_READ 会一直监视着 socketFD,直到有数据可读
readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, socketFD, 0, socketQueue);
//_dispatch_source_type_write :监视着 socketFD,直到写数据了
writeSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, socketFD, 0, socketQueue);

// Setup event handlers

__weak GCDAsyncSocket *weakSelf = self;

#pragma mark readSource的回调

//GCD事件句柄 读,当socket中有数据流出现,就会触发这个句柄,全自动,不需要手动触发
dispatch_source_set_event_handler(readSource, ^{ @autoreleasepool {
#pragma clang diagnostic push
#pragma clang diagnostic warning "-Wimplicit-retain-self"

__strong GCDAsyncSocket *strongSelf = weakSelf;
if (strongSelf == nil) return_from_block;

LogVerbose(@"readEventBlock");
//从readSource中,获取到数据长度,
strongSelf->socketFDBytesAvailable = dispatch_source_get_data(strongSelf->readSource);
LogVerbose(@"socketFDBytesAvailable: %lu", strongSelf->socketFDBytesAvailable);

//如果长度大于0,开始读数据
if (strongSelf->socketFDBytesAvailable > 0)
[strongSelf doReadData];
else
//因为触发了,但是却没有可读数据,说明读到当前包边界了。做边界处理
[strongSelf doReadEOF];

#pragma clang diagnostic pop
}});

//写事件句柄
dispatch_source_set_event_handler(writeSource, ^{ @autoreleasepool {
#pragma clang diagnostic push
#pragma clang diagnostic warning "-Wimplicit-retain-self"

__strong GCDAsyncSocket *strongSelf = weakSelf;
if (strongSelf == nil) return_from_block;

LogVerbose(@"writeEventBlock");
//标记为接受数据
strongSelf->flags |= kSocketCanAcceptBytes;
//开始写
[strongSelf doWriteData];

#pragma clang diagnostic pop
}});

// Setup cancel handlers

__block int socketFDRefCount = 2;

#if !OS_OBJECT_USE_OBJC
dispatch_source_t theReadSource = readSource;
dispatch_source_t theWriteSource = writeSource;
#endif

//读写取消的句柄
dispatch_source_set_cancel_handler(readSource, ^{
#pragma clang diagnostic push
#pragma clang diagnostic warning "-Wimplicit-retain-self"

LogVerbose(@"readCancelBlock");

#if !OS_OBJECT_USE_OBJC
LogVerbose(@"dispatch_release(readSource)");
dispatch_release(theReadSource);
#endif

if (--socketFDRefCount == 0)
{
LogVerbose(@"close(socketFD)");
//关闭socket
close(socketFD);
}

#pragma clang diagnostic pop
});

dispatch_source_set_cancel_handler(writeSource, ^{
#pragma clang diagnostic push
#pragma clang diagnostic warning "-Wimplicit-retain-self"

LogVerbose(@"writeCancelBlock");

#if !OS_OBJECT_USE_OBJC
LogVerbose(@"dispatch_release(writeSource)");
dispatch_release(theWriteSource);
#endif

if (--socketFDRefCount == 0)
{
LogVerbose(@"close(socketFD)");
//关闭socket
close(socketFD);
}

#pragma clang diagnostic pop
});

// We will not be able to read until data arrives.
// But we should be able to write immediately.

//设置未读数量为0
socketFDBytesAvailable = 0;
//把读挂起的状态移除
flags &= ~kReadSourceSuspended;

LogVerbose(@"dispatch_resume(readSource)");
//开启读source
dispatch_resume(readSource);

//标记为当前可接受数据
flags |= kSocketCanAcceptBytes;
//先把写source标记为挂起
flags |= kWriteSourceSuspended;
}


这个方法初始化了读写source,这个方法主要是GCD source运用

这里GCD Source相关的主要是下面这3个函数

//创建source
dispatch_source_create(dispatch_source_type_t type,
uintptr_t handle,
unsigned long mask,
dispatch_queue_t _Nullable queue);
//为source设置事件句柄
dispatch_source_set_event_handler(dispatch_source_t source,
dispatch_block_t _Nullable handler);
//为source设置取消句柄
dispatch_source_set_cancel_handler(dispatch_source_t source,
dispatch_block_t _Nullable handler);


相信大家用至少用过GCD定时器,接触过这3个函数,这里创建source的函数,根据参数type的不同,可以处理不同的事件:

ebeef7d7d62e07b298d76ec269e66026.png

这里我们用的是DISPATCH_SOURCE_TYPE_READDISPATCH_SOURCE_TYPE_WRITE这两个类型。标识如果handle如果有可读或者可写数据时,会触发我们的事件句柄。

  • 而这里初始化的读写事件句柄内容也很简单,就是去读写数据。
  • 而取消句柄也就是去关闭socket
  • 初始化完成后,我们开启了readSource,一旦有数据过来就触发了我们readSource事件句柄,就可以去监听的socket所分配的缓冲区中去读取数据了,而wirteSource初始化完是挂起的。
  • 除此之外我们还初始化了当前source的状态,用于我们后续的操作。

至此我们客户端的整个Connect流程结束了 ,下章概括总结一下Connect

CocoaAsyncSocket源码分析---Connect (二)

CocoaAsyncSocket源码分析---Connect (三)

CocoaAsyncSocket源码分析---Connect (四)

CocoaAsyncSocket源码分析---Connect (五)

CocoaAsyncSocket源码分析---Connect (六)

CocoaAsyncSocket源码分析---Connect (七)

CocoaAsyncSocket源码分析---Connect (八)


作者:Cooci
链接:https://www.jianshu.com/p/b264eff1f326




0 个评论

要回复文章请先登录注册