注册

CocoaAsyncSocket源码分析---Connect (六)

我们接着来看看连接成功后,初始化的方法:

本文方法十一 -- 连接成功后的初始化

//连接成功后调用,设置一些连接成功的状态
- (void)didConnect:(int)aStateIndex
{
LogTrace();

NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");

//状态不同
if (aStateIndex != stateIndex)
{
LogInfo(@"Ignoring didConnect, already disconnected");

// The connect operation has been cancelled.
// That is, socket was disconnected, or connection has already timed out.
return;
}

//kConnected合并到当前flag中
flags |= kConnected;
//停止连接超时
[self endConnectTimeout];

#if TARGET_OS_IPHONE
// The endConnectTimeout method executed above incremented the stateIndex.
//上面的endConnectTimeout,会导致stateIndex增加,所以需要重新赋值
aStateIndex = stateIndex;
#endif

// Setup read/write streams (as workaround for specific shortcomings in the iOS platform)
//
// Note:
// There may be configuration options that must be set by the delegate before opening the streams.
//打开stream之前必须用相关配置设置代理
// The primary example is the kCFStreamNetworkServiceTypeVoIP flag, which only works on an unopened stream.
//主要的例子是kCFStreamNetworkServiceTypeVoIP标记,只能工作在未打开的stream中?
//
// Thus we wait until after the socket:didConnectToHost:port: delegate method has completed.
//所以我们要等待,连接完成的代理调用完
// This gives the delegate time to properly configure the streams if needed.
//这些给了代理时间,去正确的配置Stream,如果是必要的话

//创建个Block来初始化Stream
dispatch_block_t SetupStreamsPart1 = ^{

NSLog(@"hello~");
#if TARGET_OS_IPHONE
//创建读写stream失败,则关闭并报对应错误
if (![self createReadAndWriteStream])
{
[self closeWithError:[self otherError:@"Error creating CFStreams"]];
return;
}

//参数是给NO的,就是有可读bytes的时候,不会调用回调函数
if (![self registerForStreamCallbacksIncludingReadWrite:NO])
{
[self closeWithError:[self otherError:@"Error in CFStreamSetClient"]];
return;
}

#endif
};
//part2设置stream
dispatch_block_t SetupStreamsPart2 = ^{
#if TARGET_OS_IPHONE
//状态不一样直接返回
if (aStateIndex != stateIndex)
{
// The socket has been disconnected.
return;
}
//如果加到runloop上失败
if (![self addStreamsToRunLoop])
{
//错误返回
[self closeWithError:[self otherError:@"Error in CFStreamScheduleWithRunLoop"]];
return;
}

//读写stream open
if (![self openStreams])
{
//开启错误返回
[self closeWithError:[self otherError:@"Error creating CFStreams"]];
return;
}

#endif
};

// Notify delegate
//通知代理
//拿到server端的host port
NSString *host = [self connectedHost];
uint16_t port = [self connectedPort];
//拿到unix域的 url
NSURL *url = [self connectedUrl];
//拿到代理
__strong id theDelegate = delegate;

//代理队列 和 Host不为nil 且响应didConnectToHost代理方法
if (delegateQueue && host != nil && [theDelegate respondsToSelector:@selector(socket:didConnectToHost:port:)])
{
//调用初始化stream1
SetupStreamsPart1();

dispatch_async(delegateQueue, ^{ @autoreleasepool {

//到代理队列调用连接成功的代理方法
[theDelegate socket:self didConnectToHost:host port:port];

//然后回到socketQueue中去执行初始化stream2
dispatch_async(socketQueue, ^{ @autoreleasepool {

SetupStreamsPart2();
}});
}});
}
//这个是unix domain 请求回调
else if (delegateQueue && url != nil && [theDelegate respondsToSelector:@selector(socket:didConnectToUrl:)])
{
SetupStreamsPart1();

dispatch_async(delegateQueue, ^{ @autoreleasepool {

[theDelegate socket:self didConnectToUrl:url];

dispatch_async(socketQueue, ^{ @autoreleasepool {

SetupStreamsPart2();
}});
}});
}
//否则只初始化stream
else
{
SetupStreamsPart1();
SetupStreamsPart2();
}

// Get the connected socket

int socketFD = (socket4FD != SOCKET_NULL) ? socket4FD : (socket6FD != SOCKET_NULL) ? socket6FD : socketUN;

//fcntl,功能描述:根据文件描述词来操作文件的特性。http://blog.csdn.net/pbymw8iwm/article/details/7974789
// Enable non-blocking IO on the socket
//使socket支持非阻塞IO
int result = fcntl(socketFD, F_SETFL, O_NONBLOCK);
if (result == -1)
{
//失败 ,报错
NSString *errMsg = @"Error enabling non-blocking IO on socket (fcntl)";
[self closeWithError:[self otherError:errMsg]];

return;
}

// Setup our read/write sources
//初始化读写source
[self setupReadAndWriteSourcesForNewlyConnectedSocket:socketFD];

// Dequeue any pending read/write requests
//开始下一个任务
[self maybeDequeueRead];
[self maybeDequeueWrite];
}

这个方法很长一大串,其实做的东西也很简单,主要做了下面几件事:

  1. 把当前状态flags加上已连接,并且关闭掉我们一开始连接开启的,连接超时的定时器。
  2. 初始化了两个BlockSetupStreamsPart1SetupStreamsPart2,这两个Block做的事都和读写流有关。SetupStreamsPart1用来创建读写流,并且注册回调。另一个SetupStreamsPart2用来把流添加到当前线程的runloop上,并且打开流。
  3. 判断是否有代理queuehost或者url这些参数是否为空、是否代理响应didConnectToHostdidConnectToUrl代理,这两种分别对应了普通socket连接和unix domin socket连接。如果实现了对应的代理,则调用连接成功的代理。
  4. 在调用代理的同时,调用了我们之前初始化的两个读写流相关的Block。这里值得说下的是这两个Block和代理之间的调用顺序:
  • 先执行SetupStreamsPart1后执行SetupStreamsPart2,没什么好说的,问题是代理的执行时间,想想如果我们放在SetupStreamsPart2后面是不是会导致个问题,就是用户收到消息了,但是连接成功的代理还没有被调用,这显然是不合理的。所以我们的调用顺序是SetupStreamsPart1->代理->SetupStreamsPart2

    所以出现了如下代码:


作者:Cooci
链接:https://www.jianshu.com/p/b264eff1f326
  //调用初始化stream1
SetupStreamsPart1();

dispatch_async(delegateQueue, ^{ @autoreleasepool {

//到代理队列调用连接成功的代理方法
[theDelegate socket:self didConnectToHost:host port:port];

//然后回到socketQueue中去执行初始化stream2
dispatch_async(socketQueue, ^{ @autoreleasepool {

SetupStreamsPart2();
}});
}});


原因是为了线程安全和socket相关的操作必须在socketQueue中进行。而代理必须在我们设置的代理queue中被回调。

  1. 拿到当前的本机socket,调用如下函数:
int result = fcntl(socketFD, F_SETFL, O_NONBLOCK);


而在这里,就是为了把socket的IO模式设置为非阻塞。很多小伙伴又要疑惑什么是非阻塞了,先别急,关于这个我们下文会详细的来谈。

  1. 我们初始化了读写source(很重要,所有的消息都是由这个source来触发的,我们之后会详细分析这个方法)。

  2. 我们做完了streamsource的初始化处理,则开始做一次读写任务(这两个方法暂时不讲,会放到之后的ReadWrite篇中去讲)。

我们接着来讲讲这个方法中对其他方法的调用,按照顺序来,先从第2条,两个Block中对stream的处理开始。和stream相关的函数一共有6个:

Stream相关方法一 -- 创建读写stream

//创建读写stream
- (BOOL)createReadAndWriteStream
{
LogTrace();

NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");

//如果有一个有值,就返回
if (readStream || writeStream)
{
// Streams already created
return YES;
}
//拿到socket,首选是socket4FD,其次socket6FD,都没有才是socketUN,socketUN应该是Unix的socket结构体
int socketFD = (socket4FD != SOCKET_NULL) ? socket4FD : (socket6FD != SOCKET_NULL) ? socket6FD : socketUN;

//如果都为空,返回NO
if (socketFD == SOCKET_NULL)
{
// Cannot create streams without a file descriptor
return NO;
}

//如果非连接,返回NO
if (![self isConnected])
{
// Cannot create streams until file descriptor is connected
return NO;
}

LogVerbose(@"Creating read and write stream...");

#pragma mark - 绑定Socket和CFStream
//下面的接口用于创建一对 socket stream,一个用于读取,一个用于写入:
CFStreamCreatePairWithSocket(NULL, (CFSocketNativeHandle)socketFD, &readStream, &writeStream);

// The kCFStreamPropertyShouldCloseNativeSocket property should be false by default (for our case).
// But let's not take any chances.

//读写stream都设置成不会随着绑定的socket一起close,release。 kCFBooleanFalse不一起,kCFBooleanTrue一起
if (readStream)
CFReadStreamSetProperty(readStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse);
if (writeStream)
CFWriteStreamSetProperty(writeStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse);

//如果有一个为空
if ((readStream == NULL) || (writeStream == NULL))
{
LogWarn(@"Unable to create read and write stream...");

//关闭对应的stream
if (readStream)
{
CFReadStreamClose(readStream);
CFRelease(readStream);
readStream = NULL;
}
if (writeStream)
{
CFWriteStreamClose(writeStream);
CFRelease(writeStream);
writeStream = NULL;
}
//返回创建失败
return NO;
}
//创建成功
return YES;
}

这个方法基本上很简单,就是关于两个stream函数的调用:

  1. 创建stream的函数:
CFStreamCreatePairWithSocket(NULL, (CFSocketNativeHandle)socketFD, &readStream, &writeStream);

这个函数创建了一对读写stream,并且把stream与这个scoket做了绑定。

  1. 设置stream属性:
CFReadStreamSetProperty(readStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse);
CFWriteStreamSetProperty(writeStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse);

这个函数可以给stream设置一个属性,这里是设置stream不会随着socket的生命周期(close,release)而变化。

接着调用了registerForStreamCallbacksIncludingReadWrite来给stream注册读写回调。

Stream相关方法二 -- 读写回调的注册:

//注册Stream的回调
- (BOOL)registerForStreamCallbacksIncludingReadWrite:(BOOL)includeReadWrite
{
LogVerbose(@"%@ %@", THIS_METHOD, (includeReadWrite ? @"YES" : @"NO"));

NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
//判断读写stream是不是都为空
NSAssert((readStream != NULL && writeStream != NULL), @"Read/Write stream is null");

//客户端stream上下文对象
streamContext.version = 0;
streamContext.info = (__bridge void *)(self);
streamContext.retain = nil;
streamContext.release = nil;
streamContext.copyDescription = nil;

// The open has completed successfully.
// The stream has bytes to be read.
// The stream can accept bytes for writing.
// An error has occurred on the stream.
// The end of the stream has been reached.

//设置一个CF的flag 两种,一种是错误发生的时候,一种是stream事件结束
CFOptionFlags readStreamEvents = kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered ;
//如果包含读写
if (includeReadWrite)
//仍然有Bytes要读的时候 The stream has bytes to be read.
readStreamEvents |= kCFStreamEventHasBytesAvailable;

//给读stream设置客户端,会在之前设置的那些标记下回调函数 CFReadStreamCallback。设置失败的话直接返回NO
if (!CFReadStreamSetClient(readStream, readStreamEvents, &CFReadStreamCallback, &streamContext))
{
return NO;
}

//写的flag,也一样
CFOptionFlags writeStreamEvents = kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered;
if (includeReadWrite)
writeStreamEvents |= kCFStreamEventCanAcceptBytes;

if (!CFWriteStreamSetClient(writeStream, writeStreamEvents, &CFWriteStreamCallback, &streamContext))
{
return NO;
}
//走到最后说明读写都设置回调成功,返回YES
return YES;
}

相信用过CFStream的朋友,应该会觉得很简单,这个方法就是调用了一些CFStream相关函数,其中最主要的这个设置读写回调函数:
Boolean CFReadStreamSetClient(CFReadStreamRef stream, CFOptionFlags streamEvents, CFReadStreamClientCallBack clientCB, CFStreamClientContext *clientContext);
Boolean CFWriteStreamSetClient(CFWriteStreamRef stream, CFOptionFlags streamEvents, CFWriteStreamClientCallBack clientCB, CFStreamClientContext *clientContext);


这个函数共4个参数:
第1个为我们需要设置的stream;
第2个为需要监听的事件选项,包括以下事件:

typedef CF_OPTIONS(CFOptionFlags, CFStreamEventType) {
kCFStreamEventNone = 0, //没有事件发生
kCFStreamEventOpenCompleted = 1, //成功打开流
kCFStreamEventHasBytesAvailable = 2, //流中有数据可读
kCFStreamEventCanAcceptBytes = 4, //流中可以接受数据去写
kCFStreamEventErrorOccurred = 8, //流发生错误
kCFStreamEventEndEncountered = 16 //到达流的结尾
};

其中具体用法,大家可以自行去试试,这里作者只监听了了两种事件kCFStreamEventErrorOccurredkCFStreamEventEndEncountered,再根据传过来的参数去决定是否监听kCFStreamEventCanAcceptBytes

//如果包含读写
if (includeReadWrite)
//仍然有Bytes要读的时候 The stream has bytes to be read.
readStreamEvents |= kCFStreamEventHasBytesAvailable;

而这里我们传过来的参数为NO,导致它并不监听可读数据。显然,我们正常的连接,当有消息发送过来,并不是由stream回调来触发的。这个框架中,如果是TLS传输的socket是用stream来触发的,这个我们后续文章会讲到。

那么有数据的时候,到底是什么来触发我们的读写呢,答案就是读写source,我们接下来就会去创建初始化它。

这里绑定了两个函数,分别对应读和写的回调,分别为:

//读的回调
static void CFReadStreamCallback (CFReadStreamRef stream, CFStreamEventType type, void *pInfo)
//写的回调
static void CFWriteStreamCallback (CFWriteStreamRef stream, CFStreamEventType type, void *pInfo)

关于这两个函数,同样这里暂时不做讨论,等后续文章再来分析。

还有一点需要说一下的是streamContext这个属性,它是一个结构体,包含流的上下文信息,其结构如下:

typedef struct {
CFIndex version;
void *info;
void *(*retain)(void *info);
void (*release)(void *info);
CFStringRef (*copyDescription)(void *info);
} CFStreamClientContext;


这个流的上下文中info指针,其实就是前面所对应的读写回调函数中的pInfo指针,每次回调都会传过去。其它的version就是流的版本标识,之外的3个都需要的是一个函数指针,对应我们传递的pInfo的持有以及释放还有复制的描述信息,这里我们都赋值给nil

下一章我们来到流处理的第三步


CocoaAsyncSocket源码分析---Connect (一)
CocoaAsyncSocket源码分析---Connect (八)
作者:Cooci
链接:https://www.jianshu.com/p/b264eff1f326







0 个评论

要回复文章请先登录注册