@@ -31,63 +31,69 @@ static OFRunLoop *mainRunLoop = nil; #ifdef OF_HAVE_BLOCKS @interface OFRunLoop_ReadQueueItem: OFObject { +@public void *buffer; size_t length; of_stream_async_read_block_t block; } +@end -@property void *buffer; -@property size_t length; -@property (copy) of_stream_async_read_block_t block; +@interface OFRunLoop_ExactReadQueueItem: OFObject +{ +@public + void *buffer; + size_t exactLength, readLength; + of_stream_async_read_block_t block; +} @end @interface OFRunLoop_ReadLineQueueItem: OFObject { +@public of_stream_async_read_line_block_t block; of_string_encoding_t encoding; } - -@property (copy) of_stream_async_read_line_block_t block; -@property of_string_encoding_t encoding; @end @interface OFRunLoop_AcceptQueueItem: OFObject { +@public of_tcpsocket_async_accept_block_t block; } - -@property (copy) of_tcpsocket_async_accept_block_t block; @end @implementation OFRunLoop_ReadQueueItem -@synthesize buffer, length, block; +- (void)dealloc +{ + [block release]; + + [super dealloc]; +} +@end +@implementation OFRunLoop_ExactReadQueueItem - (void)dealloc { [block release]; [super dealloc]; } @end @implementation OFRunLoop_ReadLineQueueItem -@synthesize block, encoding; - - (void)dealloc { [block release]; [super dealloc]; } @end @implementation OFRunLoop_AcceptQueueItem -@synthesize block; - - (void)dealloc { [block release]; [super dealloc]; @@ -113,13 +119,13 @@ objc_autoreleasePoolPop(pool); } #ifdef OF_HAVE_BLOCKS + (void)OF_addAsyncReadForStream: (OFStream*)stream - buffer: (void*)buffer - length: (size_t)length - block: (of_stream_async_read_block_t)block + buffer: (void*)buffer + length: (size_t)length + block: (of_stream_async_read_block_t)block { void *pool = objc_autoreleasePoolPush(); OFRunLoop *runLoop = [self currentRunLoop]; OFList *queue = [runLoop->readQueues objectForKey: stream]; OFRunLoop_ReadQueueItem *queueItem; @@ -132,13 +138,41 @@ if ([queue count] == 0) [runLoop->streamObserver addStreamForReading: stream]; queueItem = [[[OFRunLoop_ReadQueueItem alloc] init] autorelease]; - [queueItem setBuffer: buffer]; - [queueItem setLength: length]; - [queueItem setBlock: block]; + queueItem->buffer = buffer; + queueItem->length = length; + queueItem->block = [block copy]; + [queue appendObject: queueItem]; + + objc_autoreleasePoolPop(pool); +} + ++ (void)OF_addAsyncReadForStream: (OFStream*)stream + buffer: (void*)buffer + exactLength: (size_t)exactLength + block: (of_stream_async_read_block_t)block +{ + void *pool = objc_autoreleasePoolPush(); + OFRunLoop *runLoop = [self currentRunLoop]; + OFList *queue = [runLoop->readQueues objectForKey: stream]; + OFRunLoop_ExactReadQueueItem *queueItem; + + if (queue == nil) { + queue = [OFList list]; + [runLoop->readQueues setObject: queue + forKey: stream]; + } + + if ([queue count] == 0) + [runLoop->streamObserver addStreamForReading: stream]; + + queueItem = [[[OFRunLoop_ExactReadQueueItem alloc] init] autorelease]; + queueItem->buffer = buffer; + queueItem->exactLength = exactLength; + queueItem->block = [block copy]; [queue appendObject: queueItem]; objc_autoreleasePoolPop(pool); } @@ -159,12 +193,12 @@ if ([queue count] == 0) [runLoop->streamObserver addStreamForReading: stream]; queueItem = [[[OFRunLoop_ReadLineQueueItem alloc] init] autorelease]; - [queueItem setBlock: block]; - [queueItem setEncoding: encoding]; + queueItem->block = [block copy]; + queueItem->encoding = encoding; [queue appendObject: queueItem]; objc_autoreleasePoolPop(pool); } @@ -184,11 +218,11 @@ if ([queue count] == 0) [runLoop->streamObserver addStreamForReading: socket]; queueItem = [[[OFRunLoop_AcceptQueueItem alloc] init] autorelease]; - [queueItem setBlock: block]; + queueItem->block = [block copy]; [queue appendObject: queueItem]; objc_autoreleasePoolPop(pool); } #endif @@ -240,31 +274,55 @@ listObject = [queue firstListObject]; if ([listObject->object isKindOfClass: [OFRunLoop_ReadQueueItem class]]) { OFRunLoop_ReadQueueItem *queueItem = listObject->object; - void *buffer = [queueItem buffer]; - size_t length = [stream readIntoBuffer: buffer - length: [queueItem length]]; + size_t length = [stream readIntoBuffer: queueItem->buffer + length: queueItem->length]; - if (![queueItem block](stream, buffer, length)) { + if (!queueItem->block(stream, queueItem->buffer, length)) { [queue removeListObject: listObject]; if ([queue count] == 0) { [streamObserver removeStreamForReading: stream]; [readQueues removeObjectForKey: stream]; } + } + } else if ([listObject->object isKindOfClass: + [OFRunLoop_ExactReadQueueItem class]]) { + OFRunLoop_ExactReadQueueItem *queueItem = listObject->object; + size_t length = [stream + readIntoBuffer: (char*)queueItem->buffer + + queueItem->readLength + length: queueItem->exactLength - + queueItem->readLength]; + + queueItem->readLength += length; + if (queueItem->readLength == queueItem->exactLength || + [stream isAtEndOfStream]) { + if (queueItem->block(stream, queueItem->buffer, + queueItem->readLength)) + queueItem->readLength = 0; + else { + [queue removeListObject: listObject]; + + if ([queue count] == 0) { + [streamObserver + removeStreamForReading: stream]; + [readQueues removeObjectForKey: stream]; + } + } } } else if ([listObject->object isKindOfClass: [OFRunLoop_ReadLineQueueItem class]]) { OFRunLoop_ReadLineQueueItem *queueItem = listObject->object; OFString *line; - line = [stream tryReadLineWithEncoding: [queueItem encoding]]; + line = [stream tryReadLineWithEncoding: queueItem->encoding]; if (line != nil || [stream isAtEndOfStream]) { - if (![queueItem block](stream, line)) { + if (!queueItem->block(stream, line)) { [queue removeListObject: listObject]; if ([queue count] == 0) { [streamObserver removeStreamForReading: stream]; @@ -275,11 +333,11 @@ } else if ([listObject->object isKindOfClass: [OFRunLoop_AcceptQueueItem class]]) { OFRunLoop_AcceptQueueItem *queueItem = listObject->object; OFTCPSocket *newSocket = [(OFTCPSocket*)stream accept]; - if (![queueItem block]((OFTCPSocket*)stream, newSocket)) { + if (!queueItem->block((OFTCPSocket*)stream, newSocket)) { [queue removeListObject: listObject]; if ([queue count] == 0) { [streamObserver removeStreamForReading: stream]; [readQueues removeObjectForKey: stream];