CocoaAsyncSocket源码Read(六)
讲了半天理论,想必大家看的有点不耐烦了,接下来看看代码实际是如何处理的吧:
step1:从prebuffer中读取数据:
//先从提前缓冲区去读,如果缓冲区可读大小大于0
if ([preBuffer availableBytes] > 0)
{
// There are 3 types of read packets:
//
// 1) Read all available data.
// 2) Read a specific length of data.
// 3) Read up to a particular terminator.
//3种类型的读法,1、全读、2、读取特定长度、3、读取到一个明确的界限
NSUInteger bytesToCopy;
//如果当前读的数据界限不为空
if (currentRead->term != nil)
{
// Read type #3 - read up to a terminator
//直接读到界限
bytesToCopy = [currentRead readLengthForTermWithPreBuffer:preBuffer found:&done];
}
else
{
// Read type #1 or #2
//读取数据,读到指定长度或者数据包的长度为止
bytesToCopy = [currentRead readLengthForNonTermWithHint:[preBuffer availableBytes]];
}
// Make sure we have enough room in the buffer for our read.
//从上两步拿到我们需要读的长度,去看看有没有空间去存储
[currentRead ensureCapacityForAdditionalDataOfLength:bytesToCopy];
// Copy bytes from prebuffer into packet buffer
//拿到我们需要追加数据的指针位置
#pragma mark - 不明白
//当前读的数据 + 开始偏移 + 已经读完的??
uint8_t *buffer = (uint8_t *)[currentRead->buffer mutableBytes] + currentRead->startOffset +
currentRead->bytesDone;
//从prebuffer处复制过来数据,bytesToCopy长度
memcpy(buffer, [preBuffer readBuffer], bytesToCopy);
// Remove the copied bytes from the preBuffer
//从preBuffer移除掉已经复制的数据
[preBuffer didRead:bytesToCopy];
LogVerbose(@"copied(%lu) preBufferLength(%zu)", (unsigned long)bytesToCopy, [preBuffer availableBytes]);
// Update totals
//已读的数据加上
currentRead->bytesDone += bytesToCopy;
//当前已读的数据加上
totalBytesReadForCurrentRead += bytesToCopy;
// Check to see if the read operation is done
//判断是不是读完了
if (currentRead->readLength > 0)
{
// Read type #2 - read a specific length of data
//如果已读 == 需要读的长度,说明已经读完
done = (currentRead->bytesDone == currentRead->readLength);
}
//判断界限标记
else if (currentRead->term != nil)
{
// Read type #3 - read up to a terminator
// Our 'done' variable was updated via the readLengthForTermWithPreBuffer:found: method
//如果没做完,且读的最大长度大于0,去判断是否溢出
if (!done && currentRead->maxLength > 0)
{
// We're not done and there's a set maxLength.
// Have we reached that maxLength yet?
//如果已读的大小大于最大的大小,则报溢出错误
if (currentRead->bytesDone >= currentRead->maxLength)
{
error = [self readMaxedOutError];
}
}
}
else
{
// Read type #1 - read all available data
//
// We're done as soon as
// - we've read all available data (in prebuffer and socket)
// - we've read the maxLength of read packet.
//判断已读大小和最大大小是否相同,相同则读完
done = ((currentRead->maxLength > 0) && (currentRead->bytesDone == currentRead->maxLength));
}
}
这个方法就是利用我们之前提到的3种类型,来判断数据包需要读取的长度,然后调用:
memcpy(buffer, [preBuffer readBuffer], bytesToCopy);
把数据从preBuffer
中,移到了currentRead
数据包中。
step2:从socket
中读取数据:
// 从socket中去读取
//是否读到EOFException ,这个错误指的是文件结尾了还在继续读,就会导致这个错误被抛出
BOOL socketEOF = (flags & kSocketHasReadEOF) ? YES : NO; // Nothing more to read via socket (end of file)
//如果没完成,且没错,没读到结尾,且没有可读数据了
BOOL waiting = !done && !error && !socketEOF && !hasBytesAvailable; // Ran out of data, waiting for more
//如果没完成,且没错,没读到结尾,有可读数据
if (!done && !error && !socketEOF && hasBytesAvailable)
{
//断言,有可读数据
NSAssert(([preBuffer availableBytes] == 0), @"Invalid logic");
//是否读到preBuffer中去
BOOL readIntoPreBuffer = NO;
uint8_t *buffer = NULL;
size_t bytesRead = 0;
//如果flag标记为安全socket
if (flags & kSocketSecure)
{
//...类似flushSSLBuffer的一系列操作
}
else
{
// Normal socket operation
//普通的socket 操作
NSUInteger bytesToRead;
// There are 3 types of read packets:
//
// 1) Read all available data.
// 2) Read a specific length of data.
// 3) Read up to a particular terminator.
//和上面类似,读取到边界标记??不是吧
if (currentRead->term != nil)
{
// Read type #3 - read up to a terminator
//读这个长度,如果到maxlength,就用maxlength。看如果可用空间大于需要读的空间,则不用prebuffer
bytesToRead = [currentRead readLengthForTermWithHint:estimatedBytesAvailable
shouldPreBuffer:&readIntoPreBuffer];
}
else
{
// Read type #1 or #2
//直接读这个长度,如果到maxlength,就用maxlength
bytesToRead = [currentRead readLengthForNonTermWithHint:estimatedBytesAvailable];
}
//大于最大值,则先读最大值
if (bytesToRead > SIZE_MAX) { // NSUInteger may be bigger than size_t (read param 3)
bytesToRead = SIZE_MAX;
}
// Make sure we have enough room in the buffer for our read.
//
// We are either reading directly into the currentRead->buffer,
// or we're reading into the temporary preBuffer.
if (readIntoPreBuffer)
{
[preBuffer ensureCapacityForWrite:bytesToRead];
buffer = [preBuffer writeBuffer];
}
else
{
[currentRead ensureCapacityForAdditionalDataOfLength:bytesToRead];
buffer = (uint8_t *)[currentRead->buffer mutableBytes]
+ currentRead->startOffset
+ currentRead->bytesDone;
}
// Read data into buffer
int socketFD = (socket4FD != SOCKET_NULL) ? socket4FD : (socket6FD != SOCKET_NULL) ? socket6FD : socketUN;
#pragma mark - 开始读取数据,最普通的形式 read
//读数据
ssize_t result = read(socketFD, buffer, (size_t)bytesToRead);
LogVerbose(@"read from socket = %i", (int)result);
//读取错误
if (result < 0)
{
//EWOULDBLOCK IO阻塞
if (errno == EWOULDBLOCK)
//先等待
waiting = YES;
else
//得到错误
error = [self errnoErrorWithReason:@"Error in read() function"];
//把可读取的长度设置为0
socketFDBytesAvailable = 0;
}
//读到边界了
else if (result == 0)
{
socketEOF = YES;
socketFDBytesAvailable = 0;
}
//正常
else
{
//设置读到的数据长度
bytesRead = result;
//如果读到的数据小于应该读的长度,说明这个包没读完
if (bytesRead < bytesToRead)
{
// The read returned less data than requested.
// This means socketFDBytesAvailable was a bit off due to timing,
// because we read from the socket right when the readSource event was firing.
socketFDBytesAvailable = 0;
}
//正常
else
{
//如果 socketFDBytesAvailable比读了的数据小的话,直接置为0
if (socketFDBytesAvailable <= bytesRead)
socketFDBytesAvailable = 0;
//减去已读大小
else
socketFDBytesAvailable -= bytesRead;
}
//如果 socketFDBytesAvailable 可读数量为0,把读的状态切换为等待
if (socketFDBytesAvailable == 0)
{
waiting = YES;
}
}
}
本来想讲点什么。。发现确实没什么好讲的,无非就是判断应该读取的长度,然后调用:
ssize_t result = read(socketFD, buffer, (size_t)bytesToRead);
从socket
中得到读取的实际长度。
唯一需要讲一下的可能是数据流向的问题,这里调用:
bytesToRead = [currentRead readLengthForTermWithHint:estimatedBytesAvailable shouldPreBuffer:&readIntoPreBuffer];
来判断数据是否先流向prebuffer
,还是直接流向currentRead
,而SSL的读取中也有类似方法:
- (NSUInteger)optimalReadLengthWithDefault:(NSUInteger)defaultValue shouldPreBuffer:(BOOL *)shouldPreBufferPtr
这个方法核心的思路就是,如果当前读取包,长度给明了,则直接流向currentRead
,如果数据长度不清楚,那么则去判断这一次读取的长度,和currentRead
可用空间长度去对比,如果长度比currentRead
可用空间小,则流向currentRead
,否则先用prebuffer
来缓冲。
至于细节方面,大家对着github
中的源码注释看看吧,这么大篇幅的业务代码,一行行讲确实没什么意义。
走完这两步读取,接着就是第三步:
step3:判断数据包完成程度:
这里有3种情况:
1.数据包刚好读完;2.数据粘包;3.数据断包;
注:这里判断粘包断包的长度,都是我们一开始调用read
方法给的长度或者分界符得出的。
很显然,第一种就什么都不用处理,完美匹配。
第二种情况,我们把需要的长度放到currentRead
,多余的长度放到prebuffer
中去。
第三种情况,数据还没读完,我们暂时为未读完。
这里就不贴代码了。
就这样普通读取数据的整个流程就走完了,而SSL
的两种模式,和上述基本一致。
我们接着根据之前读取的结果,来判断数据是否读完:
//检查是否读完
if (done)
{
//完成这次数据的读取
[self completeCurrentRead];
//如果没出错,没有到边界,prebuffer中还有可读数据
if (!error && (!socketEOF || [preBuffer availableBytes] > 0))
{
//让读操作离队,继续进行下一次读取
[self maybeDequeueRead];
}
}
如果读完,则去做读完的操作,并且进行下一次读取。
我们来看看读完的操作:
//完成了这次的读数据
- (void)completeCurrentRead
{
LogTrace();
//断言currentRead
NSAssert(currentRead, @"Trying to complete current read when there is no current read.");
//结果数据
NSData *result = nil;
//如果是我们自己创建的Buffer
if (currentRead->bufferOwner)
{
// We created the buffer on behalf of the user.
// Trim our buffer to be the proper size.
//修剪buffer到合适的大小
//把大小设置到我们读取到的大小
[currentRead->buffer setLength:currentRead->bytesDone];
//赋值给result
result = currentRead->buffer;
}
else
{
// We did NOT create the buffer.
// The buffer is owned by the caller.
// Only trim the buffer if we had to increase its size.
//这是调用者的data,我们只会去加大尺寸
if ([currentRead->buffer length] > currentRead->originalBufferLength)
{
//拿到的读的size
NSUInteger readSize = currentRead->startOffset + currentRead->bytesDone;
//拿到原始尺寸
NSUInteger origSize = currentRead->originalBufferLength;
//取得最大的
NSUInteger buffSize = MAX(readSize, origSize);
//把buffer设置为较大的尺寸
[currentRead->buffer setLength:buffSize];
}
//拿到数据的头指针
uint8_t *buffer = (uint8_t *)[currentRead->buffer mutableBytes] + currentRead->startOffset;
//reslut为,从头指针开始到长度为写的长度 freeWhenDone为YES,创建完就释放buffer
result = [NSData dataWithBytesNoCopy:buffer length:currentRead->bytesDone freeWhenDone:NO];
}
__strong id theDelegate = delegate;
#pragma mark -总算到调用代理方法,接受到数据了
if (delegateQueue && [theDelegate respondsToSelector:@selector(socket:didReadData:withTag:)])
{
//拿到当前的数据包
GCDAsyncReadPacket *theRead = currentRead; // Ensure currentRead retained since result may not own buffer
dispatch_async(delegateQueue, ^{ @autoreleasepool {
//把result在代理queue中回调出去。
[theDelegate socket:self didReadData:result withTag:theRead->tag];
}});
}
//取消掉读取超时
[self endCurrentRead];
}
这里对currentRead
的data
做了个长度的设置。然后调用代理把最终包给回调出去。最后关掉我们之前提到的读取超时。
还是回到doReadData
,就剩下最后一点处理了:
//如果这次读的数量大于0
else if (totalBytesReadForCurrentRead > 0)
{
// We're not done read type #2 or #3 yet, but we have read in some bytes
__strong id theDelegate = delegate;
//如果响应读数据进度的代理
if (delegateQueue && [theDelegate respondsToSelector:@selector(socket:didReadPartialDataOfLength:tag:)])
{
long theReadTag = currentRead->tag;
//代理queue中回调出去
dispatch_async(delegateQueue, ^{ @autoreleasepool {
[theDelegate socket:self didReadPartialDataOfLength:totalBytesReadForCurrentRead tag:theReadTag];
}});
}
}
这里未完成,如果这次读取大于0,如果响应读取进度的代理,则把当前进度回调出去。
最后检查错误:
//检查错误
if (error)
{
//如果有错直接报错断开连接
[self closeWithError:error];
}
//如果是读到边界错误
else if (socketEOF)
{
[self doReadEOF];
}
//如果是等待
else if (waiting)
{
//如果用的是CFStream,则读取数据和source无关
//非CFStream形式
if (![self usingCFStreamForTLS])
{
// Monitor the socket for readability (if we're not already doing so)
//重新恢复source
[self resumeReadSource];
}
}
如果有错,直接断开socket
,如果是边界错误,调用边界错误处理,如果是等待,说明当前包还没读完,如果非CFStream
的TLS
,则恢复source
,等待下一次数据到达的触发。
关于这个读取边界错误EOF
,这里我简单的提下,其实它就是服务端发出一个边界错误,说明不会再有数据发送给我们了。我们讲无法再接收到数据,但是我们其实还是可以写数据,发送给服务端的。
而doReadEOF
这个方法的处理,就是做了这么一件事。判断我们是否需要这种不可读,只能写的连接。
我们来简单看看这个方法:
Part6.读取边界错误处理:
//读到EOFException,边界错误
- (void)doReadEOF
{
LogTrace();
//这个方法可能被调用很多次,如果读到EOF的时候,还有数据在prebuffer中,在调用doReadData之后?? 这个方法可能被持续的调用
//标记为读EOF
flags |= kSocketHasReadEOF;
//如果是安全socket
if (flags & kSocketSecure)
{
//去刷新sslbuffer中的数据
[self flushSSLBuffers];
}
//标记是否应该断开连接
BOOL shouldDisconnect = NO;
NSError *error = nil;
//如果状态为开始读写TLS
if ((flags & kStartingReadTLS) || (flags & kStartingWriteTLS))
{
//我们得到EOF在开启TLS之前,这个TLS握手是不可能的,因此这是不可恢复的错误
//标记断开连接
shouldDisconnect = YES;
//如果是安全的TLS,赋值错误
if ([self usingSecureTransportForTLS])
{
error = [self sslError:errSSLClosedAbort];
}
}
//如果是读流关闭状态
else if (flags & kReadStreamClosed)
{
//不应该被关闭
shouldDisconnect = NO;
}
else if ([preBuffer availableBytes] > 0)
{
//仍然有数据可读的时候不关闭
shouldDisconnect = NO;
}
else if (config & kAllowHalfDuplexConnection)
{
//拿到socket
int socketFD = (socket4FD != SOCKET_NULL) ? socket4FD : (socket6FD != SOCKET_NULL) ? socket6FD : socketUN;
//轮询用的结构体
/*
struct pollfd {
int fd; //文件描述符
short events; //要求查询的事件掩码 监听的
short revents; //返回的事件掩码 实际发生的
};
*/
struct pollfd pfd[1];
pfd[0].fd = socketFD;
//写数据不会导致阻塞。
pfd[0].events = POLLOUT;
//这个为当前实际发生的事情
pfd[0].revents = 0;
/*
poll函数使用pollfd类型的结构来监控一组文件句柄,ufds是要监控的文件句柄集合,nfds是监控的文件句柄数量,timeout是等待的毫秒数,这段时间内无论I/O是否准备好,poll都会返回。timeout为负数表示无线等待,timeout为0表示调用后立即返回。执行结果:为0表示超时前没有任何事件发生;-1表示失败;成功则返回结构体中revents不为0的文件描述符个数。pollfd结构监控的事件类型如下:
int poll(struct pollfd *ufds, unsigned int nfds, int timeout);
*/
//阻塞的,但是timeout为0,则不阻塞,直接返回
poll(pfd, 1, 0);
//如果被触发的事件是写数据
if (pfd[0].revents & POLLOUT)
{
// Socket appears to still be writeable
//则标记为不关闭
shouldDisconnect = NO;
//标记为读流关闭
flags |= kReadStreamClosed;
// Notify the delegate that we're going half-duplex
//通知代理,我们开始半双工
__strong id theDelegate = delegate;
//调用已经关闭读流的代理方法
if (delegateQueue && [theDelegate respondsToSelector:@selector(socketDidCloseReadStream:)])
{
dispatch_async(delegateQueue, ^{ @autoreleasepool {
[theDelegate socketDidCloseReadStream:self];
}});
}
}
else
{
//标记为断开
shouldDisconnect = YES;
}
}
else
{
shouldDisconnect = YES;
}
//如果应该断开
if (shouldDisconnect)
{
if (error == nil)
{
//判断是否是安全TLS传输
if ([self usingSecureTransportForTLS])
{
///标记错误信息
if (sslErrCode != noErr && sslErrCode != errSSLClosedGraceful)
{
error = [self sslError:sslErrCode];
}
else
{
error = [self connectionClosedError];
}
}
else
{
error = [self connectionClosedError];
}
}
//关闭socket
[self closeWithError:error];
}
//不断开
else
{
//如果不是用CFStream流
if (![self usingCFStreamForTLS])
{
// Suspend the read source (if needed)
//挂起读source
[self suspendReadSource];
}
}
}
简单说一下,这个方法主要是对socket
是否需要主动关闭进行了判断:这里仅仅以下3种情况,不会关闭socket
:
- 读流已经是关闭状态(如果加了这个标记,说明为半双工连接状态)。
preBuffer
中还有可读数据,我们需要等数据读完才能关闭连接。- 配置标记为
kAllowHalfDuplexConnection
,我们则要开始半双工处理。我们调用了:
poll(pfd, 1, 0);
函数,如果触发了写事件POLLOUT
,说明我们半双工连接成功,则我们可以在读流关闭的状态下,仍然可以向服务器写数据。
其他情况下,一律直接关闭socket
。
而不关闭的情况下,我们会挂起source
。这样我们就只能可写不可读了。
作者:Cooci_和谐学习_不急不躁
链接:https://www.jianshu.com/p/5a2df8a6a54e
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。