Artifact a5c17b66d78245004b2ee181c03bec73b5026e5210aa91c4a53daad75f3b665e:
- File
src/OFRunLoop.m
— part of check-in
[7a8a5a2995]
at
2012-11-24 00:07:49
on branch trunk
— -[OFSortedList addObject:] -> -[insertObject:].
The rationale behind this is that otherwise, there are two methods
called addObject: with a different signature, the one from
OFMutableArray and the one from OFSortedList. As OFSortedList is
actually using insertion sort and all other methods on an OFList start
with insert anyway, this is also more consistent. (user: js, size: 12903) [annotate] [blame] [check-ins using]
/* * Copyright (c) 2008, 2009, 2010, 2011, 2012 * Jonathan Schleifer <js@webkeks.org> * * All rights reserved. * * This file is part of ObjFW. It may be distributed under the terms of the * Q Public License 1.0, which can be found in the file LICENSE.QPL included in * the packaging of this file. * * Alternatively, it may be distributed under the terms of the GNU General * Public License, either version 2 or 3, which can be found in the file * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this * file. */ #include "config.h" #import "OFRunLoop.h" #import "OFDictionary.h" #import "OFThread.h" #import "OFSortedList.h" #import "OFTimer.h" #import "OFDate.h" #import "autorelease.h" #import "macros.h" static OFRunLoop *mainRunLoop = nil; @interface OFRunLoop_ReadQueueItem: OFObject { @public void *buffer; size_t length; id target; SEL selector; #ifdef OF_HAVE_BLOCKS of_stream_async_read_block_t block; #endif } @end @interface OFRunLoop_ExactReadQueueItem: OFObject { @public void *buffer; size_t exactLength, readLength; id target; SEL selector; #ifdef OF_HAVE_BLOCKS of_stream_async_read_block_t block; #endif } @end @interface OFRunLoop_ReadLineQueueItem: OFObject { @public of_string_encoding_t encoding; id target; SEL selector; #ifdef OF_HAVE_BLOCKS of_stream_async_read_line_block_t block; #endif } @end @interface OFRunLoop_AcceptQueueItem: OFObject { @public id target; SEL selector; #ifdef OF_HAVE_BLOCKS of_tcpsocket_async_accept_block_t block; #endif } @end @implementation OFRunLoop_ReadQueueItem - (void)dealloc { [target release]; #ifdef OF_HAVE_BLOCKS [block release]; #endif [super dealloc]; } @end @implementation OFRunLoop_ExactReadQueueItem - (void)dealloc { [target release]; #ifdef OF_HAVE_BLOCKS [block release]; #endif [super dealloc]; } @end @implementation OFRunLoop_ReadLineQueueItem - (void)dealloc { [target release]; #ifdef OF_HAVE_BLOCKS [block release]; #endif [super dealloc]; } @end @implementation OFRunLoop_AcceptQueueItem - (void)dealloc { [target release]; #ifdef OF_HAVE_BLOCKS [block release]; #endif [super dealloc]; } @end @implementation OFRunLoop + (OFRunLoop*)mainRunLoop { return [[mainRunLoop retain] autorelease]; } + (OFRunLoop*)currentRunLoop { return [[OFThread currentThread] runLoop]; } + (void)OF_setMainRunLoop { void *pool = objc_autoreleasePoolPush(); mainRunLoop = [[self currentRunLoop] retain]; objc_autoreleasePoolPop(pool); } #define ADD(type, code) \ void *pool = objc_autoreleasePoolPush(); \ OFRunLoop *runLoop = [self currentRunLoop]; \ OFList *queue = [runLoop->readQueues objectForKey: stream]; \ type *queueItem; \ \ if (queue == nil) { \ queue = [OFList list]; \ [runLoop->readQueues setObject: queue \ forKey: stream]; \ } \ \ if ([queue count] == 0) \ [runLoop->streamObserver addStreamForReading: stream]; \ \ queueItem = [[[type alloc] init] autorelease]; \ code \ [queue appendObject: queueItem]; \ \ objc_autoreleasePoolPop(pool); + (void)OF_addAsyncReadForStream: (OFStream*)stream buffer: (void*)buffer length: (size_t)length target: (id)target selector: (SEL)selector { ADD(OFRunLoop_ReadQueueItem, { queueItem->buffer = buffer; queueItem->length = length; queueItem->target = [target retain]; queueItem->selector = selector; }) } + (void)OF_addAsyncReadForStream: (OFStream*)stream buffer: (void*)buffer exactLength: (size_t)exactLength target: (id)target selector: (SEL)selector { ADD(OFRunLoop_ExactReadQueueItem, { queueItem->buffer = buffer; queueItem->exactLength = exactLength; queueItem->target = [target retain]; queueItem->selector = selector; }) } + (void)OF_addAsyncReadLineForStream: (OFStream*)stream encoding: (of_string_encoding_t)encoding target: (id)target selector: (SEL)selector { ADD(OFRunLoop_ReadLineQueueItem, { queueItem->encoding = encoding; queueItem->target = [target retain]; queueItem->selector = selector; }) } + (void)OF_addAsyncAcceptForTCPSocket: (OFTCPSocket*)stream target: (id)target selector: (SEL)selector { ADD(OFRunLoop_AcceptQueueItem, { queueItem->target = [target retain]; queueItem->selector = selector; }) } #ifdef OF_HAVE_BLOCKS + (void)OF_addAsyncReadForStream: (OFStream*)stream buffer: (void*)buffer length: (size_t)length block: (of_stream_async_read_block_t)block { ADD(OFRunLoop_ReadQueueItem, { queueItem->buffer = buffer; queueItem->length = length; queueItem->block = [block copy]; }) } + (void)OF_addAsyncReadForStream: (OFStream*)stream buffer: (void*)buffer exactLength: (size_t)exactLength block: (of_stream_async_read_block_t)block { ADD(OFRunLoop_ExactReadQueueItem, { queueItem->buffer = buffer; queueItem->exactLength = exactLength; queueItem->block = [block copy]; }) } + (void)OF_addAsyncReadLineForStream: (OFStream*)stream encoding: (of_string_encoding_t)encoding block: (of_stream_async_read_line_block_t)block { ADD(OFRunLoop_ReadLineQueueItem, { queueItem->encoding = encoding; queueItem->block = [block copy]; }) } + (void)OF_addAsyncAcceptForTCPSocket: (OFTCPSocket*)stream block: (of_tcpsocket_async_accept_block_t)block { ADD(OFRunLoop_AcceptQueueItem, { queueItem->block = [block copy]; }) } #endif #undef ADD - init { self = [super init]; @try { timersQueue = [[OFSortedList alloc] init]; streamObserver = [[OFStreamObserver alloc] init]; [streamObserver setDelegate: self]; readQueues = [[OFMutableDictionary alloc] init]; } @catch (id e) { [self release]; @throw e; } return self; } - (void)dealloc { [timersQueue release]; [streamObserver release]; [readQueues release]; [super dealloc]; } - (void)addTimer: (OFTimer*)timer { @synchronized (timersQueue) { [timersQueue insertObject: timer]; } [streamObserver cancel]; } - (void)streamIsReadyForReading: (OFStream*)stream { OFList *queue = [readQueues objectForKey: stream]; of_list_object_t *listObject; OF_ENSURE(queue != nil); listObject = [queue firstListObject]; if ([listObject->object isKindOfClass: [OFRunLoop_ReadQueueItem class]]) { OFRunLoop_ReadQueueItem *queueItem = listObject->object; size_t length; OFException *exception = nil; @try { length = [stream readIntoBuffer: queueItem->buffer length: queueItem->length]; } @catch (OFException *e) { length = 0; exception = e; } #ifdef OF_HAVE_BLOCKS if (queueItem->block != NULL) { if (!queueItem->block(stream, queueItem->buffer, length, exception)) { [queue removeListObject: listObject]; if ([queue count] == 0) { [streamObserver removeStreamForReading: stream]; [readQueues removeObjectForKey: stream]; } } } else { #endif BOOL (*func)(id, SEL, OFStream*, void*, size_t, OFException*) = (BOOL(*)(id, SEL, OFStream*, void*, size_t, OFException*)) [queueItem->target methodForSelector: queueItem->selector]; if (!func(queueItem->target, queueItem->selector, stream, queueItem->buffer, length, exception)) { [queue removeListObject: listObject]; if ([queue count] == 0) { [streamObserver removeStreamForReading: stream]; [readQueues removeObjectForKey: stream]; } } #ifdef OF_HAVE_BLOCKS } #endif } else if ([listObject->object isKindOfClass: [OFRunLoop_ExactReadQueueItem class]]) { OFRunLoop_ExactReadQueueItem *queueItem = listObject->object; size_t length; OFException *exception = nil; @try { length = [stream readIntoBuffer: (char*)queueItem->buffer + queueItem->readLength length: queueItem->exactLength - queueItem->readLength]; } @catch (OFException *e) { length = 0; exception = e; } queueItem->readLength += length; if (queueItem->readLength == queueItem->exactLength || [stream isAtEndOfStream] || exception != nil) { #ifdef OF_HAVE_BLOCKS if (queueItem->block != NULL) { if (queueItem->block(stream, queueItem->buffer, queueItem->readLength, exception)) queueItem->readLength = 0; else { [queue removeListObject: listObject]; if ([queue count] == 0) { [streamObserver removeStreamForReading: stream]; [readQueues removeObjectForKey: stream]; } } } else { #endif BOOL (*func)(id, SEL, OFStream*, void*, size_t, OFException*) = (BOOL(*)(id, SEL, OFStream*, void*, size_t, OFException*)) [queueItem->target methodForSelector: queueItem->selector]; if (func(queueItem->target, queueItem->selector, stream, queueItem->buffer, queueItem->readLength, exception)) queueItem->readLength = 0; else { [queue removeListObject: listObject]; if ([queue count] == 0) { [streamObserver removeStreamForReading: stream]; [readQueues removeObjectForKey: stream]; } } #ifdef OF_HAVE_BLOCKS } #endif } } else if ([listObject->object isKindOfClass: [OFRunLoop_ReadLineQueueItem class]]) { OFRunLoop_ReadLineQueueItem *queueItem = listObject->object; OFString *line; OFException *exception = nil; @try { line = [stream tryReadLineWithEncoding: queueItem->encoding]; } @catch (OFException *e) { line = nil; exception = e; } if (line != nil || [stream isAtEndOfStream] || exception != nil) { #ifdef OF_HAVE_BLOCKS if (queueItem->block != NULL) { if (!queueItem->block(stream, line, exception)) { [queue removeListObject: listObject]; if ([queue count] == 0) { [streamObserver removeStreamForReading: stream]; [readQueues removeObjectForKey: stream]; } } } else { #endif BOOL (*func)(id, SEL, OFStream*, OFString*, OFException*) = (BOOL(*)(id, SEL, OFStream*, OFString*, OFException*)) [queueItem->target methodForSelector: queueItem->selector]; if (!func(queueItem->target, queueItem->selector, stream, line, exception)) { [queue removeListObject: listObject]; if ([queue count] == 0) { [streamObserver removeStreamForReading: stream]; [readQueues removeObjectForKey: stream]; } } #ifdef OF_HAVE_BLOCKS } #endif } } else if ([listObject->object isKindOfClass: [OFRunLoop_AcceptQueueItem class]]) { OFRunLoop_AcceptQueueItem *queueItem = listObject->object; OFTCPSocket *newSocket; OFException *exception = nil; @try { newSocket = [(OFTCPSocket*)stream accept]; } @catch (OFException *e) { newSocket = nil; exception = e; } #ifdef OF_HAVE_BLOCKS if (queueItem->block != NULL) { if (!queueItem->block((OFTCPSocket*)stream, newSocket, exception)) { [queue removeListObject: listObject]; if ([queue count] == 0) { [streamObserver removeStreamForReading: stream]; [readQueues removeObjectForKey: stream]; } } } else { #endif BOOL (*func)(id, SEL, OFTCPSocket*, OFTCPSocket*, OFException*) = (BOOL(*)(id, SEL, OFTCPSocket*, OFTCPSocket*, OFException*)) [queueItem->target methodForSelector: queueItem->selector]; if (!func(queueItem->target, queueItem->selector, (OFTCPSocket*)stream, newSocket, exception)) { [queue removeListObject: listObject]; if ([queue count] == 0) { [streamObserver removeStreamForReading: stream]; [readQueues removeObjectForKey: stream]; } } #ifdef OF_HAVE_BLOCKS } #endif } else OF_ENSURE(0); } - (void)run { for (;;) { void *pool = objc_autoreleasePoolPush(); OFDate *now = [OFDate date]; OFTimer *timer; OFDate *nextTimer; @synchronized (timersQueue) { of_list_object_t *listObject = [timersQueue firstListObject]; if (listObject != NULL && [[listObject->object fireDate] compare: now] != OF_ORDERED_DESCENDING) { timer = [[listObject->object retain] autorelease]; [timersQueue removeListObject: listObject]; } else timer = nil; } if ([timer isValid]) [timer fire]; @synchronized (timersQueue) { nextTimer = [[timersQueue firstObject] fireDate]; } /* Watch for stream events until the next timer is due */ if (nextTimer != nil) { double timeout = [nextTimer timeIntervalSinceNow]; if (timeout > 0) [streamObserver observeWithTimeout: timeout]; } else { /* * No more timers: Just watch for streams until we get * an event. If a timer is added by another thread, it * cancels the observe. */ [streamObserver observe]; } objc_autoreleasePoolPop(pool); } } @end