ObjFW  Artifact [c3c6b10589]

Artifact c3c6b1058904ed2105c5dad4f1fbf088ce81861c4f91ad7d4e9fd0bb2268a569:

  • File src/OFRunLoop.m — part of check-in [13ee56edf3] at 2014-06-21 21:43:43 on branch trunk — Move all macros from OFObject.h to macros.h

    This means that OFObject.h imports macros.h now, making it unnecessary
    to manually import macros.h in almost every file. And while at it, also
    import autorelease.h in OFObject.h, so that this doesn't need to be
    manually imported in almost every file as well. (user: js, size: 18456) [annotate] [blame] [check-ins using]


/*
 * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2014
 *   Jonathan Schleifer <js@webkeks.org>
 *
 * All rights reserved.
 *
 * This file is part of ObjFW. It may be distributed under the terms of the
 * Q Public License 1.0, which can be found in the file LICENSE.QPL included in
 * the packaging of this file.
 *
 * Alternatively, it may be distributed under the terms of the GNU General
 * Public License, either version 2 or 3, which can be found in the file
 * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
 * file.
 */

#include "config.h"

#include <assert.h>

#import "OFRunLoop.h"
#import "OFRunLoop+Private.h"
#import "OFDictionary.h"
#ifdef OF_HAVE_SOCKETS
# import "OFKernelEventObserver.h"
#endif
#import "OFThread.h"
#ifdef OF_HAVE_THREADS
# import "OFMutex.h"
# import "OFCondition.h"
#endif
#import "OFSortedList.h"
#import "OFTimer.h"
#import "OFTimer+Private.h"
#import "OFDate.h"

static OFRunLoop *mainRunLoop = nil;

#ifdef OF_HAVE_SOCKETS
@interface OFRunLoop_QueueItem: OFObject
{
@public
	id _target;
	SEL _selector;
}
@end

@interface OFRunLoop_ReadQueueItem: OFRunLoop_QueueItem
{
@public
# ifdef OF_HAVE_BLOCKS
	of_stream_async_read_block_t _block;
# endif
	void *_buffer;
	size_t _length;
}
@end

@interface OFRunLoop_ExactReadQueueItem: OFRunLoop_QueueItem
{
@public
# ifdef OF_HAVE_BLOCKS
	of_stream_async_read_block_t _block;
# endif
	void *_buffer;
	size_t _exactLength, _readLength;
}
@end

@interface OFRunLoop_ReadLineQueueItem: OFRunLoop_QueueItem
{
@public
# ifdef OF_HAVE_BLOCKS
	of_stream_async_read_line_block_t _block;
# endif
	of_string_encoding_t _encoding;
}
@end

@interface OFRunLoop_AcceptQueueItem: OFRunLoop_QueueItem
{
@public
# ifdef OF_HAVE_BLOCKS
	of_tcp_socket_async_accept_block_t _block;
# endif
}
@end

@interface OFRunLoop_UDPReceiveQueueItem: OFRunLoop_QueueItem
{
@public
# ifdef OF_HAVE_BLOCKS
	of_udp_socket_async_receive_block_t _block;
# endif
	void *_buffer;
	size_t _length;
}
@end

@implementation OFRunLoop_QueueItem
- (void)dealloc
{
	[_target release];

	[super dealloc];
}
@end

@implementation OFRunLoop_ReadQueueItem
# ifdef OF_HAVE_BLOCKS
- (void)dealloc
{
	[_block release];

	[super dealloc];
}
# endif
@end

@implementation OFRunLoop_ExactReadQueueItem
# ifdef OF_HAVE_BLOCKS
- (void)dealloc
{
	[_block release];

	[super dealloc];
}
# endif
@end

@implementation OFRunLoop_ReadLineQueueItem
# ifdef OF_HAVE_BLOCKS
- (void)dealloc
{
	[_block release];

	[super dealloc];
}
# endif
@end

@implementation OFRunLoop_AcceptQueueItem
# ifdef OF_HAVE_BLOCKS
- (void)dealloc
{
	[_block release];

	[super dealloc];
}
# endif
@end

@implementation OFRunLoop_UDPReceiveQueueItem
# ifdef OF_HAVE_BLOCKS
- (void)dealloc
{
	[_block release];

	[super dealloc];
}
# endif
@end
#endif

@implementation OFRunLoop
+ (OFRunLoop*)mainRunLoop
{
	return [[mainRunLoop retain] autorelease];
}

+ (OFRunLoop*)currentRunLoop
{
#ifdef OF_HAVE_THREADS
	return [[OFThread currentThread] runLoop];
#else
	return [self mainRunLoop];
#endif
}

+ (void)OF_setMainRunLoop: (OFRunLoop*)runLoop
{
	mainRunLoop = [runLoop retain];
}

#ifdef OF_HAVE_SOCKETS
# define ADD_READ(type, object, code)					\
	void *pool = objc_autoreleasePoolPush();			\
	OFRunLoop *runLoop = [self currentRunLoop];			\
	OFList *queue = [runLoop->_readQueues objectForKey: object];	\
	type *queueItem;						\
									\
	if (queue == nil) {						\
		queue = [OFList list];					\
		[runLoop->_readQueues setObject: queue			\
					 forKey: object];		\
	}								\
									\
	if ([queue count] == 0)						\
		[runLoop->_kernelEventObserver				\
		    addObjectForReading: object];			\
									\
	queueItem = [[[type alloc] init] autorelease];			\
	code								\
	[queue appendObject: queueItem];				\
									\
	objc_autoreleasePoolPop(pool);

+ (void)OF_addAsyncReadForStream: (OFStream*)stream
			  buffer: (void*)buffer
			  length: (size_t)length
			  target: (id)target
			selector: (SEL)selector
{
	ADD_READ(OFRunLoop_ReadQueueItem, stream, {
		queueItem->_target = [target retain];
		queueItem->_selector = selector;
		queueItem->_buffer = buffer;
		queueItem->_length = length;
	})
}

+ (void)OF_addAsyncReadForStream: (OFStream*)stream
			  buffer: (void*)buffer
		     exactLength: (size_t)exactLength
			  target: (id)target
			selector: (SEL)selector
{
	ADD_READ(OFRunLoop_ExactReadQueueItem, stream, {
		queueItem->_target = [target retain];
		queueItem->_selector = selector;
		queueItem->_buffer = buffer;
		queueItem->_exactLength = exactLength;
	})
}

+ (void)OF_addAsyncReadLineForStream: (OFStream*)stream
			    encoding: (of_string_encoding_t)encoding
			      target: (id)target
			    selector: (SEL)selector
{
	ADD_READ(OFRunLoop_ReadLineQueueItem, stream, {
		queueItem->_target = [target retain];
		queueItem->_selector = selector;
		queueItem->_encoding = encoding;
	})
}

+ (void)OF_addAsyncAcceptForTCPSocket: (OFTCPSocket*)stream
			       target: (id)target
			     selector: (SEL)selector
{
	ADD_READ(OFRunLoop_AcceptQueueItem, stream, {
		queueItem->_target = [target retain];
		queueItem->_selector = selector;
	})
}

+ (void)OF_addAsyncReceiveForUDPSocket: (OFUDPSocket*)socket
				buffer: (void*)buffer
				length: (size_t)length
				target: (id)target
			      selector: (SEL)selector
{
	ADD_READ(OFRunLoop_UDPReceiveQueueItem, socket, {
		queueItem->_buffer = buffer;
		queueItem->_length = length;
		queueItem->_target = [target retain];
		queueItem->_selector = selector;
	})
}

# ifdef OF_HAVE_BLOCKS
+ (void)OF_addAsyncReadForStream: (OFStream*)stream
			  buffer: (void*)buffer
			  length: (size_t)length
			   block: (of_stream_async_read_block_t)block
{
	ADD_READ(OFRunLoop_ReadQueueItem, stream, {
		queueItem->_block = [block copy];
		queueItem->_buffer = buffer;
		queueItem->_length = length;
	})
}

+ (void)OF_addAsyncReadForStream: (OFStream*)stream
			  buffer: (void*)buffer
		     exactLength: (size_t)exactLength
			   block: (of_stream_async_read_block_t)block
{
	ADD_READ(OFRunLoop_ExactReadQueueItem, stream, {
		queueItem->_block = [block copy];
		queueItem->_buffer = buffer;
		queueItem->_exactLength = exactLength;
	})
}

+ (void)OF_addAsyncReadLineForStream: (OFStream*)stream
			    encoding: (of_string_encoding_t)encoding
			       block: (of_stream_async_read_line_block_t)block
{
	ADD_READ(OFRunLoop_ReadLineQueueItem, stream, {
		queueItem->_block = [block copy];
		queueItem->_encoding = encoding;
	})
}

+ (void)OF_addAsyncAcceptForTCPSocket: (OFTCPSocket*)stream
				block: (of_tcp_socket_async_accept_block_t)block
{
	ADD_READ(OFRunLoop_AcceptQueueItem, stream, {
		queueItem->_block = [block copy];
	})
}

+ (void)OF_addAsyncReceiveForUDPSocket: (OFUDPSocket*)socket
				buffer: (void*)buffer
				length: (size_t)length
				 block: (of_udp_socket_async_receive_block_t)
					    block
{
	ADD_READ(OFRunLoop_UDPReceiveQueueItem, socket, {
		queueItem->_buffer = buffer;
		queueItem->_length = length;
		queueItem->_block = [block copy];
	})
}
# endif
# undef ADD_READ

+ (void)OF_cancelAsyncRequestsForObject: (id)object
{
	void *pool = objc_autoreleasePoolPush();
	OFRunLoop *runLoop = [self currentRunLoop];
	OFList *queue;

	if ((queue = [runLoop->_readQueues objectForKey: object]) != nil) {
		assert([queue count] > 0);

		[runLoop->_kernelEventObserver removeObjectForReading: object];
		[runLoop->_readQueues removeObjectForKey: object];
	}

	objc_autoreleasePoolPop(pool);
}
#endif

- init
{
	self = [super init];

	@try {
		_timersQueue = [[OFSortedList alloc] init];
#ifdef OF_HAVE_THREADS
		_timersQueueLock = [[OFMutex alloc] init];
#endif

#if defined(OF_HAVE_SOCKETS)
		_kernelEventObserver = [[OFKernelEventObserver alloc] init];
		[_kernelEventObserver setDelegate: self];

		_readQueues = [[OFMutableDictionary alloc] init];
#elif defined(OF_HAVE_THREADS)
		_condition = [[OFCondition alloc] init];
#endif
	} @catch (id e) {
		[self release];
		@throw e;
	}

	return self;
}

- (void)dealloc
{
	[_timersQueue release];
#ifdef OF_HAVE_THREADS
	[_timersQueueLock release];
#endif
#if defined(OF_HAVE_SOCKETS)
	[_kernelEventObserver release];
	[_readQueues release];
#elif defined(OF_HAVE_THREADS)
	[_condition release];
#endif

	[super dealloc];
}

- (void)addTimer: (OFTimer*)timer
{
#ifdef OF_HAVE_THREADS
	[_timersQueueLock lock];
	@try {
#endif
		[_timersQueue insertObject: timer];
#ifdef OF_HAVE_THREADS
	} @finally {
		[_timersQueueLock unlock];
	}
#endif

	[timer OF_setInRunLoop: self];

#if defined(OF_HAVE_SOCKETS)
	[_kernelEventObserver cancel];
#elif defined(OF_HAVE_THREADS)
	[_condition lock];
	[_condition signal];
	[_condition unlock];
#endif
}

- (void)OF_removeTimer: (OFTimer*)timer
{
#ifdef OF_HAVE_THREADS
	[_timersQueueLock lock];
	@try {
#endif
		of_list_object_t *iter;

		for (iter = [_timersQueue firstListObject]; iter != NULL;
		    iter = iter->next) {
			if ([iter->object isEqual: timer]) {
				[_timersQueue removeListObject: iter];
				break;
			}
		}
#ifdef OF_HAVE_THREADS
	} @finally {
		[_timersQueueLock unlock];
	}
#endif
}

#ifdef OF_HAVE_SOCKETS
- (void)objectIsReadyForReading: (id)object
{
	OFList *queue = [_readQueues objectForKey: object];
	of_list_object_t *listObject;

	assert(queue != nil);

	listObject = [queue firstListObject];

	if ([listObject->object isKindOfClass:
	    [OFRunLoop_ReadQueueItem class]]) {
		OFRunLoop_ReadQueueItem *queueItem = listObject->object;
		size_t length;
		OFException *exception = nil;

		@try {
			length = [object readIntoBuffer: queueItem->_buffer
						 length: queueItem->_length];
		} @catch (OFException *e) {
			length = 0;
			exception = e;
		}

# ifdef OF_HAVE_BLOCKS
		if (queueItem->_block != NULL) {
			if (!queueItem->_block(object, queueItem->_buffer,
			    length, exception)) {
				[queue removeListObject: listObject];

				if ([queue count] == 0) {
					[_kernelEventObserver
					    removeObjectForReading: object];
					[_readQueues
					    removeObjectForKey: object];
				}
			}
		} else {
# endif
			bool (*func)(id, SEL, OFStream*, void*, size_t,
			    OFException*) = (bool(*)(id, SEL, OFStream*, void*,
			    size_t, OFException*))
			    [queueItem->_target methodForSelector:
			    queueItem->_selector];

			if (!func(queueItem->_target, queueItem->_selector,
			    object, queueItem->_buffer, length, exception)) {
				[queue removeListObject: listObject];

				if ([queue count] == 0) {
					[_kernelEventObserver
					    removeObjectForReading: object];
					[_readQueues
					    removeObjectForKey: object];
				}
			}
# ifdef OF_HAVE_BLOCKS
		}
# endif
	} else if ([listObject->object isKindOfClass:
	    [OFRunLoop_ExactReadQueueItem class]]) {
		OFRunLoop_ExactReadQueueItem *queueItem = listObject->object;
		size_t length;
		OFException *exception = nil;

		@try {
			length = [object
			    readIntoBuffer: (char*)queueItem->_buffer +
					    queueItem->_readLength
				    length: queueItem->_exactLength -
					    queueItem->_readLength];
		} @catch (OFException *e) {
			length = 0;
			exception = e;
		}

		queueItem->_readLength += length;
		if (queueItem->_readLength == queueItem->_exactLength ||
		    [object isAtEndOfStream] || exception != nil) {
# ifdef OF_HAVE_BLOCKS
			if (queueItem->_block != NULL) {
				if (queueItem->_block(object,
				    queueItem->_buffer, queueItem->_readLength,
				    exception))
					queueItem->_readLength = 0;
				else {
					[queue removeListObject: listObject];

					if ([queue count] == 0) {
						[_kernelEventObserver
						    removeObjectForReading:
						    object];
						[_readQueues
						    removeObjectForKey: object];
					}
				}
			} else {
# endif
				bool (*func)(id, SEL, OFStream*, void*,
				    size_t, OFException*) = (bool(*)(id, SEL,
				    OFStream*, void*, size_t, OFException*))
				    [queueItem->_target
				    methodForSelector: queueItem->_selector];

				if (func(queueItem->_target,
				    queueItem->_selector, object,
				    queueItem->_buffer, queueItem->_readLength,
				    exception))
					queueItem->_readLength = 0;
				else {
					[queue removeListObject: listObject];

					if ([queue count] == 0) {
						[_kernelEventObserver
						    removeObjectForReading:
						    object];
						[_readQueues
						    removeObjectForKey: object];
					}
				}
# ifdef OF_HAVE_BLOCKS
			}
# endif
		}
	} else if ([listObject->object isKindOfClass:
	    [OFRunLoop_ReadLineQueueItem class]]) {
		OFRunLoop_ReadLineQueueItem *queueItem = listObject->object;
		OFString *line;
		OFException *exception = nil;

		@try {
			line = [object
			    tryReadLineWithEncoding: queueItem->_encoding];
		} @catch (OFException *e) {
			line = nil;
			exception = e;
		}

		if (line != nil || [object isAtEndOfStream] ||
		    exception != nil) {
# ifdef OF_HAVE_BLOCKS
			if (queueItem->_block != NULL) {
				if (!queueItem->_block(object, line,
				    exception)) {
					[queue removeListObject: listObject];

					if ([queue count] == 0) {
						[_kernelEventObserver
						    removeObjectForReading:
						    object];
						[_readQueues
						    removeObjectForKey: object];
					}
				}
			} else {
# endif
				bool (*func)(id, SEL, OFStream*, OFString*,
				    OFException*) = (bool(*)(id, SEL, OFStream*,
				    OFString*, OFException*))
				    [queueItem->_target methodForSelector:
				    queueItem->_selector];

				if (!func(queueItem->_target,
				    queueItem->_selector, object, line,
				    exception)) {
					[queue removeListObject: listObject];

					if ([queue count] == 0) {
						[_kernelEventObserver
						    removeObjectForReading:
						    object];
						[_readQueues
						    removeObjectForKey: object];
					}
				}
# ifdef OF_HAVE_BLOCKS
			}
# endif
		}
	} else if ([listObject->object isKindOfClass:
	    [OFRunLoop_AcceptQueueItem class]]) {
		OFRunLoop_AcceptQueueItem *queueItem = listObject->object;
		OFTCPSocket *newSocket;
		OFException *exception = nil;

		@try {
			newSocket = [object accept];
		} @catch (OFException *e) {
			newSocket = nil;
			exception = e;
		}

# ifdef OF_HAVE_BLOCKS
		if (queueItem->_block != NULL) {
			if (!queueItem->_block(object, newSocket, exception)) {
				[queue removeListObject: listObject];

				if ([queue count] == 0) {
					[_kernelEventObserver
					    removeObjectForReading: object];
					[_readQueues
					    removeObjectForKey: object];
				}
			}
		} else {
# endif
			bool (*func)(id, SEL, OFTCPSocket*, OFTCPSocket*,
			    OFException*) =
			    (bool(*)(id, SEL, OFTCPSocket*, OFTCPSocket*,
			    OFException*))
			    [queueItem->_target methodForSelector:
			    queueItem->_selector];

			if (!func(queueItem->_target, queueItem->_selector,
			    object, newSocket, exception)) {
				[queue removeListObject: listObject];

				if ([queue count] == 0) {
					[_kernelEventObserver
					    removeObjectForReading: object];
					[_readQueues
					    removeObjectForKey: object];
				}
			}
# ifdef OF_HAVE_BLOCKS
		}
# endif
	} else if ([listObject->object isKindOfClass:
	    [OFRunLoop_UDPReceiveQueueItem class]]) {
		OFRunLoop_UDPReceiveQueueItem *queueItem = listObject->object;
		size_t length;
		of_udp_socket_address_t address;
		OFException *exception = nil;

		@try {
			length = [object receiveIntoBuffer: queueItem->_buffer
						    length: queueItem->_length
						    sender: &address];
		} @catch (OFException *e) {
			length = 0;
			exception = e;
		}

# ifdef OF_HAVE_BLOCKS
		if (queueItem->_block != NULL) {
			if (!queueItem->_block(object, queueItem->_buffer,
			    length, address, exception)) {
				[queue removeListObject: listObject];

				if ([queue count] == 0) {
					[_kernelEventObserver
					    removeObjectForReading: object];
					[_readQueues
					    removeObjectForKey: object];
				}
			}
		} else {
# endif
			bool (*func)(id, SEL, OFUDPSocket*, void*, size_t,
			    of_udp_socket_address_t address, OFException*) =
			    (bool(*)(id, SEL, OFUDPSocket*, void*, size_t,
			    of_udp_socket_address_t, OFException*))
			    [queueItem->_target methodForSelector:
			    queueItem->_selector];

			if (!func(queueItem->_target, queueItem->_selector,
			    object, queueItem->_buffer, length, address,
			    exception)) {
				[queue removeListObject: listObject];

				if ([queue count] == 0) {
					[_kernelEventObserver
					    removeObjectForReading: object];
					[_readQueues
					    removeObjectForKey: object];
				}
			}
# ifdef OF_HAVE_BLOCKS
		}
# endif
	} else
		assert(0);
}
#endif

- (void)run
{
	_running = true;

	for (;;) {
		void *pool;
		OFDate *now;
		OFTimer *timer;
		OFDate *nextTimer;

		if (!_running)
			break;

		pool = objc_autoreleasePoolPush();
		now = [OFDate date];

#ifdef OF_HAVE_THREADS
		[_timersQueueLock lock];
		@try {
#endif
			of_list_object_t *listObject =
			    [_timersQueue firstListObject];

			if (listObject != NULL &&
			    [[listObject->object fireDate] compare: now] !=
			    OF_ORDERED_DESCENDING) {
				timer =
				    [[listObject->object retain] autorelease];

				[_timersQueue removeListObject: listObject];

				[timer OF_setInRunLoop: nil];
			} else
				timer = nil;
#ifdef OF_HAVE_THREADS
		} @finally {
			[_timersQueueLock unlock];
		}
#endif

		if ([timer isValid])
			[timer fire];

#ifdef OF_HAVE_THREADS
		[_timersQueueLock lock];
		@try {
#endif
			nextTimer = [[_timersQueue firstObject] fireDate];
#ifdef OF_HAVE_THREADS
		} @finally {
			[_timersQueueLock unlock];
		}
#endif

		/* Watch for I/O events until the next timer is due */
		if (nextTimer != nil) {
			of_time_interval_t timeout =
			    [nextTimer timeIntervalSinceNow];

			if (timeout > 0) {
#if defined(OF_HAVE_SOCKETS)
				[_kernelEventObserver
				    observeForTimeInterval: timeout];
#elif defined(OF_HAVE_THREADS)
				[_condition lock];
				[_condition waitForTimeInterval: timeout];
				[_condition unlock];
#else
				[OFThread sleepForTimeInterval: timeout];
#endif
			}
		} else {
			/*
			 * No more timers: Just watch for I/O until we get
			 * an event. If a timer is added by another thread, it
			 * cancels the observe.
			 */
#if defined(OF_HAVE_SOCKETS)
			[_kernelEventObserver observe];
#elif defined(OF_HAVE_THREADS)
			[_condition lock];
			[_condition wait];
			[_condition unlock];
#else
			[OFThread sleepForTimeInterval: 86400];
#endif
		}

		objc_autoreleasePoolPop(pool);
	}
}

- (void)stop
{
	_running = false;
#if defined(OF_HAVE_SOCKETS)
	[_kernelEventObserver cancel];
#elif defined(OF_HAVE_THREADS)
	[_condition lock];
	[_condition signal];
	[_condition unlock];
#endif
}
@end