@@ -13,20 +13,79 @@
  * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
  * file.
  */
 
 #include "config.h"
+
+#define OF_RUNLOOP_M
 
 #import "OFRunLoop.h"
+#import "OFDictionary.h"
 #import "OFThread.h"
 #import "OFSortedList.h"
 #import "OFTimer.h"
 #import "OFDate.h"
-#import "OFStreamObserver.h"
+
+#import "macros.h"
 
 static OFTLSKey *currentRunLoopKey;
 static OFRunLoop *mainRunLoop;
+
+#ifdef OF_HAVE_BLOCKS
+/*
+ * Queue item for a pending asynchronous read into a caller-supplied buffer.
+ * The buffer pointer is stored as-is (not copied); the block is copied and
+ * released again in -dealloc.
+ */
+@interface OFRunLoop_ReadQueueItem: OFObject
+{
+	void *buffer;
+	size_t length;
+	of_stream_async_read_block_t block;
+}
+
+@property void *buffer;
+@property size_t length;
+@property (copy) of_stream_async_read_block_t block;
+@end
+
+/*
+ * Queue item for a pending asynchronous line read with a given string
+ * encoding. The block is copied and released again in -dealloc.
+ */
+@interface OFRunLoop_ReadLineQueueItem: OFObject
+{
+	of_stream_async_read_line_block_t block;
+	of_string_encoding_t encoding;
+}
+
+@property (copy) of_stream_async_read_line_block_t block;
+@property of_string_encoding_t encoding;
+@end
+
+@implementation OFRunLoop_ReadQueueItem
+@synthesize buffer, length, block;
+
+- (void)dealloc
+{
+	[block release];
+
+	[super dealloc];
+}
+@end
+
+@implementation OFRunLoop_ReadLineQueueItem
+@synthesize block, encoding;
+
+- (void)dealloc
+{
+	[block release];
+
+	[super dealloc];
+}
+@end
+#endif
 
 @implementation OFRunLoop
 + (void)initialize
 {
 	if (self == [OFRunLoop class])
@@ -47,19 +106,89 @@
 + (void)_setMainRunLoop: (OFRunLoop*)mainRunLoop_
 {
 	mainRunLoop = [mainRunLoop_ retain];
 }
 
+#ifdef OF_HAVE_BLOCKS
+/*
+ * Appends a buffer read request for stream to its read queue in the current
+ * run loop, creating the queue if needed. If the queue was empty, the stream
+ * is first added to the run loop's stream observer for reading.
+ */
++ (void)_addAsyncReadForStream: (OFStream*)stream
+			buffer: (void*)buffer
+			length: (size_t)length
+			 block: (of_stream_async_read_block_t)block
+{
+	void *pool = objc_autoreleasePoolPush();
+	OFRunLoop *runLoop = [self currentRunLoop];
+	OFList *queue = [runLoop->readQueues objectForKey: stream];
+	OFRunLoop_ReadQueueItem *queueItem;
+
+	if (queue == nil) {
+		queue = [OFList list];
+		[runLoop->readQueues setObject: queue
+					forKey: stream];
+	}
+
+	if ([queue count] == 0)
+		[runLoop->streamObserver addStreamForReading: stream];
+
+	queueItem = [[[OFRunLoop_ReadQueueItem alloc] init] autorelease];
+	[queueItem setBuffer: buffer];
+	[queueItem setLength: length];
+	[queueItem setBlock: block];
+	[queue appendObject: queueItem];
+
+	objc_autoreleasePoolPop(pool);
+}
+
+/*
+ * Appends a line read request (using the given encoding) for stream to its
+ * read queue in the current run loop, creating the queue if needed. If the
+ * queue was empty, the stream is first added to the stream observer.
+ */
++ (void)_addAsyncReadLineForStream: (OFStream*)stream
+			  encoding: (of_string_encoding_t)encoding
+			     block: (of_stream_async_read_line_block_t)block
+{
+	void *pool = objc_autoreleasePoolPush();
+	OFRunLoop *runLoop = [self currentRunLoop];
+	OFList *queue = [runLoop->readQueues objectForKey: stream];
+	OFRunLoop_ReadLineQueueItem *queueItem;
+
+	if (queue == nil) {
+		queue = [OFList list];
+		[runLoop->readQueues setObject: queue
+					forKey: stream];
+	}
+
+	if ([queue count] == 0)
+		[runLoop->streamObserver addStreamForReading: stream];
+
+	queueItem = [[[OFRunLoop_ReadLineQueueItem alloc] init] autorelease];
+	[queueItem setBlock: block];
+	[queueItem setEncoding: encoding];
+	[queue appendObject: queueItem];
+
+	objc_autoreleasePoolPop(pool);
+}
+#endif
+
 - init
 {
 	self = [super init];
 
 	@try {
 		void *pool = objc_autoreleasePoolPush();
 
 		timersQueue = [[[OFThread currentThread]
 		    _timersQueue] retain];
+		streamObserver = [[OFStreamObserver alloc] init];
+		[streamObserver setDelegate: self];
+
+		readQueues = [[OFMutableDictionary alloc] init];
 
 		[OFThread setObject: self
 			  forTLSKey: currentRunLoopKey];
 
 		objc_autoreleasePoolPop(pool);
@@ -73,10 +202,11 @@
 
 - (void)dealloc
 {
 	[timersQueue release];
 	[streamObserver release];
+	[readQueues release];
 
 	[super dealloc];
 }
 
 - (void)addTimer: (OFTimer*)timer
@@ -84,10 +214,75 @@
 	@synchronized (timersQueue) {
 		[timersQueue addObject: timer];
 	}
 	[streamObserver cancel];
 }
+
+#ifdef OF_HAVE_BLOCKS
+/*
+ * Serves the first queued read request of a stream that is ready for reading
+ * (the run loop sets itself as the stream observer's delegate in -init) and
+ * calls the request's block with the result.
+ *
+ * A request stays at the head of the queue for as long as its block returns
+ * a true value. Once the block returns false, the request is removed; if
+ * that leaves the queue empty, the stream is removed from the observer and
+ * the queue is removed from readQueues, so that finished streams do not
+ * leave stale entries behind.
+ */
+- (void)streamIsReadyForReading: (OFStream*)stream
+{
+	OFList *queue = [readQueues objectForKey: stream];
+	of_list_object_t *listObject;
+	BOOL finished = NO;
+
+	OF_ENSURE(queue != nil);
+
+	listObject = [queue firstListObject];
+
+	/*
+	 * An observed stream should always have a request queued; fail
+	 * early instead of dereferencing a missing list object below.
+	 */
+	OF_ENSURE(listObject != NULL);
+
+	if ([listObject->object isKindOfClass:
+	    [OFRunLoop_ReadQueueItem class]]) {
+		OFRunLoop_ReadQueueItem *queueItem = listObject->object;
+		void *buffer = [queueItem buffer];
+		size_t length = [stream readIntoBuffer: buffer
+						length: [queueItem length]];
+
+		finished = ![queueItem block](stream, buffer, length);
+	} else if ([listObject->object isKindOfClass:
+	    [OFRunLoop_ReadLineQueueItem class]]) {
+		OFRunLoop_ReadLineQueueItem *queueItem = listObject->object;
+		OFString *line;
+
+		line = [stream tryReadLineWithEncoding: [queueItem encoding]];
+
+		/*
+		 * If no line was returned and the stream is not at its end,
+		 * keep the request queued and retry the next time the stream
+		 * is ready.
+		 */
+		if (line != nil || [stream isAtEndOfStream])
+			finished = ![queueItem block](stream, line);
+	} else
+		OF_ENSURE(0);
+
+	if (finished) {
+		[queue removeListObject: listObject];
+
+		if ([queue count] == 0) {
+			[streamObserver removeStreamForReading: stream];
+			/* Do this last: it may release the queue */
+			[readQueues removeObjectForKey: stream];
+		}
+	}
+}
+#endif
 
 - (void)run
 {
 	for (;;) {
 		void *pool = objc_autoreleasePoolPush();