Index: src/OFRunLoop.h ================================================================== --- src/OFRunLoop.h +++ src/OFRunLoop.h @@ -49,10 +49,27 @@ * \return The run loop for the current thread */ + (OFRunLoop*)currentRunLoop; + (void)OF_setMainRunLoop; ++ (void)OF_addAsyncReadForStream: (OFStream*)stream + buffer: (void*)buffer + length: (size_t)length + target: (id)target + selector: (SEL)selector; ++ (void)OF_addAsyncReadForStream: (OFStream*)stream + buffer: (void*)buffer + exactLength: (size_t)length + target: (id)target + selector: (SEL)selector; ++ (void)OF_addAsyncReadLineForStream: (OFStream*)stream + encoding: (of_string_encoding_t)encoding + target: (id)target + selector: (SEL)selector; ++ (void)OF_addAsyncAcceptForTCPSocket: (OFTCPSocket*)socket + target: (id)target + selector: (SEL)selector; #ifdef OF_HAVE_BLOCKS + (void)OF_addAsyncReadForStream: (OFStream*)stream buffer: (void*)buffer length: (size_t)length block: (of_stream_async_read_block_t)block; Index: src/OFRunLoop.m ================================================================== --- src/OFRunLoop.m +++ src/OFRunLoop.m @@ -28,80 +28,106 @@ #import "autorelease.h" #import "macros.h" static OFRunLoop *mainRunLoop = nil; -#ifdef OF_HAVE_BLOCKS @interface OFRunLoop_ReadQueueItem: OFObject { @public void *buffer; size_t length; + id target; + SEL selector; +#ifdef OF_HAVE_BLOCKS of_stream_async_read_block_t block; +#endif } @end @interface OFRunLoop_ExactReadQueueItem: OFObject { @public void *buffer; size_t exactLength, readLength; + id target; + SEL selector; +#ifdef OF_HAVE_BLOCKS of_stream_async_read_block_t block; +#endif } @end @interface OFRunLoop_ReadLineQueueItem: OFObject { @public - of_stream_async_read_line_block_t block; of_string_encoding_t encoding; + id target; + SEL selector; +#ifdef OF_HAVE_BLOCKS + of_stream_async_read_line_block_t block; +#endif } @end @interface OFRunLoop_AcceptQueueItem: OFObject { @public + id target; + SEL selector; +#ifdef OF_HAVE_BLOCKS of_tcpsocket_async_accept_block_t block; +#endif } @end @implementation OFRunLoop_ReadQueueItem - (void)dealloc { + [target release]; +#ifdef OF_HAVE_BLOCKS [block release]; +#endif [super dealloc]; } @end @implementation OFRunLoop_ExactReadQueueItem - (void)dealloc { + [target release]; +#ifdef OF_HAVE_BLOCKS [block release]; +#endif [super dealloc]; } @end @implementation OFRunLoop_ReadLineQueueItem - (void)dealloc { + [target release]; +#ifdef OF_HAVE_BLOCKS [block release]; +#endif [super dealloc]; } @end @implementation OFRunLoop_AcceptQueueItem - (void)dealloc { + [target release]; +#ifdef OF_HAVE_BLOCKS [block release]; +#endif [super dealloc]; } @end -#endif @implementation OFRunLoop + (OFRunLoop*)mainRunLoop { return [[mainRunLoop retain] autorelease]; @@ -116,118 +142,127 @@ { void *pool = objc_autoreleasePoolPush(); mainRunLoop = [[self currentRunLoop] retain]; objc_autoreleasePoolPop(pool); } + +#define ADD(type, code) \ + void *pool = objc_autoreleasePoolPush(); \ + OFRunLoop *runLoop = [self currentRunLoop]; \ + OFList *queue = [runLoop->readQueues objectForKey: stream]; \ + type *queueItem; \ + \ + if (queue == nil) { \ + queue = [OFList list]; \ + [runLoop->readQueues setObject: queue \ + forKey: stream]; \ + } \ + \ + if ([queue count] == 0) \ + [runLoop->streamObserver addStreamForReading: stream]; \ + \ + queueItem = [[[type alloc] init] autorelease]; \ + code \ + [queue appendObject: queueItem]; \ + \ + objc_autoreleasePoolPop(pool); + ++ (void)OF_addAsyncReadForStream: (OFStream*)stream + buffer: (void*)buffer + length: (size_t)length + target: (id)target + selector: (SEL)selector +{ + ADD(OFRunLoop_ReadQueueItem, { + queueItem->buffer = buffer; + queueItem->length = length; + queueItem->target = [target retain]; + queueItem->selector = selector; + }) +} + ++ (void)OF_addAsyncReadForStream: (OFStream*)stream + buffer: (void*)buffer + exactLength: (size_t)exactLength + target: (id)target + selector: (SEL)selector +{ + ADD(OFRunLoop_ExactReadQueueItem, { + queueItem->buffer = buffer; + queueItem->exactLength = exactLength; + queueItem->target = [target retain]; + queueItem->selector = selector; + }) +} + ++ (void)OF_addAsyncReadLineForStream: (OFStream*)stream + encoding: (of_string_encoding_t)encoding + target: (id)target + selector: (SEL)selector +{ + ADD(OFRunLoop_ReadLineQueueItem, { + queueItem->encoding = encoding; + queueItem->target = [target retain]; + queueItem->selector = selector; + }) +} + ++ (void)OF_addAsyncAcceptForTCPSocket: (OFTCPSocket*)stream + target: (id)target + selector: (SEL)selector +{ + ADD(OFRunLoop_AcceptQueueItem, { + queueItem->target = [target retain]; + queueItem->selector = selector; + }) +} #ifdef OF_HAVE_BLOCKS + (void)OF_addAsyncReadForStream: (OFStream*)stream buffer: (void*)buffer length: (size_t)length block: (of_stream_async_read_block_t)block { - void *pool = objc_autoreleasePoolPush(); - OFRunLoop *runLoop = [self currentRunLoop]; - OFList *queue = [runLoop->readQueues objectForKey: stream]; - OFRunLoop_ReadQueueItem *queueItem; - - if (queue == nil) { - queue = [OFList list]; - [runLoop->readQueues setObject: queue - forKey: stream]; - } - - if ([queue count] == 0) - [runLoop->streamObserver addStreamForReading: stream]; - - queueItem = [[[OFRunLoop_ReadQueueItem alloc] init] autorelease]; - queueItem->buffer = buffer; - queueItem->length = length; - queueItem->block = [block copy]; - [queue appendObject: queueItem]; - - objc_autoreleasePoolPop(pool); + ADD(OFRunLoop_ReadQueueItem, { + queueItem->buffer = buffer; + queueItem->length = length; + queueItem->block = [block copy]; + }) } + (void)OF_addAsyncReadForStream: (OFStream*)stream buffer: (void*)buffer exactLength: (size_t)exactLength block: (of_stream_async_read_block_t)block { - void *pool = objc_autoreleasePoolPush(); - OFRunLoop *runLoop = [self currentRunLoop]; - OFList *queue = [runLoop->readQueues objectForKey: stream]; - OFRunLoop_ExactReadQueueItem *queueItem; - - if (queue == nil) { - queue = [OFList list]; - [runLoop->readQueues setObject: queue - forKey: stream]; - } - - if ([queue count] == 0) - [runLoop->streamObserver addStreamForReading: stream]; - - queueItem = [[[OFRunLoop_ExactReadQueueItem alloc] init] autorelease]; - queueItem->buffer = buffer; - queueItem->exactLength = exactLength; - queueItem->block = [block copy]; - [queue appendObject: queueItem]; - - objc_autoreleasePoolPop(pool); + ADD(OFRunLoop_ExactReadQueueItem, { + queueItem->buffer = buffer; + queueItem->exactLength = exactLength; + queueItem->block = [block copy]; + }) } + (void)OF_addAsyncReadLineForStream: (OFStream*)stream encoding: (of_string_encoding_t)encoding block: (of_stream_async_read_line_block_t)block { - void *pool = objc_autoreleasePoolPush(); - OFRunLoop *runLoop = [self currentRunLoop]; - OFList *queue = [runLoop->readQueues objectForKey: stream]; - OFRunLoop_ReadLineQueueItem *queueItem; - - if (queue == nil) { - queue = [OFList list]; - [runLoop->readQueues setObject: queue - forKey: stream]; - } - - if ([queue count] == 0) - [runLoop->streamObserver addStreamForReading: stream]; - - queueItem = [[[OFRunLoop_ReadLineQueueItem alloc] init] autorelease]; - queueItem->block = [block copy]; - queueItem->encoding = encoding; - [queue appendObject: queueItem]; - - objc_autoreleasePoolPop(pool); -} - -+ (void)OF_addAsyncAcceptForTCPSocket: (OFTCPSocket*)socket + ADD(OFRunLoop_ReadLineQueueItem, { + queueItem->encoding = encoding; + queueItem->block = [block copy]; + }) +} + ++ (void)OF_addAsyncAcceptForTCPSocket: (OFTCPSocket*)stream block: (of_tcpsocket_async_accept_block_t)block { - void *pool = objc_autoreleasePoolPush(); - OFRunLoop *runLoop = [self currentRunLoop]; - OFList *queue = [runLoop->readQueues objectForKey: socket]; - OFRunLoop_AcceptQueueItem *queueItem; - - if (queue == nil) { - queue = [OFList list]; - [runLoop->readQueues setObject: queue - forKey: socket]; - } - - if ([queue count] == 0) - [runLoop->streamObserver addStreamForReading: socket]; - - queueItem = [[[OFRunLoop_AcceptQueueItem alloc] init] autorelease]; - queueItem->block = [block copy]; - [queue appendObject: queueItem]; - - objc_autoreleasePoolPop(pool); + ADD(OFRunLoop_AcceptQueueItem, { + queueItem->block = [block copy]; + }) } #endif + +#undef ADD - init { self = [super init]; @@ -261,11 +296,10 @@ [timersQueue addObject: timer]; } [streamObserver cancel]; } -#ifdef OF_HAVE_BLOCKS - (void)streamIsReadyForReading: (OFStream*)stream { OFList *queue = [readQueues objectForKey: stream]; of_list_object_t *listObject; @@ -277,18 +311,42 @@ [OFRunLoop_ReadQueueItem class]]) { OFRunLoop_ReadQueueItem *queueItem = listObject->object; size_t length = [stream readIntoBuffer: queueItem->buffer length: queueItem->length]; - if (!queueItem->block(stream, queueItem->buffer, length)) { - [queue removeListObject: listObject]; - - if ([queue count] == 0) { - [streamObserver removeStreamForReading: stream]; - [readQueues removeObjectForKey: stream]; - } - } +#ifdef OF_HAVE_BLOCKS + if (queueItem->block != NULL) { + if (!queueItem->block(stream, queueItem->buffer, + length)) { + [queue removeListObject: listObject]; + + if ([queue count] == 0) { + [streamObserver + removeStreamForReading: stream]; + [readQueues removeObjectForKey: stream]; + } + } + } else { +#endif + BOOL (*func)(id, SEL, OFStream*, void*, size_t) = + (BOOL(*)(id, SEL, OFStream*, void*, size_t)) + [queueItem->target methodForSelector: + queueItem->selector]; + + if (!func(queueItem->target, queueItem->selector, + stream, queueItem->buffer, length)) { + [queue removeListObject: listObject]; + + if ([queue count] == 0) { + [streamObserver + removeStreamForReading: stream]; + [readQueues removeObjectForKey: stream]; + } + } +#ifdef OF_HAVE_BLOCKS + } +#endif } else if ([listObject->object isKindOfClass: [OFRunLoop_ExactReadQueueItem class]]) { OFRunLoop_ExactReadQueueItem *queueItem = listObject->object; size_t length = [stream readIntoBuffer: (char*)queueItem->buffer + @@ -297,58 +355,136 @@ queueItem->readLength]; queueItem->readLength += length; if (queueItem->readLength == queueItem->exactLength || [stream isAtEndOfStream]) { - if (queueItem->block(stream, queueItem->buffer, - queueItem->readLength)) - queueItem->readLength = 0; - else { - [queue removeListObject: listObject]; - - if ([queue count] == 0) { - [streamObserver - removeStreamForReading: stream]; - [readQueues removeObjectForKey: stream]; - } - } +#ifdef OF_HAVE_BLOCKS + if (queueItem->block != NULL) { + if (queueItem->block(stream, queueItem->buffer, + queueItem->readLength)) + queueItem->readLength = 0; + else { + [queue removeListObject: listObject]; + + if ([queue count] == 0) { + [streamObserver + removeStreamForReading: + stream]; + [readQueues + removeObjectForKey: stream]; + } + } + } else { +#endif + BOOL (*func)(id, SEL, OFStream*, void*, + size_t) = (BOOL(*)(id, SEL, OFStream*, + void*, size_t))[queueItem->target + methodForSelector: queueItem->selector]; + + if (func(queueItem->target, + queueItem->selector, stream, + queueItem->buffer, queueItem->readLength)) + queueItem->readLength = 0; + else { + [queue removeListObject: listObject]; + + if ([queue count] == 0) { + [streamObserver + removeStreamForReading: + stream]; + [readQueues + removeObjectForKey: stream]; + } + } +#ifdef OF_HAVE_BLOCKS + } +#endif } } else if ([listObject->object isKindOfClass: [OFRunLoop_ReadLineQueueItem class]]) { OFRunLoop_ReadLineQueueItem *queueItem = listObject->object; OFString *line; line = [stream tryReadLineWithEncoding: queueItem->encoding]; if (line != nil || [stream isAtEndOfStream]) { - if (!queueItem->block(stream, line)) { +#ifdef OF_HAVE_BLOCKS + if (queueItem->block != NULL) { + if (!queueItem->block(stream, line)) { + [queue removeListObject: listObject]; + + if ([queue count] == 0) { + [streamObserver + removeStreamForReading: + stream]; + [readQueues + removeObjectForKey: stream]; + } + } + } else { +#endif + BOOL (*func)(id, SEL, OFStream*, OFString*) = + (BOOL(*)(id, SEL, OFStream*, OFString*)) + [queueItem->target methodForSelector: + queueItem->selector]; + + if (!func(queueItem->target, + queueItem->selector, stream, line)) { + [queue removeListObject: listObject]; + + if ([queue count] == 0) { + [streamObserver + removeStreamForReading: + stream]; + [readQueues + removeObjectForKey: stream]; + } + } +#ifdef OF_HAVE_BLOCKS + } +#endif + } + } else if ([listObject->object isKindOfClass: + [OFRunLoop_AcceptQueueItem class]]) { + OFRunLoop_AcceptQueueItem *queueItem = listObject->object; + OFTCPSocket *newSocket = [(OFTCPSocket*)stream accept]; + +#ifdef OF_HAVE_BLOCKS + if (queueItem->block != NULL) { + if (!queueItem->block((OFTCPSocket*)stream, + newSocket)) { + [queue removeListObject: listObject]; + + if ([queue count] == 0) { + [streamObserver + removeStreamForReading: stream]; + [readQueues removeObjectForKey: stream]; + } + } + } else { +#endif + BOOL (*func)(id, SEL, OFTCPSocket*, OFTCPSocket*) = + (BOOL(*)(id, SEL, OFTCPSocket*, OFTCPSocket*)) + [queueItem->target methodForSelector: + queueItem->selector]; + + if (!func(queueItem->target, queueItem->selector, + (OFTCPSocket*)stream, newSocket)) { [queue removeListObject: listObject]; if ([queue count] == 0) { [streamObserver removeStreamForReading: stream]; [readQueues removeObjectForKey: stream]; } } - } - } else if ([listObject->object isKindOfClass: - [OFRunLoop_AcceptQueueItem class]]) { - OFRunLoop_AcceptQueueItem *queueItem = listObject->object; - OFTCPSocket *newSocket = [(OFTCPSocket*)stream accept]; - - if (!queueItem->block((OFTCPSocket*)stream, newSocket)) { - [queue removeListObject: listObject]; - - if ([queue count] == 0) { - [streamObserver removeStreamForReading: stream]; - [readQueues removeObjectForKey: stream]; - } - } +#ifdef OF_HAVE_BLOCKS + } +#endif } else OF_ENSURE(0); } -#endif - (void)run { for (;;) { void *pool = objc_autoreleasePoolPush(); Index: src/OFStream.h ================================================================== --- src/OFStream.h +++ src/OFStream.h @@ -106,10 +106,63 @@ * The buffer must be exactly this big! */ - (void)readIntoBuffer: (void*)buffer exactLength: (size_t)length; +/** + * \brief Asyncronously reads at most size bytes from the stream into a + * buffer. + * + * On network streams, this might read less than the specified number of bytes. + * If you want to read exactly the specified number of bytes, use + * asyncReadIntoBuffer:exactLength:block:. Note that a read can even return 0 + * bytes - this does not necessarily mean that the stream ended, so you still + * need to check isAtEndOfStream. + * + * \param buffer The buffer into which the data is read. + * The buffer must not be free'd before the async read completed! + * \param length The length of the data that should be read at most. + * The buffer must be at least this big! + * \param target The target on which the selector should be called when the + * data has been received. If the method returns YES, it will be + * called again with the same buffer and maximum length when more + * data has been received. If you want the next method in the + * queue to handle the data received next, you need to return NO + * from the method. + * \param selector The selector to call on the target. The signature must be + * BOOL (OFStream *stream, void *buffer, size_t size). + */ +- (void)asyncReadIntoBuffer: (void*)buffer + length: (size_t)length + target: (id)target + selector: (SEL)selector; + +/** + * \brief Asyncronously reads exactly the specified length bytes from the + * stream into a buffer. + * + * Unlike asyncReadIntoBuffer:length:block, this method does not call the + * method when less than the specified length has been read - instead, it waits + * until it got exactly the specified length or the stream has ended. + * + * \param buffer The buffer into which the data is read + * \param length The length of the data that should be read. + * The buffer must be exactly this big! + * \param target The target on which the selector should be called when the + * data has been received. If the method returns YES, it will be + * called again with the same buffer and exact length when more + * data has been received. If you want the next method in the + * queue to handle the data received next, you need to return NO + * from the method. + * \param selector The selector to call on the target. The signature must be + * BOOL (OFStream *stream, void *buffer, size_t size). + */ + - (void)asyncReadIntoBuffer: (void*)buffer + exactLength: (size_t)length + target: (id)target + selector: (SEL)selector; + #ifdef OF_HAVE_BLOCKS /** * \brief Asyncronously reads at most size bytes from the stream into a * buffer. * @@ -499,10 +552,42 @@ * \return The line that was read, autoreleased, or nil if the end of the * stream has been reached. */ - (OFString*)readLineWithEncoding: (of_string_encoding_t)encoding; +/** + * \brief Asyncronously reads with the specified encoding until a newline, \\0 + * or end of stream occurs. + * + * \param target The target on which to call the selector when the data has + * been received. If the method returns YES, it will be called + * again when the next line has been received. If you want the + * next method in the queue to handle the next line, you need to + * return NO from the method + * \param selector The selector to call on the target. The signature must be + * BOOL (OFStream *stream, OFString *line). + */ +- (void)asyncReadLineWithTarget: (id)target + selector: (SEL)selector; + +/** + * \brief Asyncronously reads with the specified encoding until a newline, \\0 + * or end of stream occurs. + * + * \param encoding The encoding used by the stream + * \param target The target on which to call the selector when the data has + * been received. If the method returns YES, it will be called + * again when the next line has been received. If you want the + * next method in the queue to handle the next line, you need to + * return NO from the method + * \param selector The selector to call on the target. The signature must be + * BOOL (OFStream *stream, OFString *line). + */ +- (void)asyncReadLineWithEncoding: (of_string_encoding_t)encoding + target: (id)target + selector: (SEL)selector; + #ifdef OF_HAVE_BLOCKS /** * \brief Asyncronously reads until a newline, \\0 or end of stream occurs. * * \param block The block to call when the data has been received. @@ -919,11 +1004,10 @@ * \param length The length of the buffer * \return The number of bytes read */ - (size_t)lowlevelReadIntoBuffer: (void*)buffer length: (size_t)length; - /** * \brief Performs a lowlevel write. * * \warning Do not call this directly! Index: src/OFStream.m ================================================================== --- src/OFStream.m +++ src/OFStream.m @@ -141,10 +141,34 @@ while (readLength < length) readLength += [self readIntoBuffer: (char*)buffer + readLength length: length - readLength]; } + +- (void)asyncReadIntoBuffer: (void*)buffer + length: (size_t)length + target: (id)target + selector: (SEL)selector +{ + [OFRunLoop OF_addAsyncReadForStream: self + buffer: buffer + length: length + target: target + selector: selector]; +} + +- (void)asyncReadIntoBuffer: (void*)buffer + exactLength: (size_t)length + target: (id)target + selector: (SEL)selector +{ + [OFRunLoop OF_addAsyncReadForStream: self + buffer: buffer + exactLength: length + target: target + selector: selector]; +} #ifdef OF_HAVE_BLOCKS - (void)asyncReadIntoBuffer: (void*)buffer length: (size_t)length block: (of_stream_async_read_block_t)block @@ -693,10 +717,28 @@ if ([self isAtEndOfStream]) return nil; return line; } + +- (void)asyncReadLineWithTarget: (id)target + selector: (SEL)selector +{ + return [self asyncReadLineWithEncoding: OF_STRING_ENCODING_UTF_8 + target: target + selector: selector]; +} + +- (void)asyncReadLineWithEncoding: (of_string_encoding_t)encoding + target: (id)target + selector: (SEL)selector +{ + [OFRunLoop OF_addAsyncReadLineForStream: self + encoding: encoding + target: target + selector: selector]; +} #ifdef OF_HAVE_BLOCKS - (void)asyncReadLineWithBlock: (of_stream_async_read_line_block_t)block { return [self asyncReadLineWithEncoding: OF_STRING_ENCODING_UTF_8 Index: src/OFTCPSocket.h ================================================================== --- src/OFTCPSocket.h +++ src/OFTCPSocket.h @@ -171,10 +171,23 @@ * * \return An autoreleased OFTCPSocket for the accepted connection. */ - (OFTCPSocket*)accept; +/** + * \brief Asyncronously ccept an incoming connection. + * + * \param target The target on which to execute the selector when a new + * connection has been accepted. The method returns whether the + * next incoming connection should be accepted by the specified + * block as well. + * \param selector The selector to call on the target. The signature must be + * BOOL (OFTCPSocket *socket, OFTCPSocket *acceptedSocket). + */ +- (void)asyncAcceptWithTarget: (id)target + selector: (SEL)selector; + #ifdef OF_HAVE_BLOCKS /** * \brief Asyncronously ccept an incoming connection. * * \param block The block to execute when a new connection has been accepted. Index: src/OFTCPSocket.m ================================================================== --- src/OFTCPSocket.m +++ src/OFTCPSocket.m @@ -512,10 +512,18 @@ newSocket->sockAddr = addr; newSocket->sockAddrLen = addrLen; return newSocket; } + +- (void)asyncAcceptWithTarget: (id)target + selector: (SEL)selector +{ + [OFRunLoop OF_addAsyncAcceptForTCPSocket: self + target: target + selector: selector]; +} #ifdef OF_HAVE_BLOCKS - (void)asyncAcceptWithBlock: (of_tcpsocket_async_accept_block_t)block { [OFRunLoop OF_addAsyncAcceptForTCPSocket: self