ObjFW  OFRunLoop.m at [3c6ad38d92]

File src/OFRunLoop.m artifact 9ee569b5cb part of check-in 3c6ad38d92


/*
 * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013
 *   Jonathan Schleifer <js@webkeks.org>
 *
 * All rights reserved.
 *
 * This file is part of ObjFW. It may be distributed under the terms of the
 * Q Public License 1.0, which can be found in the file LICENSE.QPL included in
 * the packaging of this file.
 *
 * Alternatively, it may be distributed under the terms of the GNU General
 * Public License, either version 2 or 3, which can be found in the file
 * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
 * file.
 */

#include "config.h"

#include <assert.h>

#import "OFRunLoop.h"
#import "OFDictionary.h"
#ifdef OF_HAVE_THREADS
# import "OFThread.h"
# import "OFMutex.h"
#endif
#import "OFSortedList.h"
#import "OFTimer.h"
#import "OFDate.h"

#import "autorelease.h"
#import "macros.h"

static OFRunLoop *mainRunLoop = nil;

@interface OFRunLoop_ReadQueueItem: OFObject
{
@public
	void *buffer;
	size_t length;
	id target;
	SEL selector;
#ifdef OF_HAVE_BLOCKS
	of_stream_async_read_block_t block;
#endif
}
@end

@interface OFRunLoop_ExactReadQueueItem: OFObject
{
@public
	void *buffer;
	size_t exactLength, readLength;
	id target;
	SEL selector;
#ifdef OF_HAVE_BLOCKS
	of_stream_async_read_block_t block;
#endif
}
@end

@interface OFRunLoop_ReadLineQueueItem: OFObject
{
@public
	of_string_encoding_t encoding;
	id target;
	SEL selector;
#ifdef OF_HAVE_BLOCKS
	of_stream_async_read_line_block_t block;
#endif
}
@end

@interface OFRunLoop_AcceptQueueItem: OFObject
{
@public
	id target;
	SEL selector;
#ifdef OF_HAVE_BLOCKS
	of_tcpsocket_async_accept_block_t block;
#endif
}
@end

@implementation OFRunLoop_ReadQueueItem
- (void)dealloc
{
	[target release];
#ifdef OF_HAVE_BLOCKS
	[block release];
#endif

	[super dealloc];
}
@end

@implementation OFRunLoop_ExactReadQueueItem
- (void)dealloc
{
	[target release];
#ifdef OF_HAVE_BLOCKS
	[block release];
#endif

	[super dealloc];
}
@end

@implementation OFRunLoop_ReadLineQueueItem
- (void)dealloc
{
	[target release];
#ifdef OF_HAVE_BLOCKS
	[block release];
#endif

	[super dealloc];
}
@end

@implementation OFRunLoop_AcceptQueueItem
- (void)dealloc
{
	[target release];
#ifdef OF_HAVE_BLOCKS
	[block release];
#endif

	[super dealloc];
}
@end

@implementation OFRunLoop
+ (OFRunLoop*)mainRunLoop
{
	return [[mainRunLoop retain] autorelease];
}

+ (OFRunLoop*)currentRunLoop
{
#ifdef OF_HAVE_THREADS
	return [[OFThread currentThread] runLoop];
#else
	return [self mainRunLoop];
#endif
}

+ (void)OF_setMainRunLoop: (OFRunLoop*)runLoop
{
	mainRunLoop = [runLoop retain];
}

#define ADD(type, code)							\
	void *pool = objc_autoreleasePoolPush();			\
	OFRunLoop *runLoop = [self currentRunLoop];			\
	OFList *queue = [runLoop->readQueues objectForKey: stream];	\
	type *queueItem;						\
									\
	if (queue == nil) {						\
		queue = [OFList list];					\
		[runLoop->readQueues setObject: queue			\
					forKey: stream];		\
	}								\
									\
	if ([queue count] == 0)						\
		[runLoop->streamObserver addStreamForReading: stream];	\
									\
	queueItem = [[[type alloc] init] autorelease];			\
	code								\
	[queue appendObject: queueItem];				\
									\
	objc_autoreleasePoolPop(pool);

+ (void)OF_addAsyncReadForStream: (OFStream*)stream
			  buffer: (void*)buffer
			  length: (size_t)length
			  target: (id)target
			selector: (SEL)selector
{
	ADD(OFRunLoop_ReadQueueItem, {
		queueItem->buffer = buffer;
		queueItem->length = length;
		queueItem->target = [target retain];
		queueItem->selector = selector;
	})
}

+ (void)OF_addAsyncReadForStream: (OFStream*)stream
			  buffer: (void*)buffer
		     exactLength: (size_t)exactLength
			  target: (id)target
			selector: (SEL)selector
{
	ADD(OFRunLoop_ExactReadQueueItem, {
		queueItem->buffer = buffer;
		queueItem->exactLength = exactLength;
		queueItem->target = [target retain];
		queueItem->selector = selector;
	})
}

+ (void)OF_addAsyncReadLineForStream: (OFStream*)stream
			    encoding: (of_string_encoding_t)encoding
			      target: (id)target
			    selector: (SEL)selector
{
	ADD(OFRunLoop_ReadLineQueueItem, {
		queueItem->encoding = encoding;
		queueItem->target = [target retain];
		queueItem->selector = selector;
	})
}

+ (void)OF_addAsyncAcceptForTCPSocket: (OFTCPSocket*)stream
			       target: (id)target
			     selector: (SEL)selector
{
	ADD(OFRunLoop_AcceptQueueItem, {
		queueItem->target = [target retain];
		queueItem->selector = selector;
	})
}

#ifdef OF_HAVE_BLOCKS
+ (void)OF_addAsyncReadForStream: (OFStream*)stream
			  buffer: (void*)buffer
			  length: (size_t)length
			   block: (of_stream_async_read_block_t)block
{
	ADD(OFRunLoop_ReadQueueItem, {
		queueItem->buffer = buffer;
		queueItem->length = length;
		queueItem->block = [block copy];
	})
}

+ (void)OF_addAsyncReadForStream: (OFStream*)stream
			  buffer: (void*)buffer
		     exactLength: (size_t)exactLength
			   block: (of_stream_async_read_block_t)block
{
	ADD(OFRunLoop_ExactReadQueueItem, {
		queueItem->buffer = buffer;
		queueItem->exactLength = exactLength;
		queueItem->block = [block copy];
	})
}

+ (void)OF_addAsyncReadLineForStream: (OFStream*)stream
			    encoding: (of_string_encoding_t)encoding
			       block: (of_stream_async_read_line_block_t)block
{
	ADD(OFRunLoop_ReadLineQueueItem, {
		queueItem->encoding = encoding;
		queueItem->block = [block copy];
	})
}

+ (void)OF_addAsyncAcceptForTCPSocket: (OFTCPSocket*)stream
				block: (of_tcpsocket_async_accept_block_t)block
{
	ADD(OFRunLoop_AcceptQueueItem, {
		queueItem->block = [block copy];
	})
}
#endif

#undef ADD

+ (void)OF_cancelAsyncRequestsForStream: (OFStream*)stream
{
	void *pool = objc_autoreleasePoolPush();
	OFRunLoop *runLoop = [self currentRunLoop];
	OFList *queue;

	if ((queue = [runLoop->readQueues objectForKey: stream]) != nil) {
		assert([queue count] > 0);

		[runLoop->streamObserver removeStreamForReading: stream];
		[runLoop->readQueues removeObjectForKey: stream];
	}

	objc_autoreleasePoolPop(pool);
}

- init
{
	self = [super init];

	@try {
		timersQueue = [[OFSortedList alloc] init];
#ifdef OF_HAVE_THREADS
		timersQueueLock = [[OFMutex alloc] init];
#endif

		streamObserver = [[OFStreamObserver alloc] init];
		[streamObserver setDelegate: self];

		readQueues = [[OFMutableDictionary alloc] init];
	} @catch (id e) {
		[self release];
		@throw e;
	}

	return self;
}

- (void)dealloc
{
	[timersQueue release];
#ifdef OF_HAVE_THREADS
	[timersQueueLock release];
#endif
	[streamObserver release];
	[readQueues release];

	[super dealloc];
}

- (void)addTimer: (OFTimer*)timer
{
#ifdef OF_HAVE_THREADS
	[timersQueueLock lock];
	@try {
#endif
		[timersQueue insertObject: timer];
#ifdef OF_HAVE_THREADS
	} @finally {
		[timersQueueLock unlock];
	}
#endif

	[timer OF_setInRunLoop: self];

	[streamObserver cancel];
}

- (void)OF_removeTimer: (OFTimer*)timer
{
#ifdef OF_HAVE_THREADS
	[timersQueueLock lock];
	@try {
#endif
		of_list_object_t *iter;

		for (iter = [timersQueue firstListObject]; iter != NULL;
		    iter = iter->next) {
			if ([iter->object isEqual: timer]) {
				[timersQueue removeListObject: iter];
				break;
			}
		}
#ifdef OF_HAVE_THREADS
	} @finally {
		[timersQueueLock unlock];
	}
#endif
}

- (void)streamIsReadyForReading: (OFStream*)stream
{
	OFList *queue = [readQueues objectForKey: stream];
	of_list_object_t *listObject;

	OF_ENSURE(queue != nil);

	listObject = [queue firstListObject];

	if ([listObject->object isKindOfClass:
	    [OFRunLoop_ReadQueueItem class]]) {
		OFRunLoop_ReadQueueItem *queueItem = listObject->object;
		size_t length;
		OFException *exception = nil;

		@try {
			length = [stream readIntoBuffer: queueItem->buffer
						 length: queueItem->length];
		} @catch (OFException *e) {
			length = 0;
			exception = e;
		}

#ifdef OF_HAVE_BLOCKS
		if (queueItem->block != NULL) {
			if (!queueItem->block(stream, queueItem->buffer,
			    length, exception)) {
				[queue removeListObject: listObject];

				if ([queue count] == 0) {
					[streamObserver
					    removeStreamForReading: stream];
					[readQueues removeObjectForKey: stream];
				}
			}
		} else {
#endif
			BOOL (*func)(id, SEL, OFStream*, void*, size_t,
			    OFException*) = (BOOL(*)(id, SEL, OFStream*, void*,
			    size_t, OFException*))
			    [queueItem->target methodForSelector:
			    queueItem->selector];

			if (!func(queueItem->target, queueItem->selector,
			    stream, queueItem->buffer, length, exception)) {
				[queue removeListObject: listObject];

				if ([queue count] == 0) {
					[streamObserver
					    removeStreamForReading: stream];
					[readQueues removeObjectForKey: stream];
				}
			}
#ifdef OF_HAVE_BLOCKS
		}
#endif
	} else if ([listObject->object isKindOfClass:
	    [OFRunLoop_ExactReadQueueItem class]]) {
		OFRunLoop_ExactReadQueueItem *queueItem = listObject->object;
		size_t length;
		OFException *exception = nil;

		@try {
			length = [stream
			    readIntoBuffer: (char*)queueItem->buffer +
					    queueItem->readLength
				    length: queueItem->exactLength -
					    queueItem->readLength];
		} @catch (OFException *e) {
			length = 0;
			exception = e;
		}

		queueItem->readLength += length;
		if (queueItem->readLength == queueItem->exactLength ||
		    [stream isAtEndOfStream] || exception != nil) {
#ifdef OF_HAVE_BLOCKS
			if (queueItem->block != NULL) {
				if (queueItem->block(stream, queueItem->buffer,
				    queueItem->readLength, exception))
					queueItem->readLength = 0;
				else {
					[queue removeListObject: listObject];

					if ([queue count] == 0) {
						[streamObserver
						    removeStreamForReading:
						    stream];
						[readQueues
						    removeObjectForKey: stream];
					}
				}
			} else {
#endif
				BOOL (*func)(id, SEL, OFStream*, void*,
				    size_t, OFException*) = (BOOL(*)(id, SEL,
				    OFStream*, void*, size_t, OFException*))
				    [queueItem->target
				    methodForSelector: queueItem->selector];

				if (func(queueItem->target,
				    queueItem->selector, stream,
				    queueItem->buffer, queueItem->readLength,
				    exception))
					queueItem->readLength = 0;
				else {
					[queue removeListObject: listObject];

					if ([queue count] == 0) {
						[streamObserver
						    removeStreamForReading:
						    stream];
						[readQueues
						    removeObjectForKey: stream];
					}
				}
#ifdef OF_HAVE_BLOCKS
			}
#endif
		}
	} else if ([listObject->object isKindOfClass:
	    [OFRunLoop_ReadLineQueueItem class]]) {
		OFRunLoop_ReadLineQueueItem *queueItem = listObject->object;
		OFString *line;
		OFException *exception = nil;

		@try {
			line = [stream
			    tryReadLineWithEncoding: queueItem->encoding];
		} @catch (OFException *e) {
			line = nil;
			exception = e;
		}

		if (line != nil || [stream isAtEndOfStream] ||
		    exception != nil) {
#ifdef OF_HAVE_BLOCKS
			if (queueItem->block != NULL) {
				if (!queueItem->block(stream, line,
				    exception)) {
					[queue removeListObject: listObject];

					if ([queue count] == 0) {
						[streamObserver
						    removeStreamForReading:
						    stream];
						[readQueues
						    removeObjectForKey: stream];
					}
				}
			} else {
#endif
				BOOL (*func)(id, SEL, OFStream*, OFString*,
				    OFException*) = (BOOL(*)(id, SEL, OFStream*,
				    OFString*, OFException*))
				    [queueItem->target methodForSelector:
				    queueItem->selector];

				if (!func(queueItem->target,
				    queueItem->selector, stream, line,
				    exception)) {
					[queue removeListObject: listObject];

					if ([queue count] == 0) {
						[streamObserver
						    removeStreamForReading:
						    stream];
						[readQueues
						    removeObjectForKey: stream];
					}
				}
#ifdef OF_HAVE_BLOCKS
			}
#endif
		}
	} else if ([listObject->object isKindOfClass:
	    [OFRunLoop_AcceptQueueItem class]]) {
		OFRunLoop_AcceptQueueItem *queueItem = listObject->object;
		OFTCPSocket *newSocket;
		OFException *exception = nil;

		@try {
			newSocket = [(OFTCPSocket*)stream accept];
		} @catch (OFException *e) {
			newSocket = nil;
			exception = e;
		}

#ifdef OF_HAVE_BLOCKS
		if (queueItem->block != NULL) {
			if (!queueItem->block((OFTCPSocket*)stream,
			    newSocket, exception)) {
				[queue removeListObject: listObject];

				if ([queue count] == 0) {
					[streamObserver
					    removeStreamForReading: stream];
					[readQueues removeObjectForKey: stream];
				}
			}
		} else {
#endif
			BOOL (*func)(id, SEL, OFTCPSocket*, OFTCPSocket*,
			    OFException*) =
			    (BOOL(*)(id, SEL, OFTCPSocket*, OFTCPSocket*,
			    OFException*))
			    [queueItem->target methodForSelector:
			    queueItem->selector];

			if (!func(queueItem->target, queueItem->selector,
			    (OFTCPSocket*)stream, newSocket, exception)) {
				[queue removeListObject: listObject];

				if ([queue count] == 0) {
					[streamObserver
					    removeStreamForReading: stream];
					[readQueues removeObjectForKey: stream];
				}
			}
#ifdef OF_HAVE_BLOCKS
		}
#endif
	} else
		OF_ENSURE(0);
}

- (void)run
{
	running = YES;

	while (running) {
		void *pool = objc_autoreleasePoolPush();
		OFDate *now = [OFDate date];
		OFTimer *timer;
		OFDate *nextTimer;

#ifdef OF_HAVE_THREADS
		[timersQueueLock lock];
		@try {
#endif
			of_list_object_t *listObject =
			    [timersQueue firstListObject];

			if (listObject != NULL &&
			    [[listObject->object fireDate] compare: now] !=
			    OF_ORDERED_DESCENDING) {
				timer =
				    [[listObject->object retain] autorelease];

				[timersQueue removeListObject: listObject];

				[timer OF_setInRunLoop: nil];
			} else
				timer = nil;
#ifdef OF_HAVE_THREADS
		} @finally {
			[timersQueueLock unlock];
		}
#endif

		if ([timer isValid])
			[timer fire];

#ifdef OF_HAVE_THREADS
		[timersQueueLock lock];
		@try {
#endif
			nextTimer = [[timersQueue firstObject] fireDate];
#ifdef OF_HAVE_THREADS
		} @finally {
			[timersQueueLock unlock];
		}
#endif

		/* Watch for stream events until the next timer is due */
		if (nextTimer != nil) {
			double timeout = [nextTimer timeIntervalSinceNow];

			if (timeout > 0)
				[streamObserver observeWithTimeout: timeout];
		} else {
			/*
			 * No more timers: Just watch for streams until we get
			 * an event. If a timer is added by another thread, it
			 * cancels the observe.
			 */
			[streamObserver observe];
		}

		objc_autoreleasePoolPop(pool);
	}
}

- (void)stop
{
	running = NO;
	[streamObserver cancel];
}
@end