CocoaAsyncSocket源码分析---Connect (六)
我们接着来看看连接成功后,初始化的方法:
本文方法十一 -- 连接成功后的初始化
//连接成功后调用,设置一些连接成功的状态
- (void)didConnect:(int)aStateIndex
{
LogTrace();
NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
//状态不同
if (aStateIndex != stateIndex)
{
LogInfo(@"Ignoring didConnect, already disconnected");
// The connect operation has been cancelled.
// That is, socket was disconnected, or connection has already timed out.
return;
}
//kConnected合并到当前flag中
flags |= kConnected;
//停止连接超时
[self endConnectTimeout];
#if TARGET_OS_IPHONE
// The endConnectTimeout method executed above incremented the stateIndex.
//上面的endConnectTimeout,会导致stateIndex增加,所以需要重新赋值
aStateIndex = stateIndex;
#endif
// Setup read/write streams (as workaround for specific shortcomings in the iOS platform)
//
// Note:
// There may be configuration options that must be set by the delegate before opening the streams.
//打开stream之前必须用相关配置设置代理
// The primary example is the kCFStreamNetworkServiceTypeVoIP flag, which only works on an unopened stream.
//主要的例子是kCFStreamNetworkServiceTypeVoIP标记,只能工作在未打开的stream中?
//
// Thus we wait until after the socket:didConnectToHost:port: delegate method has completed.
//所以我们要等待,连接完成的代理调用完
// This gives the delegate time to properly configure the streams if needed.
//这些给了代理时间,去正确的配置Stream,如果是必要的话
//创建个Block来初始化Stream
dispatch_block_t SetupStreamsPart1 = ^{
NSLog(@"hello~");
#if TARGET_OS_IPHONE
//创建读写stream失败,则关闭并报对应错误
if (![self createReadAndWriteStream])
{
[self closeWithError:[self otherError:@"Error creating CFStreams"]];
return;
}
//参数是给NO的,就是有可读bytes的时候,不会调用回调函数
if (![self registerForStreamCallbacksIncludingReadWrite:NO])
{
[self closeWithError:[self otherError:@"Error in CFStreamSetClient"]];
return;
}
#endif
};
//part2设置stream
dispatch_block_t SetupStreamsPart2 = ^{
#if TARGET_OS_IPHONE
//状态不一样直接返回
if (aStateIndex != stateIndex)
{
// The socket has been disconnected.
return;
}
//如果加到runloop上失败
if (![self addStreamsToRunLoop])
{
//错误返回
[self closeWithError:[self otherError:@"Error in CFStreamScheduleWithRunLoop"]];
return;
}
//读写stream open
if (![self openStreams])
{
//开启错误返回
[self closeWithError:[self otherError:@"Error creating CFStreams"]];
return;
}
#endif
};
// Notify delegate
//通知代理
//拿到server端的host port
NSString *host = [self connectedHost];
uint16_t port = [self connectedPort];
//拿到unix域的 url
NSURL *url = [self connectedUrl];
//拿到代理
__strong id theDelegate = delegate;
//代理队列 和 Host不为nil 且响应didConnectToHost代理方法
if (delegateQueue && host != nil && [theDelegate respondsToSelector:@selector(socket:didConnectToHost:port:)])
{
//调用初始化stream1
SetupStreamsPart1();
dispatch_async(delegateQueue, ^{ @autoreleasepool {
//到代理队列调用连接成功的代理方法
[theDelegate socket:self didConnectToHost:host port:port];
//然后回到socketQueue中去执行初始化stream2
dispatch_async(socketQueue, ^{ @autoreleasepool {
SetupStreamsPart2();
}});
}});
}
//这个是unix domain 请求回调
else if (delegateQueue && url != nil && [theDelegate respondsToSelector:@selector(socket:didConnectToUrl:)])
{
SetupStreamsPart1();
dispatch_async(delegateQueue, ^{ @autoreleasepool {
[theDelegate socket:self didConnectToUrl:url];
dispatch_async(socketQueue, ^{ @autoreleasepool {
SetupStreamsPart2();
}});
}});
}
//否则只初始化stream
else
{
SetupStreamsPart1();
SetupStreamsPart2();
}
// Get the connected socket
int socketFD = (socket4FD != SOCKET_NULL) ? socket4FD : (socket6FD != SOCKET_NULL) ? socket6FD : socketUN;
//fcntl,功能描述:根据文件描述词来操作文件的特性。http://blog.csdn.net/pbymw8iwm/article/details/7974789
// Enable non-blocking IO on the socket
//使socket支持非阻塞IO
int result = fcntl(socketFD, F_SETFL, O_NONBLOCK);
if (result == -1)
{
//失败 ,报错
NSString *errMsg = @"Error enabling non-blocking IO on socket (fcntl)";
[self closeWithError:[self otherError:errMsg]];
return;
}
// Setup our read/write sources
//初始化读写source
[self setupReadAndWriteSourcesForNewlyConnectedSocket:socketFD];
// Dequeue any pending read/write requests
//开始下一个任务
[self maybeDequeueRead];
[self maybeDequeueWrite];
}
这个方法很长一大串,其实做的东西也很简单,主要做了下面几件事:
- 把当前状态flags加上已连接,并且关闭掉我们一开始连接开启的,连接超时的定时器。
- 初始化了两个
Block
:SetupStreamsPart1
、SetupStreamsPart2
,这两个Block
做的事都和读写流有关。SetupStreamsPart1
用来创建读写流,并且注册回调。另一个SetupStreamsPart2
用来把流添加到当前线程的runloop
上,并且打开流。 - 判断是否有代理
queue
、host
或者url
这些参数是否为空、是否代理响应didConnectToHost
或didConnectToUrl
代理,这两种分别对应了普通socket
连接和unix domin socket
连接。如果实现了对应的代理,则调用连接成功的代理。 - 在调用代理的同时,调用了我们之前初始化的两个读写流相关的
Block
。这里值得说下的是这两个Block和代理之间的调用顺序:
先执行
SetupStreamsPart1
后执行SetupStreamsPart2
,没什么好说的,问题是代理的执行时间,想想如果我们放在SetupStreamsPart2
后面是不是会导致个问题,就是用户收到消息了,但是连接成功的代理还没有被调用,这显然是不合理的。所以我们的调用顺序是SetupStreamsPart1
->代理->SetupStreamsPart2
所以出现了如下代码:
作者:Cooci
链接:https://www.jianshu.com/p/b264eff1f326
//调用初始化stream1
SetupStreamsPart1();
dispatch_async(delegateQueue, ^{ @autoreleasepool {
//到代理队列调用连接成功的代理方法
[theDelegate socket:self didConnectToHost:host port:port];
//然后回到socketQueue中去执行初始化stream2
dispatch_async(socketQueue, ^{ @autoreleasepool {
SetupStreamsPart2();
}});
}});
原因是为了线程安全和socket相关的操作必须在socketQueue
中进行。而代理必须在我们设置的代理queue
中被回调。
- 拿到当前的本机socket,调用如下函数:
int result = fcntl(socketFD, F_SETFL, O_NONBLOCK);
而在这里,就是为了把socket
的IO模式设置为非阻塞。很多小伙伴又要疑惑什么是非阻塞了,先别急,关于这个我们下文会详细的来谈。
我们初始化了读写source(很重要,所有的消息都是由这个source来触发的,我们之后会详细分析这个方法)。
我们做完了
stream
和source
的初始化处理,则开始做一次读写任务(这两个方法暂时不讲,会放到之后的Read和Write篇中去讲)。
我们接着来讲讲这个方法中对其他方法的调用,按照顺序来,先从第2条,两个Block中对stream
的处理开始。和stream相关的函数一共有6个:
Stream相关方法一 -- 创建读写stream
//创建读写stream
- (BOOL)createReadAndWriteStream
{
LogTrace();
NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
//如果有一个有值,就返回
if (readStream || writeStream)
{
// Streams already created
return YES;
}
//拿到socket,首选是socket4FD,其次socket6FD,都没有才是socketUN,socketUN应该是Unix的socket结构体
int socketFD = (socket4FD != SOCKET_NULL) ? socket4FD : (socket6FD != SOCKET_NULL) ? socket6FD : socketUN;
//如果都为空,返回NO
if (socketFD == SOCKET_NULL)
{
// Cannot create streams without a file descriptor
return NO;
}
//如果非连接,返回NO
if (![self isConnected])
{
// Cannot create streams until file descriptor is connected
return NO;
}
LogVerbose(@"Creating read and write stream...");
#pragma mark - 绑定Socket和CFStream
//下面的接口用于创建一对 socket stream,一个用于读取,一个用于写入:
CFStreamCreatePairWithSocket(NULL, (CFSocketNativeHandle)socketFD, &readStream, &writeStream);
// The kCFStreamPropertyShouldCloseNativeSocket property should be false by default (for our case).
// But let's not take any chances.
//读写stream都设置成不会随着绑定的socket一起close,release。 kCFBooleanFalse不一起,kCFBooleanTrue一起
if (readStream)
CFReadStreamSetProperty(readStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse);
if (writeStream)
CFWriteStreamSetProperty(writeStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse);
//如果有一个为空
if ((readStream == NULL) || (writeStream == NULL))
{
LogWarn(@"Unable to create read and write stream...");
//关闭对应的stream
if (readStream)
{
CFReadStreamClose(readStream);
CFRelease(readStream);
readStream = NULL;
}
if (writeStream)
{
CFWriteStreamClose(writeStream);
CFRelease(writeStream);
writeStream = NULL;
}
//返回创建失败
return NO;
}
//创建成功
return YES;
}
这个方法基本上很简单,就是关于两个stream函数的调用:
- 创建stream的函数:
CFStreamCreatePairWithSocket(NULL, (CFSocketNativeHandle)socketFD, &readStream, &writeStream);
这个函数创建了一对读写stream,并且把stream与这个scoket做了绑定。
- 设置stream属性:
CFReadStreamSetProperty(readStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse);
CFWriteStreamSetProperty(writeStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse);
这个函数可以给stream
设置一个属性,这里是设置stream
不会随着socket
的生命周期(close
,release
)而变化。
接着调用了registerForStreamCallbacksIncludingReadWrite
来给stream
注册读写回调。
Stream相关方法二 -- 读写回调的注册:
//注册Stream的回调
- (BOOL)registerForStreamCallbacksIncludingReadWrite:(BOOL)includeReadWrite
{
LogVerbose(@"%@ %@", THIS_METHOD, (includeReadWrite ? @"YES" : @"NO"));
NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
//判断读写stream是不是都为空
NSAssert((readStream != NULL && writeStream != NULL), @"Read/Write stream is null");
//客户端stream上下文对象
streamContext.version = 0;
streamContext.info = (__bridge void *)(self);
streamContext.retain = nil;
streamContext.release = nil;
streamContext.copyDescription = nil;
// The open has completed successfully.
// The stream has bytes to be read.
// The stream can accept bytes for writing.
// An error has occurred on the stream.
// The end of the stream has been reached.
//设置一个CF的flag 两种,一种是错误发生的时候,一种是stream事件结束
CFOptionFlags readStreamEvents = kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered ;
//如果包含读写
if (includeReadWrite)
//仍然有Bytes要读的时候 The stream has bytes to be read.
readStreamEvents |= kCFStreamEventHasBytesAvailable;
//给读stream设置客户端,会在之前设置的那些标记下回调函数 CFReadStreamCallback。设置失败的话直接返回NO
if (!CFReadStreamSetClient(readStream, readStreamEvents, &CFReadStreamCallback, &streamContext))
{
return NO;
}
//写的flag,也一样
CFOptionFlags writeStreamEvents = kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered;
if (includeReadWrite)
writeStreamEvents |= kCFStreamEventCanAcceptBytes;
if (!CFWriteStreamSetClient(writeStream, writeStreamEvents, &CFWriteStreamCallback, &streamContext))
{
return NO;
}
//走到最后说明读写都设置回调成功,返回YES
return YES;
}
相信用过
CFStream
的朋友,应该会觉得很简单,这个方法就是调用了一些CFStream
相关函数,其中最主要的这个设置读写回调函数:Boolean CFReadStreamSetClient(CFReadStreamRef stream, CFOptionFlags streamEvents, CFReadStreamClientCallBack clientCB, CFStreamClientContext *clientContext);
Boolean CFWriteStreamSetClient(CFWriteStreamRef stream, CFOptionFlags streamEvents, CFWriteStreamClientCallBack clientCB, CFStreamClientContext *clientContext);
这个函数共4个参数:
第1个为我们需要设置的stream;
第2个为需要监听的事件选项,包括以下事件:
typedef CF_OPTIONS(CFOptionFlags, CFStreamEventType) {
kCFStreamEventNone = 0, //没有事件发生
kCFStreamEventOpenCompleted = 1, //成功打开流
kCFStreamEventHasBytesAvailable = 2, //流中有数据可读
kCFStreamEventCanAcceptBytes = 4, //流中可以接受数据去写
kCFStreamEventErrorOccurred = 8, //流发生错误
kCFStreamEventEndEncountered = 16 //到达流的结尾
};
kCFStreamEventErrorOccurred
和kCFStreamEventEndEncountered
,再根据传过来的参数去决定是否监听kCFStreamEventCanAcceptBytes
://如果包含读写
if (includeReadWrite)
//仍然有Bytes要读的时候 The stream has bytes to be read.
readStreamEvents |= kCFStreamEventHasBytesAvailable;
而这里我们传过来的参数为NO,导致它并不监听可读数据。显然,我们正常的连接,当有消息发送过来,并不是由stream
回调来触发的。这个框架中,如果是TLS
传输的socket
是用stream
来触发的,这个我们后续文章会讲到。
那么有数据的时候,到底是什么来触发我们的读写呢,答案就是读写source
,我们接下来就会去创建初始化它。
这里绑定了两个函数,分别对应读和写的回调,分别为:
//读的回调
static void CFReadStreamCallback (CFReadStreamRef stream, CFStreamEventType type, void *pInfo)
//写的回调
static void CFWriteStreamCallback (CFWriteStreamRef stream, CFStreamEventType type, void *pInfo)
关于这两个函数,同样这里暂时不做讨论,等后续文章再来分析。
还有一点需要说一下的是streamContext
这个属性,它是一个结构体,包含流的上下文信息,其结构如下:
typedef struct {
CFIndex version;
void *info;
void *(*retain)(void *info);
void (*release)(void *info);
CFStringRef (*copyDescription)(void *info);
} CFStreamClientContext;
这个流的上下文中info
指针,其实就是前面所对应的读写回调函数中的pInfo
指针,每次回调都会传过去。其它的version
就是流的版本标识,之外的3个都需要的是一个函数指针,对应我们传递的pInfo
的持有以及释放还有复制的描述信息,这里我们都赋值给nil
。
下一章我们来到流处理的第三步