Index: src/OFRunLoop.h ================================================================== --- src/OFRunLoop.h +++ src/OFRunLoop.h @@ -53,10 +53,14 @@ + (void)OF_setMainRunLoop; #ifdef OF_HAVE_BLOCKS + (void)OF_addAsyncReadForStream: (OFStream*)stream buffer: (void*)buffer length: (size_t)length + block: (of_stream_async_read_block_t)block; ++ (void)OF_addAsyncReadForStream: (OFStream*)stream + buffer: (void*)buffer + exactLength: (size_t)length block: (of_stream_async_read_block_t)block; + (void)OF_addAsyncReadLineForStream: (OFStream*)stream encoding: (of_string_encoding_t)encoding block: (of_stream_async_read_line_block_t)block; + (void)OF_addAsyncAcceptForTCPSocket: (OFTCPSocket*)socket Index: src/OFRunLoop.m ================================================================== --- src/OFRunLoop.m +++ src/OFRunLoop.m @@ -31,63 +31,69 @@ static OFRunLoop *mainRunLoop = nil; #ifdef OF_HAVE_BLOCKS @interface OFRunLoop_ReadQueueItem: OFObject { +@public void *buffer; size_t length; of_stream_async_read_block_t block; } +@end -@property void *buffer; -@property size_t length; -@property (copy) of_stream_async_read_block_t block; +@interface OFRunLoop_ExactReadQueueItem: OFObject +{ +@public + void *buffer; + size_t exactLength, readLength; + of_stream_async_read_block_t block; +} @end @interface OFRunLoop_ReadLineQueueItem: OFObject { +@public of_stream_async_read_line_block_t block; of_string_encoding_t encoding; } - -@property (copy) of_stream_async_read_line_block_t block; -@property of_string_encoding_t encoding; @end @interface OFRunLoop_AcceptQueueItem: OFObject { +@public of_tcpsocket_async_accept_block_t block; } - -@property (copy) of_tcpsocket_async_accept_block_t block; @end @implementation OFRunLoop_ReadQueueItem -@synthesize buffer, length, block; +- (void)dealloc +{ + [block release]; + + [super dealloc]; +} +@end +@implementation OFRunLoop_ExactReadQueueItem - (void)dealloc { [block release]; [super dealloc]; } @end @implementation 
OFRunLoop_ReadLineQueueItem -@synthesize block, encoding; - - (void)dealloc { [block release]; [super dealloc]; } @end @implementation OFRunLoop_AcceptQueueItem -@synthesize block; - - (void)dealloc { [block release]; [super dealloc]; @@ -113,13 +119,13 @@ objc_autoreleasePoolPop(pool); } #ifdef OF_HAVE_BLOCKS + (void)OF_addAsyncReadForStream: (OFStream*)stream - buffer: (void*)buffer - length: (size_t)length - block: (of_stream_async_read_block_t)block + buffer: (void*)buffer + length: (size_t)length + block: (of_stream_async_read_block_t)block { void *pool = objc_autoreleasePoolPush(); OFRunLoop *runLoop = [self currentRunLoop]; OFList *queue = [runLoop->readQueues objectForKey: stream]; OFRunLoop_ReadQueueItem *queueItem; @@ -132,13 +138,41 @@ if ([queue count] == 0) [runLoop->streamObserver addStreamForReading: stream]; queueItem = [[[OFRunLoop_ReadQueueItem alloc] init] autorelease]; - [queueItem setBuffer: buffer]; - [queueItem setLength: length]; - [queueItem setBlock: block]; + queueItem->buffer = buffer; + queueItem->length = length; + queueItem->block = [block copy]; + [queue appendObject: queueItem]; + + objc_autoreleasePoolPop(pool); +} + ++ (void)OF_addAsyncReadForStream: (OFStream*)stream + buffer: (void*)buffer + exactLength: (size_t)exactLength + block: (of_stream_async_read_block_t)block +{ + void *pool = objc_autoreleasePoolPush(); + OFRunLoop *runLoop = [self currentRunLoop]; + OFList *queue = [runLoop->readQueues objectForKey: stream]; + OFRunLoop_ExactReadQueueItem *queueItem; + + if (queue == nil) { + queue = [OFList list]; + [runLoop->readQueues setObject: queue + forKey: stream]; + } + + if ([queue count] == 0) + [runLoop->streamObserver addStreamForReading: stream]; + + queueItem = [[[OFRunLoop_ExactReadQueueItem alloc] init] autorelease]; + queueItem->buffer = buffer; + queueItem->exactLength = exactLength; + queueItem->block = [block copy]; [queue appendObject: queueItem]; objc_autoreleasePoolPop(pool); } @@ -159,12 +193,12 @@ if ([queue 
count] == 0) [runLoop->streamObserver addStreamForReading: stream]; queueItem = [[[OFRunLoop_ReadLineQueueItem alloc] init] autorelease]; - [queueItem setBlock: block]; - [queueItem setEncoding: encoding]; + queueItem->block = [block copy]; + queueItem->encoding = encoding; [queue appendObject: queueItem]; objc_autoreleasePoolPop(pool); } @@ -184,11 +218,11 @@ if ([queue count] == 0) [runLoop->streamObserver addStreamForReading: socket]; queueItem = [[[OFRunLoop_AcceptQueueItem alloc] init] autorelease]; - [queueItem setBlock: block]; + queueItem->block = [block copy]; [queue appendObject: queueItem]; objc_autoreleasePoolPop(pool); } #endif @@ -240,31 +274,55 @@ listObject = [queue firstListObject]; if ([listObject->object isKindOfClass: [OFRunLoop_ReadQueueItem class]]) { OFRunLoop_ReadQueueItem *queueItem = listObject->object; - void *buffer = [queueItem buffer]; - size_t length = [stream readIntoBuffer: buffer - length: [queueItem length]]; + size_t length = [stream readIntoBuffer: queueItem->buffer + length: queueItem->length]; - if (![queueItem block](stream, buffer, length)) { + if (!queueItem->block(stream, queueItem->buffer, length)) { [queue removeListObject: listObject]; if ([queue count] == 0) { [streamObserver removeStreamForReading: stream]; [readQueues removeObjectForKey: stream]; } + } + } else if ([listObject->object isKindOfClass: + [OFRunLoop_ExactReadQueueItem class]]) { + OFRunLoop_ExactReadQueueItem *queueItem = listObject->object; + size_t length = [stream + readIntoBuffer: (char*)queueItem->buffer + + queueItem->readLength + length: queueItem->exactLength - + queueItem->readLength]; + + queueItem->readLength += length; + if (queueItem->readLength == queueItem->exactLength || + [stream isAtEndOfStream]) { + if (queueItem->block(stream, queueItem->buffer, + queueItem->readLength)) + queueItem->readLength = 0; + else { + [queue removeListObject: listObject]; + + if ([queue count] == 0) { + [streamObserver + removeStreamForReading: stream]; + 
[readQueues removeObjectForKey: stream]; + } + } } } else if ([listObject->object isKindOfClass: [OFRunLoop_ReadLineQueueItem class]]) { OFRunLoop_ReadLineQueueItem *queueItem = listObject->object; OFString *line; - line = [stream tryReadLineWithEncoding: [queueItem encoding]]; + line = [stream tryReadLineWithEncoding: queueItem->encoding]; if (line != nil || [stream isAtEndOfStream]) { - if (![queueItem block](stream, line)) { + if (!queueItem->block(stream, line)) { [queue removeListObject: listObject]; if ([queue count] == 0) { [streamObserver removeStreamForReading: stream]; @@ -275,11 +333,11 @@ } else if ([listObject->object isKindOfClass: [OFRunLoop_AcceptQueueItem class]]) { OFRunLoop_AcceptQueueItem *queueItem = listObject->object; OFTCPSocket *newSocket = [(OFTCPSocket*)stream accept]; - if (![queueItem block]((OFTCPSocket*)stream, newSocket)) { + if (!queueItem->block((OFTCPSocket*)stream, newSocket)) { [queue removeListObject: listObject]; if ([queue count] == 0) { [streamObserver removeStreamForReading: stream]; [readQueues removeObjectForKey: stream]; Index: src/OFStream.h ================================================================== --- src/OFStream.h +++ src/OFStream.h @@ -76,44 +76,22 @@ /** * \brief Reads at most size bytes from the stream into a buffer. * * On network streams, this might read less than the specified number of bytes. * If you want to read exactly the specified number of bytes, use - * -[readIntoBuffer:exactLength:]. + * -readIntoBuffer:exactLength:. Note that a read can even return 0 bytes - + * this does not necessarily mean that the stream ended, so you still need to + * check isAtEndOfStream. * * \param buffer The buffer into which the data is read * \param length The length of the data that should be read at most. * The buffer must be at least this big! 
* \return The number of bytes read */ - (size_t)readIntoBuffer: (void*)buffer length: (size_t)size; -#ifdef OF_HAVE_BLOCKS -/** - * \brief Asyncronously reads at most size bytes from the stream into a - * buffer. - * - * On network streams, this might read less than the specified number of bytes. - * If you want to read exactly the specified number of bytes, use - * -[readIntoBuffer:exactLength:]. - * - * \param buffer The buffer into which the data is read. - * The buffer must not be free'd before the async read completed! - * \param length The length of the data that should be read at most. - * The buffer must be at least this big! - * \param block The block to call when the data has been received. - * If the block returns YES, it will be called again with the same - * buffer and maximum length when more data has been received. If - * you want the next block in the queue to handle the data - * received next, you need to return NO from the block. - */ -- (void)asyncReadWithBuffer: (void*)buffer - length: (size_t)length - block: (of_stream_async_read_block_t)block; -#endif - /** * \brief Reads exactly the specified length bytes from the stream into a * buffer. * * Unlike readIntoBuffer:length:, this method does not return when less than the @@ -123,14 +101,61 @@ * \warning Only call this when you know that specified amount of data is * available! Otherwise you will get an exception! * * \param buffer The buffer into which the data is read * \param length The length of the data that should be read. - * The buffer must be exactly this big! + * The buffer must be exactly this big! */ - (void)readIntoBuffer: (void*)buffer exactLength: (size_t)length; + +#ifdef OF_HAVE_BLOCKS +/** + * \brief Asynchronously reads at most size bytes from the stream into a + * buffer. + * + * On network streams, this might read less than the specified number of bytes. + * If you want to read exactly the specified number of bytes, use + * asyncReadIntoBuffer:exactLength:block:. 
Note that a read can even return 0 + * bytes - this does not necessarily mean that the stream ended, so you still + * need to check isAtEndOfStream. + * + * \param buffer The buffer into which the data is read. + * The buffer must not be freed before the async read has completed! + * \param length The length of the data that should be read at most. + * The buffer must be at least this big! + * \param block The block to call when the data has been received. + * If the block returns YES, it will be called again with the same + * buffer and maximum length when more data has been received. If + * you want the next block in the queue to handle the data + * received next, you need to return NO from the block. + */ +- (void)asyncReadIntoBuffer: (void*)buffer + length: (size_t)length + block: (of_stream_async_read_block_t)block; + +/** + * \brief Asynchronously reads exactly the specified length bytes from the + * stream into a buffer. + * + * Unlike asyncReadIntoBuffer:length:block:, this method does not invoke the + * block when less than the specified length has been read - instead, it waits + * until it got exactly the specified length or the stream has ended. + * + * \param buffer The buffer into which the data is read + * \param length The length of the data that should be read. + * The buffer must be exactly this big! + * \param block The block to call when the data has been received. + * If the block returns YES, it will be called again with the same + * buffer and exact length when more data has been received. If + * you want the next block in the queue to handle the data + * received next, you need to return NO from the block. + */ + - (void)asyncReadIntoBuffer: (void*)buffer + exactLength: (size_t)length + block: (of_stream_async_read_block_t)block; +#endif /** * \brief Reads a uint8_t from the stream. * * \warning Only call this when you know that enough data is available! 
Index: src/OFStream.m ================================================================== --- src/OFStream.m +++ src/OFStream.m @@ -132,29 +132,41 @@ return length; } } -- (void)asyncReadWithBuffer: (void*)buffer +- (void)readIntoBuffer: (void*)buffer + exactLength: (size_t)length +{ + size_t readLength = 0; + + while (readLength < length) + readLength += [self readIntoBuffer: (char*)buffer + readLength + length: length - readLength]; +} + +#ifdef OF_HAVE_BLOCKS +- (void)asyncReadIntoBuffer: (void*)buffer length: (size_t)length block: (of_stream_async_read_block_t)block { [OFRunLoop OF_addAsyncReadForStream: self buffer: buffer length: length block: block]; } -- (void)readIntoBuffer: (void*)buffer - exactLength: (size_t)length -{ - size_t readLength = 0; - - while (readLength < length) - readLength += [self readIntoBuffer: (char*)buffer + readLength - length: length - readLength]; -} +- (void)asyncReadIntoBuffer: (void*)buffer + exactLength: (size_t)length + block: (of_stream_async_read_block_t)block +{ + [OFRunLoop OF_addAsyncReadForStream: self + buffer: buffer + exactLength: length + block: block]; +} +#endif - (uint8_t)readInt8 { uint8_t ret; Index: src/OFStreamObserver.m ================================================================== --- src/OFStreamObserver.m +++ src/OFStreamObserver.m @@ -382,11 +382,10 @@ size_t i, count = [readStreams count]; BOOL foundInCache = NO; for (i = 0; i < count; i++) { - if ([objects[i] pendingBytes] > 0 && ![objects[i] OF_isWaitingForDelimiter]) { void *pool = objc_autoreleasePoolPush(); [delegate streamIsReadyForReading: objects[i]]; foundInCache = YES;