Index: src/OFRunLoop.h ================================================================== --- src/OFRunLoop.h +++ src/OFRunLoop.h @@ -13,22 +13,28 @@ * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this * file. */ #import "OFObject.h" +#import "OFStream.h" +#import "OFStreamObserver.h" @class OFSortedList; -@class OFStreamObserver; @class OFTimer; +@class OFMutableDictionary; /** * \brief A class providing a run loop for the application and its processes. */ @interface OFRunLoop: OFObject +#ifdef OF_RUNLOOP_M + <OFStreamObserverDelegate> +#endif { OFSortedList *timersQueue; OFStreamObserver *streamObserver; + OFMutableDictionary *readQueues; } /** * \brief Returns the main run loop. * @@ -42,10 +48,23 @@ * \return The run loop for the current thread */ + (OFRunLoop*)currentRunLoop; + (void)_setMainRunLoop: (OFRunLoop*)mainRunLoop; +#ifdef OF_HAVE_BLOCKS +/* + * Private: queue an async read or async line read for the stream on the run + * loop of the current thread. Called by the async read methods of OFStream. + */ ++ (void)_addAsyncReadForStream: (OFStream*)stream + buffer: (void*)buffer + length: (size_t)length + block: (of_stream_async_read_block_t)block; ++ (void)_addAsyncReadLineForStream: (OFStream*)stream + encoding: (of_string_encoding_t)encoding + block: (of_stream_async_read_line_block_t)block; +#endif /** * \brief Adds an OFTimer to the run loop. * * \param timer The timer to add Index: src/OFRunLoop.m ================================================================== --- src/OFRunLoop.m +++ src/OFRunLoop.m @@ -13,20 +13,70 @@ * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this * file. 
*/ #include "config.h" + +#define OF_RUNLOOP_M #import "OFRunLoop.h" +#import "OFDictionary.h" #import "OFThread.h" #import "OFSortedList.h" #import "OFTimer.h" #import "OFDate.h" -#import "OFStreamObserver.h" + +#import "macros.h" static OFTLSKey *currentRunLoopKey; static OFRunLoop *mainRunLoop; + +#ifdef OF_HAVE_BLOCKS +@interface OFRunLoop_ReadQueueItem: OFObject +{ + void *buffer; + size_t length; + of_stream_async_read_block_t block; +} + +@property void *buffer; +@property size_t length; +@property (copy) of_stream_async_read_block_t block; +@end + +@interface OFRunLoop_ReadLineQueueItem: OFObject +{ + of_stream_async_read_line_block_t block; + of_string_encoding_t encoding; +} + +@property (copy) of_stream_async_read_line_block_t block; +@property of_string_encoding_t encoding; +@end + +@implementation OFRunLoop_ReadQueueItem +@synthesize buffer, length, block; + +- (void)dealloc +{ + [block release]; + + [super dealloc]; +} +@end + +@implementation OFRunLoop_ReadLineQueueItem +@synthesize block, encoding; + +- (void)dealloc +{ + [block release]; + + [super dealloc]; +} +@end +#endif @implementation OFRunLoop + (void)initialize { if (self == [OFRunLoop class]) @@ -47,19 +97,79 @@ + (void)_setMainRunLoop: (OFRunLoop*)mainRunLoop_ { mainRunLoop = [mainRunLoop_ retain]; } +#ifdef OF_HAVE_BLOCKS ++ (void)_addAsyncReadForStream: (OFStream*)stream + buffer: (void*)buffer + length: (size_t)length + block: (of_stream_async_read_block_t)block +{ + void *pool = objc_autoreleasePoolPush(); + OFRunLoop *runLoop = [self currentRunLoop]; + OFList *queue = [runLoop->readQueues objectForKey: stream]; + OFRunLoop_ReadQueueItem *queueItem; + + if (queue == nil) { + queue = [OFList list]; + [runLoop->readQueues setObject: queue + forKey: stream]; + } + + if ([queue count] == 0) + [runLoop->streamObserver addStreamForReading: stream]; + + queueItem = [[[OFRunLoop_ReadQueueItem alloc] init] autorelease]; + [queueItem setBuffer: buffer]; + [queueItem setLength: length]; + 
[queueItem setBlock: block]; + [queue appendObject: queueItem]; + + objc_autoreleasePoolPop(pool); +} + ++ (void)_addAsyncReadLineForStream: (OFStream*)stream + encoding: (of_string_encoding_t)encoding + block: (of_stream_async_read_line_block_t)block +{ + void *pool = objc_autoreleasePoolPush(); + OFRunLoop *runLoop = [self currentRunLoop]; + OFList *queue = [runLoop->readQueues objectForKey: stream]; + OFRunLoop_ReadLineQueueItem *queueItem; + + if (queue == nil) { + queue = [OFList list]; + [runLoop->readQueues setObject: queue + forKey: stream]; + } + + if ([queue count] == 0) + [runLoop->streamObserver addStreamForReading: stream]; + + queueItem = [[[OFRunLoop_ReadLineQueueItem alloc] init] autorelease]; + [queueItem setBlock: block]; + [queueItem setEncoding: encoding]; + [queue appendObject: queueItem]; + + objc_autoreleasePoolPop(pool); +} +#endif + - init { self = [super init]; @try { void *pool = objc_autoreleasePoolPush(); timersQueue = [[[OFThread currentThread] _timersQueue] retain]; + streamObserver = [[OFStreamObserver alloc] init]; + [streamObserver setDelegate: self]; + + readQueues = [[OFMutableDictionary alloc] init]; [OFThread setObject: self forTLSKey: currentRunLoopKey]; objc_autoreleasePoolPop(pool); @@ -73,10 +183,11 @@ - (void)dealloc { [timersQueue release]; [streamObserver release]; + [readQueues release]; [super dealloc]; } - (void)addTimer: (OFTimer*)timer @@ -84,10 +195,61 @@ @synchronized (timersQueue) { [timersQueue addObject: timer]; } [streamObserver cancel]; } + +#ifdef OF_HAVE_BLOCKS +/* + * Services the first queued async read of a stream that is ready for reading. + * The queue item is removed once its block returns NO; the stream is removed + * from the stream observer as soon as its queue is empty. + */ +- (void)streamIsReadyForReading: (OFStream*)stream +{ + OFList *queue = [readQueues objectForKey: stream]; + of_list_object_t *listObject; + + OF_ENSURE(queue != nil); + + listObject = [queue firstListObject]; + /* An empty queue would make the dereferences below crash. */ + OF_ENSURE(listObject != NULL); + + if ([listObject->object isKindOfClass: + [OFRunLoop_ReadQueueItem class]]) { + OFRunLoop_ReadQueueItem *queueItem = listObject->object; + void *buffer = [queueItem buffer]; + size_t length = [stream readIntoBuffer: buffer + length: 
[queueItem length]]; + + if (![queueItem block](stream, buffer, length)) { + [queue removeListObject: listObject]; + + if ([queue count] == 0) + [streamObserver removeStreamForReading: stream]; + } + } else if ([listObject->object isKindOfClass: + [OFRunLoop_ReadLineQueueItem class]]) { + OFRunLoop_ReadLineQueueItem *queueItem = listObject->object; + OFString *line; + + line = [stream tryReadLineWithEncoding: [queueItem encoding]]; + + if (line != nil || [stream isAtEndOfStream]) { + if (![queueItem block](stream, line)) { + [queue removeListObject: listObject]; + + if ([queue count] == 0) + [streamObserver + removeStreamForReading: stream]; + } + } + } else + OF_ENSURE(0); +} +#endif - (void)run { for (;;) { void *pool = objc_autoreleasePoolPush(); Index: src/OFStream.h ================================================================== --- src/OFStream.h +++ src/OFStream.h @@ -24,11 +24,17 @@ #include #import "OFObject.h" #import "OFString.h" +@class OFStream; @class OFDataArray; + +#ifdef OF_HAVE_BLOCKS +typedef BOOL (^of_stream_async_read_block_t)(OFStream*, void*, size_t); +typedef BOOL (^of_stream_async_read_line_block_t)(OFStream*, OFString*); +#endif /** * \brief A base class for different types of streams. * * \warning Even though the OFCopying protocol is implemented, it does @@ -79,10 +85,34 @@ * The buffer must be at least this big! * \return The number of bytes read */ - (size_t)readIntoBuffer: (void*)buffer length: (size_t)size; + +#ifdef OF_HAVE_BLOCKS +/** + * \brief Asynchronously reads at most size bytes from the stream into a + * buffer. + * + * On network streams, this might read less than the specified number of bytes. + * If you want to read exactly the specified number of bytes, use + * -[readIntoBuffer:exactLength:]. + * + * \param buffer The buffer into which the data is read. + * The buffer must not be freed before the async read completed! + * \param length The length of the data that should be read at most. 
+ * The buffer must be at least this big! + * \param block The block to call when the data has been received. + * If the block returns YES, it will be called again with the same + * buffer and maximum length when more data has been received. If + * you want the next block in the queue to handle the data + * received next, you need to return NO from the block. + */ +- (void)asyncReadWithBuffer: (void*)buffer + length: (size_t)length + block: (of_stream_async_read_block_t)block; +#endif /** * \brief Reads exactly the specified length bytes from the stream into a * buffer. * @@ -443,10 +473,35 @@ * \param encoding The encoding used by the stream * \return The line that was read, autoreleased, or nil if the end of the * stream has been reached. */ - (OFString*)readLineWithEncoding: (of_string_encoding_t)encoding; + +#ifdef OF_HAVE_BLOCKS +/** + * \brief Asynchronously reads until a newline, \\0 or end of stream occurs. + * + * \param block The block to call when the data has been received. + * If the block returns YES, it will be called again when the next + * line has been received. If you want the next block in the queue + * to handle the next line, you need to return NO from the block. + */ +- (void)asyncReadLineWithBlock: (of_stream_async_read_line_block_t)block; + +/** + * \brief Asynchronously reads with the specified encoding until a newline, \\0 + * or end of stream occurs. + * + * \param encoding The encoding used by the stream + * \param block The block to call when the data has been received. + * If the block returns YES, it will be called again when the next + * line has been received. If you want the next block in the queue + * to handle the next line, you need to return NO from the block. + */ +- (void)asyncReadLineWithEncoding: (of_string_encoding_t)encoding + block: (of_stream_async_read_line_block_t)block; +#endif /** * \brief Tries to read a line from the stream (see readLine) and returns nil if * no complete line has been received yet. 
* Index: src/OFStream.m ================================================================== --- src/OFStream.m +++ src/OFStream.m @@ -32,10 +32,11 @@ #endif #import "OFStream.h" #import "OFString.h" #import "OFDataArray.h" +#import "OFRunLoop.h" #import "OFInvalidArgumentException.h" #import "OFInvalidFormatException.h" #import "OFNotImplementedException.h" #import "OFSetOptionFailedException.h" @@ -130,10 +131,23 @@ cacheLength -= length; return length; } } + +/* The block types used below only exist when OF_HAVE_BLOCKS is defined. */ +#ifdef OF_HAVE_BLOCKS +- (void)asyncReadWithBuffer: (void*)buffer + length: (size_t)length + block: (of_stream_async_read_block_t)block +{ + [OFRunLoop _addAsyncReadForStream: self + buffer: buffer + length: length + block: block]; +} +#endif - (void)readIntoBuffer: (void*)buffer exactLength: (size_t)length { size_t readLength = 0; @@ -670,10 +684,26 @@ if ([self isAtEndOfStream]) return nil; return line; } + +#ifdef OF_HAVE_BLOCKS +- (void)asyncReadLineWithBlock: (of_stream_async_read_line_block_t)block +{ + [self asyncReadLineWithEncoding: OF_STRING_ENCODING_UTF_8 + block: block]; +} + +- (void)asyncReadLineWithEncoding: (of_string_encoding_t)encoding + block: (of_stream_async_read_line_block_t)block +{ + [OFRunLoop _addAsyncReadLineForStream: self + encoding: encoding + block: block]; +} +#endif - (OFString*)tryReadLine { return [self tryReadLineWithEncoding: OF_STRING_ENCODING_UTF_8]; }