CocoaAsyncSocket源码分析---Connect (七)
接着我们来到流处理的第三步:addStreamsToRunLoop
-添加到runloop上。
Stream相关方法三 -- 加到当前线程的runloop上:
//把stream添加到runloop上
- (BOOL)addStreamsToRunLoop
{
LogTrace();
NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
NSAssert((readStream != NULL && writeStream != NULL), @"Read/Write stream is null");
//判断flag里是否包含kAddedStreamsToRunLoop,没添加过则添加。
if (!(flags & kAddedStreamsToRunLoop))
{
LogVerbose(@"Adding streams to runloop...");
[[self class] startCFStreamThreadIfNeeded];
//在开启的线程中去执行,阻塞式的
[[self class] performSelector:@selector(scheduleCFStreams:)
onThread:cfstreamThread
withObject:self
waitUntilDone:YES];
//添加标识
flags |= kAddedStreamsToRunLoop;
}
return YES;
}
这里方法做了两件事:
- 开启了一条用于
CFStream
读写回调的常驻线程,其中调用了好几个函数:
+ (void)startCFStreamThreadIfNeeded;
+ (void)cfstreamThread;
在这两个函数中,添加了一个runloop
,并且绑定了一个定时器事件,让它run
起来,使得线程常驻。
- 在这个常驻线程中去调用注册方法:
//注册CFStream
+ (void)scheduleCFStreams:(GCDAsyncSocket *)asyncSocket
{
LogTrace();
//断言当前线程是cfstreamThread,不是则报错
NSAssert([NSThread currentThread] == cfstreamThread, @"Invoked on wrong thread");
//获取到runloop
CFRunLoopRef runLoop = CFRunLoopGetCurrent();
//如果有readStream
if (asyncSocket->readStream)
//注册readStream在runloop的kCFRunLoopDefaultMode上
CFReadStreamScheduleWithRunLoop(asyncSocket->readStream, runLoop, kCFRunLoopDefaultMode);
//一样
if (asyncSocket->writeStream)
CFWriteStreamScheduleWithRunLoop(asyncSocket->writeStream, runLoop, kCFRunLoopDefaultMode);
}
AFNetworking
系列文章的会明白。我们之后文章也会就这个框架线程的问题详细讨论的,这里就暂时不详细说明了。这里主要用了
CFReadStreamScheduleWithRunLoop
函数完成了runloop
的注册:CFReadStreamScheduleWithRunLoop(asyncSocket->readStream, runLoop, kCFRunLoopDefaultMode);
CFWriteStreamScheduleWithRunLoop(asyncSocket->writeStream, runLoop, kCFRunLoopDefaultMode);
这样,如果stream
中有我们监听的事件发生了,就会在这个runloop
中触发我们之前设置的读写回调函数。
我们完成了注册,接下来我们就需要打开stream
了:
Stream相关方法四 -- 打开stream:
//打开stream
- (BOOL)openStreams
{
LogTrace();
NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
//断言读写stream都不会空
NSAssert((readStream != NULL && writeStream != NULL), @"Read/Write stream is null");
//返回stream的状态
CFStreamStatus readStatus = CFReadStreamGetStatus(readStream);
CFStreamStatus writeStatus = CFWriteStreamGetStatus(writeStream);
//如果有任意一个没有开启
if ((readStatus == kCFStreamStatusNotOpen) || (writeStatus == kCFStreamStatusNotOpen))
{
LogVerbose(@"Opening read and write stream...");
//开启
BOOL r1 = CFReadStreamOpen(readStream);
BOOL r2 = CFWriteStreamOpen(writeStream);
//有一个开启失败
if (!r1 || !r2)
{
LogError(@"Error in CFStreamOpen");
return NO;
}
}
return YES;
}
方法也很简单,通过CFReadStreamGetStatus
函数,获取到当前stream
的状态,判断没开启则调用CFReadStreamOpen
函数去开启,如果开启失败,错误返回。
到这里stream初始化相关的工作就做完了,接着我们还是回到本文方法十一 -- 连接成功后的初始化中:
其中第5条,我们谈到了设置socket
的I/O模式为非阻塞,相信很多朋友对socket
的I/O:同步、异步、阻塞、非阻塞。这四个概念有所混淆。
简单的来说,同步、异步是对于客户端而言的。比如我发起一个调用一个函数,我如果直接去调用,那么就是同步的,否则新开辟一个线程去做,那么对于当前线程而言就是异步的。
而阻塞和非阻塞是对于服务端而言。当服务端被客户端调用后,我如果立刻返回调用的结果(无论数据是否处理完)那么就是非阻塞的,又或者等待数据拿到并且处理完(总之一系列逻辑)再返回,那么这种情况就是阻塞的。
好了,有了这个概念,我们接下来看看Linux
下的5种I/O
模型:
1)阻塞I/O(blocking I/O)
2)非阻塞I/O (nonblocking I/O)
- I/O复用(select 和poll) (I/O multiplexing)
4)信号驱动I/O (signal driven I/O (SIGIO))
5)异步I/O (asynchronous I/O (the POSIX aio_functions))
我们来简单谈谈这5种模型:
1)阻塞I/O:
简单举个例子,比如我们调用read()
去读取消息,如果是在阻塞模式下,我们会一直等待,直到有消息到来为止。
很多小伙伴可能又要说了,这有什么不可以,我们新开辟一条线程,让它等着不就行了,看起来确实没什么不可以。
那是因为你仅仅是站在客户端的角度上来看。试想如果我们服务端也这么做,那岂不是有多少个socket连接,我们得开辟多少个线程去做阻塞IO?
2)非阻塞I/O
于是就有了非阻塞的概念,当我们去read()
的时候,直接返回结果,这样在很大概率下,是并没有消息给我们读的。这时候函数就会错误返回-1,并将errno
设置为 EWOULDBLOCK
,意为IO
并没有数据。
这时候就需要我们自己有一个机制,能知道什么时候有数据,在去调用read()。有一个很傻的方式就是不停的循环去调用这个函数,这样有数据来,我们第一时间就读到了。
3)I/O复用模式I/O复用模式
是阻塞I/O
的改进版,它在read
之前,会先去调用select
去遍历所有的socket
,看哪一个有消息。当然这个过程是阻塞的,直到有消息返回为止。然后在去调用read
,阻塞的方式去读取从系统内核中去读取这条消息到进程中来。
4)信号驱动I/O信号驱动I/O
是一个半异步的I/O模式,它首先会调用一个系统sginal
相关的函数,把socket
和信号绑定起来,然后不管有没有消息直接返回(这一步非阻塞)。这时候系统内核会去检查socket
是否有可用数据。有的话则发送该信号给进程,然后进程在去调用read
阻塞式的从系统内核读取数据到进程中来(这一步阻塞)。
5)可能聪明的你已经想到了更好的解决方式,这就对了,这就是我们第5种IO模式:异步I/O
,它和第4步一样,也是调用sginal
相关函数,把socket
和信号绑定起来,同时绑定起来的还有一块数据缓冲区buffer
。然后无论有没有数据直接返回(非阻塞)。而系统内核会去检查是否有可用数据,一旦有可用数据,则触发信号,并且把数据填充到我们之前提供的数据缓冲区buffer中。这样我们进程被信号触发,并且直接能从buffer中读取到数据,整个过程没有任何阻塞。
很显然,我们CocoaAyncSocket
框架用的就是第5种I/O模式。
接着我们继续看本文方法十一 -- 连接成功后的初始化中第6条,读写source的初始化方法:
本文方法十二 -- 初始化读写source:
//初始化读写source
- (void)setupReadAndWriteSourcesForNewlyConnectedSocket:(int)socketFD
{
//GCD source DISPATCH_SOURCE_TYPE_READ 会一直监视着 socketFD,直到有数据可读
readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, socketFD, 0, socketQueue);
//_dispatch_source_type_write :监视着 socketFD,直到写数据了
writeSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, socketFD, 0, socketQueue);
// Setup event handlers
__weak GCDAsyncSocket *weakSelf = self;
#pragma mark readSource的回调
//GCD事件句柄 读,当socket中有数据流出现,就会触发这个句柄,全自动,不需要手动触发
dispatch_source_set_event_handler(readSource, ^{ @autoreleasepool {
#pragma clang diagnostic push
#pragma clang diagnostic warning "-Wimplicit-retain-self"
__strong GCDAsyncSocket *strongSelf = weakSelf;
if (strongSelf == nil) return_from_block;
LogVerbose(@"readEventBlock");
//从readSource中,获取到数据长度,
strongSelf->socketFDBytesAvailable = dispatch_source_get_data(strongSelf->readSource);
LogVerbose(@"socketFDBytesAvailable: %lu", strongSelf->socketFDBytesAvailable);
//如果长度大于0,开始读数据
if (strongSelf->socketFDBytesAvailable > 0)
[strongSelf doReadData];
else
//因为触发了,但是却没有可读数据,说明读到当前包边界了。做边界处理
[strongSelf doReadEOF];
#pragma clang diagnostic pop
}});
//写事件句柄
dispatch_source_set_event_handler(writeSource, ^{ @autoreleasepool {
#pragma clang diagnostic push
#pragma clang diagnostic warning "-Wimplicit-retain-self"
__strong GCDAsyncSocket *strongSelf = weakSelf;
if (strongSelf == nil) return_from_block;
LogVerbose(@"writeEventBlock");
//标记为接受数据
strongSelf->flags |= kSocketCanAcceptBytes;
//开始写
[strongSelf doWriteData];
#pragma clang diagnostic pop
}});
// Setup cancel handlers
__block int socketFDRefCount = 2;
#if !OS_OBJECT_USE_OBJC
dispatch_source_t theReadSource = readSource;
dispatch_source_t theWriteSource = writeSource;
#endif
//读写取消的句柄
dispatch_source_set_cancel_handler(readSource, ^{
#pragma clang diagnostic push
#pragma clang diagnostic warning "-Wimplicit-retain-self"
LogVerbose(@"readCancelBlock");
#if !OS_OBJECT_USE_OBJC
LogVerbose(@"dispatch_release(readSource)");
dispatch_release(theReadSource);
#endif
if (--socketFDRefCount == 0)
{
LogVerbose(@"close(socketFD)");
//关闭socket
close(socketFD);
}
#pragma clang diagnostic pop
});
dispatch_source_set_cancel_handler(writeSource, ^{
#pragma clang diagnostic push
#pragma clang diagnostic warning "-Wimplicit-retain-self"
LogVerbose(@"writeCancelBlock");
#if !OS_OBJECT_USE_OBJC
LogVerbose(@"dispatch_release(writeSource)");
dispatch_release(theWriteSource);
#endif
if (--socketFDRefCount == 0)
{
LogVerbose(@"close(socketFD)");
//关闭socket
close(socketFD);
}
#pragma clang diagnostic pop
});
// We will not be able to read until data arrives.
// But we should be able to write immediately.
//设置未读数量为0
socketFDBytesAvailable = 0;
//把读挂起的状态移除
flags &= ~kReadSourceSuspended;
LogVerbose(@"dispatch_resume(readSource)");
//开启读source
dispatch_resume(readSource);
//标记为当前可接受数据
flags |= kSocketCanAcceptBytes;
//先把写source标记为挂起
flags |= kWriteSourceSuspended;
}
这个方法初始化了读写source
,这个方法主要是GCD source
运用
这里GCD Source
相关的主要是下面这3个函数
//创建source
dispatch_source_create(dispatch_source_type_t type,
uintptr_t handle,
unsigned long mask,
dispatch_queue_t _Nullable queue);
//为source设置事件句柄
dispatch_source_set_event_handler(dispatch_source_t source,
dispatch_block_t _Nullable handler);
//为source设置取消句柄
dispatch_source_set_cancel_handler(dispatch_source_t source,
dispatch_block_t _Nullable handler);
相信大家用至少用过GCD
定时器,接触过这3个函数,这里创建source的函数,根据参数type的不同,可以处理不同的事件:
这里我们用的是DISPATCH_SOURCE_TYPE_READ
和DISPATCH_SOURCE_TYPE_WRITE
这两个类型。标识如果handle
如果有可读或者可写数据时,会触发我们的事件句柄。
- 而这里初始化的读写事件句柄内容也很简单,就是去读写数据。
- 而取消句柄也就是去关闭
socket
。 - 初始化完成后,我们开启了
readSource
,一旦有数据过来就触发了我们readSource
事件句柄,就可以去监听的socket
所分配的缓冲区中去读取数据了,而wirteSource
初始化完是挂起的。 - 除此之外我们还初始化了当前
source
的状态,用于我们后续的操作。
Connect
流程结束了 ,下章概括总结一下ConnectCocoaAsyncSocket源码分析---Connect (二)
CocoaAsyncSocket源码分析---Connect (三)
CocoaAsyncSocket源码分析---Connect (四)
CocoaAsyncSocket源码分析---Connect (五)
CocoaAsyncSocket源码分析---Connect (六)
CocoaAsyncSocket源码分析---Connect (七)
CocoaAsyncSocket源码分析---Connect (八)
作者:Cooci
链接:https://www.jianshu.com/p/b264eff1f326