ObjFW  Artifact [5f45cad885]

Artifact 5f45cad8857d287b506df457277da5bc580827919393f012031e4b9e99340056:

  • File src/OFRunLoop.m — part of check-in [d1d36ae522] at 2021-11-06 15:57:29 on branch trunk — OFStream: New write API

    The old write API made it too easy to lose bytes when a stream is set to
    non-blocking mode. The new API always throws when not all bytes were
    written, which forces handling the number of bytes being written being
    smaller than the number of bytes requested to be written. (user: js, size: 36952) [annotate] [blame] [check-ins using]


/*
 * Copyright (c) 2008-2021 Jonathan Schleifer <js@nil.im>
 *
 * All rights reserved.
 *
 * This file is part of ObjFW. It may be distributed under the terms of the
 * Q Public License 1.0, which can be found in the file LICENSE.QPL included in
 * the packaging of this file.
 *
 * Alternatively, it may be distributed under the terms of the GNU General
 * Public License, either version 2 or 3, which can be found in the file
 * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
 * file.
 */

#include "config.h"

#include <assert.h>
#include <errno.h>

#import "OFRunLoop.h"
#import "OFRunLoop+Private.h"
#import "OFArray.h"
#import "OFData.h"
#import "OFDictionary.h"
#ifdef OF_HAVE_SOCKETS
# import "OFKernelEventObserver.h"
# import "OFDatagramSocket.h"
# import "OFSequencedPacketSocket.h"
# import "OFSequencedPacketSocket+Private.h"
# import "OFStreamSocket.h"
# import "OFStreamSocket+Private.h"
#endif
#import "OFThread.h"
#ifdef OF_HAVE_THREADS
# import "OFMutex.h"
# import "OFCondition.h"
#endif
#import "OFSortedList.h"
#import "OFTimer.h"
#import "OFTimer+Private.h"
#import "OFDate.h"

#import "OFObserveFailedException.h"
#import "OFWriteFailedException.h"

#include "OFRunLoopConstants.inc"

static OFRunLoop *mainRunLoop = nil;

@interface OFRunLoopState: OFObject
#ifdef OF_HAVE_SOCKETS
    <OFKernelEventObserverDelegate>
#endif
{
@public
	OFSortedList OF_GENERIC(OFTimer *) *_timersQueue;
#ifdef OF_HAVE_THREADS
	OFMutex *_timersQueueMutex;
#endif
#if defined(OF_HAVE_SOCKETS)
	OFKernelEventObserver *_kernelEventObserver;
	OFMutableDictionary *_readQueues, *_writeQueues;
#elif defined(OF_HAVE_THREADS)
	OFCondition *_condition;
# ifdef OF_AMIGAOS
	ULONG _execSignalMask;
# endif
#endif
#ifdef OF_AMIGAOS
	OFMutableData *_execSignals;
	OFMutableArray *_execSignalsTargets;
	OFMutableData *_execSignalsSelectors;
# ifdef OF_HAVE_THREADS
	OFMutex *_execSignalsMutex;
# endif
#endif
}
@end

#ifdef OF_HAVE_SOCKETS
@interface OFRunLoopQueueItem: OFObject
{
@public
	id _delegate;
}

- (bool)handleObject: (id)object;
@end

@interface OFRunLoopReadQueueItem: OFRunLoopQueueItem
{
@public
# ifdef OF_HAVE_BLOCKS
	OFStreamAsyncReadBlock _block;
# endif
	void *_buffer;
	size_t _length;
}
@end

@interface OFRunLoopExactReadQueueItem: OFRunLoopQueueItem
{
@public
# ifdef OF_HAVE_BLOCKS
	OFStreamAsyncReadBlock _block;
# endif
	void *_buffer;
	size_t _exactLength, _readLength;
}
@end

@interface OFRunLoopReadLineQueueItem: OFRunLoopQueueItem
{
@public
# ifdef OF_HAVE_BLOCKS
	OFStreamAsyncReadLineBlock _block;
# endif
	OFStringEncoding _encoding;
}
@end

@interface OFRunLoopWriteDataQueueItem: OFRunLoopQueueItem
{
@public
# ifdef OF_HAVE_BLOCKS
	OFStreamAsyncWriteDataBlock _block;
# endif
	OFData *_data;
	size_t _writtenLength;
}
@end

@interface OFRunLoopWriteStringQueueItem: OFRunLoopQueueItem
{
@public
# ifdef OF_HAVE_BLOCKS
	OFStreamAsyncWriteStringBlock _block;
# endif
	OFString *_string;
	OFStringEncoding _encoding;
	size_t _writtenLength;
}
@end

# if !defined(OF_WII) && !defined(OF_NINTENDO_3DS)
@interface OFRunLoopConnectQueueItem: OFRunLoopQueueItem
@end
# endif

@interface OFRunLoopAcceptQueueItem: OFRunLoopQueueItem
{
@public
# ifdef OF_HAVE_BLOCKS
	id _block;
# endif
}
@end

@interface OFRunLoopDatagramReceiveQueueItem: OFRunLoopQueueItem
{
@public
# ifdef OF_HAVE_BLOCKS
	OFDatagramSocketAsyncReceiveBlock _block;
# endif
	void *_buffer;
	size_t _length;
}
@end

@interface OFRunLoopDatagramSendQueueItem: OFRunLoopQueueItem
{
@public
# ifdef OF_HAVE_BLOCKS
	OFDatagramSocketAsyncSendDataBlock _block;
# endif
	OFData *_data;
	OFSocketAddress _receiver;
}
@end

@interface OFRunLoopPacketReceiveQueueItem: OFRunLoopQueueItem
{
@public
# ifdef OF_HAVE_BLOCKS
	OFSequencedPacketSocketAsyncReceiveBlock _block;
# endif
	void *_buffer;
	size_t _length;
}
@end

@interface OFRunLoopPacketSendQueueItem: OFRunLoopQueueItem
{
@public
# ifdef OF_HAVE_BLOCKS
	OFSequencedPacketSocketAsyncSendDataBlock _block;
# endif
	OFData *_data;
}
@end
#endif

@implementation OFRunLoopState
- (instancetype)init
{
	self = [super init];

	@try {
		_timersQueue = [[OFSortedList alloc] init];
#ifdef OF_HAVE_THREADS
		_timersQueueMutex = [[OFMutex alloc] init];
#endif

#if defined(OF_HAVE_SOCKETS)
		_kernelEventObserver = [[OFKernelEventObserver alloc] init];
		_kernelEventObserver.delegate = self;

		_readQueues = [[OFMutableDictionary alloc] init];
		_writeQueues = [[OFMutableDictionary alloc] init];
#elif defined(OF_HAVE_THREADS)
		_condition = [[OFCondition alloc] init];
#endif
#ifdef OF_AMIGAOS
		_execSignals = [[OFMutableData alloc]
		    initWithItemSize: sizeof(ULONG)];
		_execSignalsTargets = [[OFMutableArray alloc] init];
		_execSignalsSelectors = [[OFMutableData alloc]
		    initWithItemSize: sizeof(SEL)];
# ifdef OF_HAVE_THREADS
		_execSignalsMutex = [[OFMutex alloc] init];
# endif
#endif
	} @catch (id e) {
		[self release];
		@throw e;
	}

	return self;
}

- (void)dealloc
{
	[_timersQueue release];
#ifdef OF_HAVE_THREADS
	[_timersQueueMutex release];
#endif
#if defined(OF_HAVE_SOCKETS)
	[_kernelEventObserver release];
	[_readQueues release];
	[_writeQueues release];
#elif defined(OF_HAVE_THREADS)
	[_condition release];
#endif
#ifdef OF_AMIGAOS
	[_execSignals release];
	[_execSignalsTargets release];
	[_execSignalsSelectors release];
# ifdef OF_HAVE_THREADS
	[_execSignalsMutex release];
# endif
#endif

	[super dealloc];
}

#ifdef OF_HAVE_SOCKETS
- (void)objectIsReadyForReading: (id)object
{
	/*
	 * Retain the queue so that it doesn't disappear from us because the
	 * handler called -[cancelAsyncRequests].
	 */
	OFList OF_GENERIC(OF_KINDOF(OFRunLoopReadQueueItem *)) *queue =
	    [[_readQueues objectForKey: object] retain];

	assert(queue != nil);

	@try {
		if (![queue.firstObject handleObject: object]) {
			OFListItem listItem = queue.firstListItem;

			/*
			 * The handler might have called -[cancelAsyncRequests]
			 * so that our queue is now empty, in which case we
			 * should do nothing.
			 */
			if (listItem != NULL) {
				/*
				 * Make sure we keep the target until after we
				 * are done removing the object. The reason for
				 * this is that the target might call
				 * -[cancelAsyncRequests] in its dealloc.
				 */
				[[OFListItemObject(listItem) retain]
				    autorelease];

				[queue removeListItem: listItem];

				if (queue.count == 0) {
					[_kernelEventObserver
					    removeObjectForReading: object];
					[_readQueues
					    removeObjectForKey: object];
				}
			}
		}
	} @finally {
		[queue release];
	}
}

- (void)objectIsReadyForWriting: (id)object
{
	/*
	 * Retain the queue so that it doesn't disappear from us because the
	 * handler called -[cancelAsyncRequests].
	 */
	OFList *queue = [[_writeQueues objectForKey: object] retain];

	assert(queue != nil);

	@try {
		if (![queue.firstObject handleObject: object]) {
			OFListItem listItem = queue.firstListItem;

			/*
			 * The handler might have called -[cancelAsyncRequests]
			 * so that our queue is now empty, in which case we
			 * should do nothing.
			 */
			if (listItem != NULL) {
				/*
				 * Make sure we keep the target until after we
				 * are done removing the object. The reason for
				 * this is that the target might call
				 * -[cancelAsyncRequests] in its dealloc.
				 */
				[[OFListItemObject(listItem) retain]
				    autorelease];

				[queue removeListItem: listItem];

				if (queue.count == 0) {
					[_kernelEventObserver
					    removeObjectForWriting: object];
					[_writeQueues
					    removeObjectForKey: object];
				}
			}
		}
	} @finally {
		[queue release];
	}
}
#endif

#ifdef OF_AMIGAOS
- (void)execSignalWasReceived: (ULONG)signalMask
{
	void *pool = objc_autoreleasePoolPush();
	OFData *signals;
	OFArray *targets;
	OFData *selectors;
	const ULONG *signalsItems;
	const id *targetsObjects;
	const SEL *selectorsItems;
	size_t count;

# ifdef OF_HAVE_THREADS
	[_execSignalsMutex lock];
	@try {
# endif
		/*
		 * Create copies, so that signal handlers are allowed to modify
		 * signals.
		 */
		signals = [[_execSignals copy] autorelease];
		targets = [[_execSignalsTargets copy] autorelease];
		selectors = [[_execSignalsSelectors copy] autorelease];
# ifdef OF_HAVE_THREADS
	} @finally {
		[_execSignalsMutex unlock];
	}
# endif

	signalsItems = signals.items;
	targetsObjects = targets.objects;
	selectorsItems = selectors.items;
	count = signals.count;

	for (size_t i = 0; i < count; i++) {
		if (signalMask & (1ul << signalsItems[i])) {
			void (*callback)(id, SEL, ULONG) =
			    (void (*)(id, SEL, ULONG))[targetsObjects[i]
			    methodForSelector: selectorsItems[i]];

			callback(targetsObjects[i], selectorsItems[i],
			    signalsItems[i]);
		}
	}

	objc_autoreleasePoolPop(pool);
}
#endif
@end

#ifdef OF_HAVE_SOCKETS
@implementation OFRunLoopQueueItem
- (bool)handleObject: (id)object
{
	OF_UNRECOGNIZED_SELECTOR
}

- (void)dealloc
{
	[_delegate release];

	[super dealloc];
}
@end

@implementation OFRunLoopReadQueueItem
- (bool)handleObject: (id)object
{
	size_t length;
	id exception = nil;

	@try {
		length = [object readIntoBuffer: _buffer length: _length];
	} @catch (id e) {
		length = 0;
		exception = e;
	}

# ifdef OF_HAVE_BLOCKS
	if (_block != NULL)
		return _block(length, exception);
	else {
# endif
		if (![_delegate respondsToSelector:
		    @selector(stream:didReadIntoBuffer:length:exception:)])
			return false;

		return [_delegate stream: object
		       didReadIntoBuffer: _buffer
				  length: length
			       exception: exception];
# ifdef OF_HAVE_BLOCKS
	}
# endif
}

# ifdef OF_HAVE_BLOCKS
- (void)dealloc
{
	[_block release];

	[super dealloc];
}
# endif
@end

@implementation OFRunLoopExactReadQueueItem
- (bool)handleObject: (id)object
{
	size_t length;
	id exception = nil;

	@try {
		length = [object readIntoBuffer: (char *)_buffer + _readLength
					 length: _exactLength - _readLength];
	} @catch (id e) {
		length = 0;
		exception = e;
	}

	_readLength += length;

	if (_readLength != _exactLength && ![object isAtEndOfStream] &&
	    exception == nil)
		return true;

# ifdef OF_HAVE_BLOCKS
	if (_block != NULL) {
		if (!_block(_readLength, exception))
			return false;

		_readLength = 0;
		return true;
	} else {
# endif
		if (![_delegate respondsToSelector:
		    @selector(stream:didReadIntoBuffer:length:exception:)])
			return false;

		if (![_delegate stream: object
		     didReadIntoBuffer: _buffer
				length: _readLength
			     exception: exception])
			return false;

		_readLength = 0;
		return true;
# ifdef OF_HAVE_BLOCKS
	}
# endif
}

# ifdef OF_HAVE_BLOCKS
- (void)dealloc
{
	[_block release];

	[super dealloc];
}
# endif
@end

@implementation OFRunLoopReadLineQueueItem
- (bool)handleObject: (id)object
{
	OFString *line;
	id exception = nil;

	@try {
		line = [object tryReadLineWithEncoding: _encoding];
	} @catch (id e) {
		line = nil;
		exception = e;
	}

	if (line == nil && ![object isAtEndOfStream] && exception == nil)
		return true;

# ifdef OF_HAVE_BLOCKS
	if (_block != NULL)
		return _block(line, exception);
	else {
# endif
		if (![_delegate respondsToSelector:
		    @selector(stream:didReadLine:exception:)])
			return false;

		return [_delegate stream: object
			     didReadLine: line
			       exception: exception];
# ifdef OF_HAVE_BLOCKS
	}
# endif
}

# ifdef OF_HAVE_BLOCKS
- (void)dealloc
{
	[_block release];

	[super dealloc];
}
# endif
@end

@implementation OFRunLoopWriteDataQueueItem
- (bool)handleObject: (id)object
{
	size_t length;
	id exception = nil;
	size_t dataLength = _data.count * _data.itemSize;
	OFData *newData, *oldData;

	@try {
		const char *dataItems = _data.items;
		length = dataLength - _writtenLength;
		[object writeBuffer: dataItems + _writtenLength length: length];
	} @catch (OFWriteFailedException *e) {
		length = e.bytesWritten;

		if (e.errNo != EWOULDBLOCK)
			exception = e;
	} @catch (id e) {
		length = 0;
		exception = e;
	}

	_writtenLength += length;
	OFEnsure(_writtenLength <= dataLength);

	if (_writtenLength != dataLength && exception == nil)
		return true;

# ifdef OF_HAVE_BLOCKS
	if (_block != NULL) {
		newData = _block(_data, _writtenLength, exception);

		if (newData == nil)
			return false;

		oldData = _data;
		_data = [newData copy];
		[oldData release];

		_writtenLength = 0;
		return true;
	} else {
# endif
		if (![_delegate respondsToSelector:
		    @selector(stream:didWriteData:bytesWritten:exception:)])
			return false;

		newData = [_delegate stream: object
			       didWriteData: _data
			       bytesWritten: _writtenLength
				  exception: exception];

		if (newData == nil)
			return false;

		oldData = _data;
		_data = [newData copy];
		[oldData release];

		_writtenLength = 0;
		return true;
# ifdef OF_HAVE_BLOCKS
	}
# endif
}

- (void)dealloc
{
	[_data release];
# ifdef OF_HAVE_BLOCKS
	[_block release];
# endif

	[super dealloc];
}
@end

@implementation OFRunLoopWriteStringQueueItem
- (bool)handleObject: (id)object
{
	size_t length;
	id exception = nil;
	size_t cStringLength = [_string cStringLengthWithEncoding: _encoding];
	OFString *newString, *oldString;

	@try {
		const char *cString = [_string cStringWithEncoding: _encoding];
		length = cStringLength - _writtenLength;
		[object writeBuffer: cString + _writtenLength length: length];
	} @catch (OFWriteFailedException *e) {
		length = e.bytesWritten;

		if (e.errNo != EWOULDBLOCK)
			exception = e;
	} @catch (id e) {
		length = 0;
		exception = e;
	}

	_writtenLength += length;
	OFEnsure(_writtenLength <= cStringLength);

	if (_writtenLength != cStringLength && exception == nil)
		return true;

# ifdef OF_HAVE_BLOCKS
	if (_block != NULL) {
		newString = _block(_string, _writtenLength, exception);

		if (newString == nil)
			return false;

		oldString = _string;
		_string = [newString copy];
		[oldString release];

		_writtenLength = 0;
		return true;
	} else {
# endif
		if (![_delegate respondsToSelector: @selector(stream:
		    didWriteString:encoding:bytesWritten:exception:)])
			return false;

		newString = [_delegate stream: object
			       didWriteString: _string
				     encoding: _encoding
				 bytesWritten: _writtenLength
				    exception: exception];

		if (newString == nil)
			return false;

		oldString = _string;
		_string = [newString copy];
		[oldString release];

		_writtenLength = 0;
		return true;
# ifdef OF_HAVE_BLOCKS
	}
# endif
}

- (void)dealloc
{
	[_string release];
# ifdef OF_HAVE_BLOCKS
	[_block release];
# endif

	[super dealloc];
}
@end

# if !defined(OF_WII) && !defined(OF_NINTENDO_3DS)
@implementation OFRunLoopConnectQueueItem
- (bool)handleObject: (id)object
{
	id exception = nil;
	int errNo;

	if ((errNo = [object of_socketError]) != 0)
		exception =
		    [_delegate of_connectionFailedExceptionForErrNo: errNo];

	if ([_delegate respondsToSelector:
	    @selector(of_socketDidConnect:exception:)]) {
		/*
		 * Make sure we only call the delegate once we removed the
		 * socket from the kernel event observer. This is necessary as
		 * otherwise we could try to connect to the next address and it
		 * would not be re-registered with the kernel event observer,
		 * which is necessary for some kernel event observers (e.g.
		 * epoll) even if the fd of the new socket is the same.
		 */
		OFRunLoop *runLoop = [OFRunLoop currentRunLoop];
		OFTimer *timer = [OFTimer
		    timerWithTimeInterval: 0
				   target: _delegate
				 selector: @selector(of_socketDidConnect:
					       exception:)
				   object: object
				   object: exception
				  repeats: false];
		[runLoop addTimer: timer forMode: runLoop.currentMode];
	}

	return false;
}
@end
# endif

@implementation OFRunLoopAcceptQueueItem
- (bool)handleObject: (id)object
{
	id acceptedSocket, exception = nil;

	@try {
		acceptedSocket = [object accept];
	} @catch (id e) {
		acceptedSocket = nil;
		exception = e;
	}

# ifdef OF_HAVE_BLOCKS
	if (_block != NULL) {
		if ([object isKindOfClass: [OFStreamSocket class]])
			return ((OFStreamSocketAsyncAcceptBlock)_block)(
			    acceptedSocket, exception);
		else if ([object isKindOfClass:
		    [OFSequencedPacketSocket class]])
			return ((OFSequencedPacketSocketAsyncAcceptBlock)
			    _block)(acceptedSocket, exception);
		else
			OFEnsure(0);
	} else {
# endif
		if (![_delegate respondsToSelector:
		    @selector(socket:didAcceptSocket:exception:)])
			return false;

		return [_delegate socket: object
			 didAcceptSocket: acceptedSocket
			       exception: exception];
# ifdef OF_HAVE_BLOCKS
	}
# endif
}

# ifdef OF_HAVE_BLOCKS
- (void)dealloc
{
	[_block release];

	[super dealloc];
}
# endif
@end

@implementation OFRunLoopDatagramReceiveQueueItem
- (bool)handleObject: (id)object
{
	size_t length;
	OFSocketAddress address;
	id exception = nil;

	@try {
		length = [object receiveIntoBuffer: _buffer
					    length: _length
					    sender: &address];
	} @catch (id e) {
		length = 0;
		exception = e;
	}

# ifdef OF_HAVE_BLOCKS
	if (_block != NULL)
		return _block(length, &address, exception);
	else {
# endif
		if (![_delegate respondsToSelector: @selector(
		    socket:didReceiveIntoBuffer:length:sender:exception:)])
			return false;

		return [_delegate socket: object
		    didReceiveIntoBuffer: _buffer
				  length: length
				  sender: &address
			       exception: exception];
# ifdef OF_HAVE_BLOCKS
	}
# endif
}

# ifdef OF_HAVE_BLOCKS
- (void)dealloc
{
	[_block release];

	[super dealloc];
}
# endif
@end

@implementation OFRunLoopDatagramSendQueueItem
- (bool)handleObject: (id)object
{
	id exception = nil;
	OFData *newData, *oldData;

	@try {
		[object sendBuffer: _data.items
			    length: _data.count * _data.itemSize
			  receiver: &_receiver];
	} @catch (id e) {
		exception = e;
	}

# ifdef OF_HAVE_BLOCKS
	if (_block != NULL) {
		newData = _block(_data, &_receiver, exception);

		if (newData == nil)
			return false;

		oldData = _data;
		_data = [newData copy];
		[oldData release];

		return true;
	} else {
# endif
		if (![_delegate respondsToSelector:
		    @selector(socket:didSendData:receiver:exception:)])
			return false;

		newData = [_delegate socket: object
				didSendData: _data
				   receiver: &_receiver
				  exception: exception];

		if (newData == nil)
			return false;

		oldData = _data;
		_data = [newData copy];
		[oldData release];

		return true;
# ifdef OF_HAVE_BLOCKS
	}
# endif
}

- (void)dealloc
{
	[_data release];
# ifdef OF_HAVE_BLOCKS
	[_block release];
# endif

	[super dealloc];
}
@end

@implementation OFRunLoopPacketReceiveQueueItem
- (bool)handleObject: (id)object
{
	size_t length;
	id exception = nil;

	@try {
		length = [object receiveIntoBuffer: _buffer length: _length];
	} @catch (id e) {
		length = 0;
		exception = e;
	}

# ifdef OF_HAVE_BLOCKS
	if (_block != NULL)
		return _block(length, exception);
	else {
# endif
		if (![_delegate respondsToSelector: @selector(
		    socket:didReceiveIntoBuffer:length:exception:)])
			return false;

		return [_delegate socket: object
		    didReceiveIntoBuffer: _buffer
				  length: length
			       exception: exception];
# ifdef OF_HAVE_BLOCKS
	}
# endif
}

# ifdef OF_HAVE_BLOCKS
- (void)dealloc
{
	[_block release];

	[super dealloc];
}
# endif
@end

@implementation OFRunLoopPacketSendQueueItem
- (bool)handleObject: (id)object
{
	id exception = nil;
	OFData *newData, *oldData;

	@try {
		[object sendBuffer: _data.items
			    length: _data.count * _data.itemSize];
	} @catch (id e) {
		exception = e;
	}

# ifdef OF_HAVE_BLOCKS
	if (_block != NULL) {
		newData = _block(_data, exception);

		if (newData == nil)
			return false;

		oldData = _data;
		_data = [newData copy];
		[oldData release];

		return true;
	} else {
# endif
		if (![_delegate respondsToSelector:
		    @selector(socket:didSendData:exception:)])
			return false;

		newData = [_delegate socket: object
				didSendData: _data
				  exception: exception];

		if (newData == nil)
			return false;

		oldData = _data;
		_data = [newData copy];
		[oldData release];

		return true;
# ifdef OF_HAVE_BLOCKS
	}
# endif
}

- (void)dealloc
{
	[_data release];
# ifdef OF_HAVE_BLOCKS
	[_block release];
# endif

	[super dealloc];
}
@end
#endif

@implementation OFRunLoop
@synthesize currentMode = _currentMode;

+ (OFRunLoop *)mainRunLoop
{
	return mainRunLoop;
}

+ (OFRunLoop *)currentRunLoop
{
#ifdef OF_HAVE_THREADS
	return [OFThread currentThread].runLoop;
#else
	return [self mainRunLoop];
#endif
}

+ (void)of_setMainRunLoop: (OFRunLoop *)runLoop
{
	mainRunLoop = [runLoop retain];
}

static OFRunLoopState *
stateForMode(OFRunLoop *self, OFRunLoopMode mode, bool create)
{
	OFRunLoopState *state;

#ifdef OF_HAVE_THREADS
	[self->_statesMutex lock];
	@try {
#endif
		state = [self->_states objectForKey: mode];

		if (create && state == nil) {
			state = [[OFRunLoopState alloc] init];
			@try {
				[self->_states setObject: state forKey: mode];
			} @finally {
				[state release];
			}
		}
#ifdef OF_HAVE_THREADS
	} @finally {
		[self->_statesMutex unlock];
	}
#endif

	return state;
}

#ifdef OF_HAVE_SOCKETS
# define NEW_READ(type, object, mode)					\
	void *pool = objc_autoreleasePoolPush();			\
	OFRunLoop *runLoop = [self currentRunLoop];			\
	OFRunLoopState *state = stateForMode(runLoop, mode, true);	\
	OFList *queue = [state->_readQueues objectForKey: object];	\
	type *queueItem;						\
									\
	if (queue == nil) {						\
		queue = [OFList list];					\
		[state->_readQueues setObject: queue forKey: object];	\
	}								\
									\
	if (queue.count == 0)						\
		[state->_kernelEventObserver				\
		    addObjectForReading: object];			\
									\
	queueItem = [[[type alloc] init] autorelease];
# define NEW_WRITE(type, object, mode)					\
	void *pool = objc_autoreleasePoolPush();			\
	OFRunLoop *runLoop = [self currentRunLoop];			\
	OFRunLoopState *state = stateForMode(runLoop, mode, true);	\
	OFList *queue = [state->_writeQueues objectForKey: object];	\
	type *queueItem;						\
									\
	if (queue == nil) {						\
		queue = [OFList list];					\
		[state->_writeQueues setObject: queue forKey: object];	\
	}								\
									\
	if (queue.count == 0)						\
		[state->_kernelEventObserver				\
		    addObjectForWriting: object];			\
									\
	queueItem = [[[type alloc] init] autorelease];
#define QUEUE_ITEM							\
	[queue appendObject: queueItem];				\
									\
	objc_autoreleasePoolPop(pool);

+ (void)of_addAsyncReadForStream: (OFStream <OFReadyForReadingObserving> *)
				      stream
			  buffer: (void *)buffer
			  length: (size_t)length
			    mode: (OFRunLoopMode)mode
# ifdef OF_HAVE_BLOCKS
			   block: (OFStreamAsyncReadBlock)block
# endif
			delegate: (id <OFStreamDelegate>)delegate
{
	NEW_READ(OFRunLoopReadQueueItem, stream, mode)

	queueItem->_delegate = [delegate retain];
# ifdef OF_HAVE_BLOCKS
	queueItem->_block = [block copy];
# endif
	queueItem->_buffer = buffer;
	queueItem->_length = length;

	QUEUE_ITEM
}

+ (void)of_addAsyncReadForStream: (OFStream <OFReadyForReadingObserving> *)
				      stream
			  buffer: (void *)buffer
		     exactLength: (size_t)exactLength
			    mode: (OFRunLoopMode)mode
# ifdef OF_HAVE_BLOCKS
			   block: (OFStreamAsyncReadBlock)block
# endif
			delegate: (id <OFStreamDelegate>)delegate
{
	NEW_READ(OFRunLoopExactReadQueueItem, stream, mode)

	queueItem->_delegate = [delegate retain];
# ifdef OF_HAVE_BLOCKS
	queueItem->_block = [block copy];
# endif
	queueItem->_buffer = buffer;
	queueItem->_exactLength = exactLength;

	QUEUE_ITEM
}

+ (void)of_addAsyncReadLineForStream: (OFStream <OFReadyForReadingObserving> *)
					  stream
			    encoding: (OFStringEncoding)encoding
				mode: (OFRunLoopMode)mode
# ifdef OF_HAVE_BLOCKS
			       block: (OFStreamAsyncReadLineBlock)block
# endif
			    delegate: (id <OFStreamDelegate>)delegate
{
	NEW_READ(OFRunLoopReadLineQueueItem, stream, mode)

	queueItem->_delegate = [delegate retain];
# ifdef OF_HAVE_BLOCKS
	queueItem->_block = [block copy];
# endif
	queueItem->_encoding = encoding;

	QUEUE_ITEM
}

+ (void)of_addAsyncWriteForStream: (OFStream <OFReadyForWritingObserving> *)
				       stream
			     data: (OFData *)data
			     mode: (OFRunLoopMode)mode
# ifdef OF_HAVE_BLOCKS
			    block: (OFStreamAsyncWriteDataBlock)block
# endif
			 delegate: (id <OFStreamDelegate>)delegate
{
	NEW_WRITE(OFRunLoopWriteDataQueueItem, stream, mode)

	queueItem->_delegate = [delegate retain];
# ifdef OF_HAVE_BLOCKS
	queueItem->_block = [block copy];
# endif
	queueItem->_data = [data copy];

	QUEUE_ITEM
}

+ (void)of_addAsyncWriteForStream: (OFStream <OFReadyForWritingObserving> *)
				       stream
			   string: (OFString *)string
			 encoding: (OFStringEncoding)encoding
			     mode: (OFRunLoopMode)mode
# ifdef OF_HAVE_BLOCKS
			    block: (OFStreamAsyncWriteStringBlock)block
# endif
			 delegate: (id <OFStreamDelegate>)delegate
{
	NEW_WRITE(OFRunLoopWriteStringQueueItem, stream, mode)

	queueItem->_delegate = [delegate retain];
# ifdef OF_HAVE_BLOCKS
	queueItem->_block = [block copy];
# endif
	queueItem->_string = [string copy];
	queueItem->_encoding = encoding;

	QUEUE_ITEM
}

# if !defined(OF_WII) && !defined(OF_NINTENDO_3DS)
+ (void)of_addAsyncConnectForSocket: (id)sock
			       mode: (OFRunLoopMode)mode
			   delegate: (id <OFRunLoopConnectDelegate>)delegate
{
	NEW_WRITE(OFRunLoopConnectQueueItem, sock, mode)

	queueItem->_delegate = [delegate retain];

	QUEUE_ITEM
}
# endif

+ (void)of_addAsyncAcceptForSocket: (id)sock
			      mode: (OFRunLoopMode)mode
			     block: (id)block
			  delegate: (id)delegate
{
	NEW_READ(OFRunLoopAcceptQueueItem, sock, mode)

	queueItem->_delegate = [delegate retain];
# ifdef OF_HAVE_BLOCKS
	queueItem->_block = [block copy];
# endif

	QUEUE_ITEM
}

+ (void)of_addAsyncReceiveForDatagramSocket: (OFDatagramSocket *)sock
    buffer: (void *)buffer
    length: (size_t)length
      mode: (OFRunLoopMode)mode
# ifdef OF_HAVE_BLOCKS
     block: (OFDatagramSocketAsyncReceiveBlock)block
# endif
  delegate: (id <OFDatagramSocketDelegate>)delegate
{
	NEW_READ(OFRunLoopDatagramReceiveQueueItem, sock, mode)

	queueItem->_delegate = [delegate retain];
# ifdef OF_HAVE_BLOCKS
	queueItem->_block = [block copy];
# endif
	queueItem->_buffer = buffer;
	queueItem->_length = length;

	QUEUE_ITEM
}

+ (void)of_addAsyncSendForDatagramSocket: (OFDatagramSocket *)sock
      data: (OFData *)data
  receiver: (const OFSocketAddress *)receiver
      mode: (OFRunLoopMode)mode
# ifdef OF_HAVE_BLOCKS
     block: (OFDatagramSocketAsyncSendDataBlock)block
# endif
  delegate: (id <OFDatagramSocketDelegate>)delegate
{
	NEW_WRITE(OFRunLoopDatagramSendQueueItem, sock, mode)

	queueItem->_delegate = [delegate retain];
# ifdef OF_HAVE_BLOCKS
	queueItem->_block = [block copy];
# endif
	queueItem->_data = [data copy];
	queueItem->_receiver = *receiver;

	QUEUE_ITEM
}

+ (void)of_addAsyncReceiveForSequencedPacketSocket: (OFSequencedPacketSocket *)
							sock
    buffer: (void *)buffer
    length: (size_t)length
      mode: (OFRunLoopMode)mode
# ifdef OF_HAVE_BLOCKS
     block: (OFSequencedPacketSocketAsyncReceiveBlock)block
# endif
  delegate: (id <OFSequencedPacketSocketDelegate>)delegate
{
	NEW_READ(OFRunLoopPacketReceiveQueueItem, sock, mode)

	queueItem->_delegate = [delegate retain];
# ifdef OF_HAVE_BLOCKS
	queueItem->_block = [block copy];
# endif
	queueItem->_buffer = buffer;
	queueItem->_length = length;

	QUEUE_ITEM
}

+ (void)of_addAsyncSendForSequencedPacketSocket: (OFSequencedPacketSocket *)sock
      data: (OFData *)data
      mode: (OFRunLoopMode)mode
# ifdef OF_HAVE_BLOCKS
     block: (OFSequencedPacketSocketAsyncSendDataBlock)block
# endif
  delegate: (id <OFSequencedPacketSocketDelegate>)delegate
{
	NEW_WRITE(OFRunLoopPacketSendQueueItem, sock, mode)

	queueItem->_delegate = [delegate retain];
# ifdef OF_HAVE_BLOCKS
	queueItem->_block = [block copy];
# endif
	queueItem->_data = [data copy];

	QUEUE_ITEM
}
# undef NEW_READ
# undef NEW_WRITE
# undef QUEUE_ITEM

+ (void)of_cancelAsyncRequestsForObject: (id)object mode: (OFRunLoopMode)mode
{
	void *pool = objc_autoreleasePoolPush();
	OFRunLoop *runLoop = [self currentRunLoop];
	OFRunLoopState *state = stateForMode(runLoop, mode, false);
	OFList *queue;

	if (state == nil)
		return;

	if ((queue = [state->_writeQueues objectForKey: object]) != nil) {
		assert(queue.count > 0);

		/*
		 * Clear the queue now, in case this has been called from a
		 * handler, as otherwise, we'd do the cleanups below twice.
		 */
		[queue removeAllObjects];

		[state->_kernelEventObserver removeObjectForWriting: object];
		[state->_writeQueues removeObjectForKey: object];
	}

	if ((queue = [state->_readQueues objectForKey: object]) != nil) {
		assert(queue.count > 0);

		/*
		 * Clear the queue now, in case this has been called from a
		 * handler, as otherwise, we'd do the cleanups below twice.
		 */
		[queue removeAllObjects];

		[state->_kernelEventObserver removeObjectForReading: object];
		[state->_readQueues removeObjectForKey: object];
	}

	objc_autoreleasePoolPop(pool);
}
#endif

- (instancetype)init
{
	self = [super init];

	@try {
		OFRunLoopState *state;

		_states = [[OFMutableDictionary alloc] init];

		state = [[OFRunLoopState alloc] init];
		@try {
			[_states setObject: state forKey: OFDefaultRunLoopMode];
		} @finally {
			[state release];
		}

#ifdef OF_HAVE_THREADS
		_statesMutex = [[OFMutex alloc] init];
#endif
	} @catch (id e) {
		[self release];
		@throw e;
	}

	return self;
}

- (void)dealloc
{
	[_states release];
#ifdef OF_HAVE_THREADS
	[_statesMutex release];
#endif

	[super dealloc];
}

- (void)addTimer: (OFTimer *)timer
{
	[self addTimer: timer forMode: OFDefaultRunLoopMode];
}

- (void)addTimer: (OFTimer *)timer forMode: (OFRunLoopMode)mode
{
	OFRunLoopState *state = stateForMode(self, mode, true);

#ifdef OF_HAVE_THREADS
	[state->_timersQueueMutex lock];
	@try {
#endif
		[state->_timersQueue insertObject: timer];
#ifdef OF_HAVE_THREADS
	} @finally {
		[state->_timersQueueMutex unlock];
	}
#endif

	[timer of_setInRunLoop: self mode: mode];

#if defined(OF_HAVE_SOCKETS)
	[state->_kernelEventObserver cancel];
#elif defined(OF_HAVE_THREADS)
	[state->_condition signal];
#endif
}

- (void)of_removeTimer: (OFTimer *)timer forMode: (OFRunLoopMode)mode
{
	OFRunLoopState *state = stateForMode(self, mode, false);

	/* {} required to avoid -Wmisleading-indentation false positive. */
	if (state == nil) {
		return;
	}

#ifdef OF_HAVE_THREADS
	[state->_timersQueueMutex lock];
	@try {
#endif
		for (OFListItem iter = state->_timersQueue.firstListItem;
		    iter != NULL; iter = OFListItemNext(iter)) {
			if ([OFListItemObject(iter) isEqual: timer]) {
				[state->_timersQueue removeListItem: iter];
				break;
			}
		}
#ifdef OF_HAVE_THREADS
	} @finally {
		[state->_timersQueueMutex unlock];
	}
#endif
}

#ifdef OF_AMIGAOS
- (void)addExecSignal: (ULONG)signal target: (id)target selector: (SEL)selector
{
	[self addExecSignal: signal
		    forMode: OFDefaultRunLoopMode
		     target: target
		   selector: selector];
}

- (void)addExecSignal: (ULONG)signal
	      forMode: (OFRunLoopMode)mode
	       target: (id)target
	     selector: (SEL)selector
{
	OFRunLoopState *state = stateForMode(self, mode, true);

# ifdef OF_HAVE_THREADS
	[state->_execSignalsMutex lock];
	@try {
# endif
		[state->_execSignals addItem: &signal];
		[state->_execSignalsTargets addObject: target];
		[state->_execSignalsSelectors addItem: &selector];

# ifdef OF_HAVE_SOCKETS
		state->_kernelEventObserver.execSignalMask |= (1ul << signal);
# elif defined(OF_HAVE_THREADS)
		state->_execSignalMask |= (1ul << signal);
# endif
# ifdef OF_HAVE_THREADS
	} @finally {
		[state->_execSignalsMutex unlock];
	}
# endif

# if defined(OF_HAVE_SOCKETS)
	[state->_kernelEventObserver cancel];
# elif defined(OF_HAVE_THREADS)
	[state->_condition signal];
# endif
}

- (void)removeExecSignal: (ULONG)signal
		  target: (id)target
		selector: (SEL)selector
{
	[self removeExecSignal: signal
		       forMode: OFDefaultRunLoopMode
			target: target
		      selector: selector];
}

- (void)removeExecSignal: (ULONG)signal
		 forMode: (OFRunLoopMode)mode
		  target: (id)target
		selector: (SEL)selector
{
	OFRunLoopState *state = stateForMode(self, mode, false);

	if (state == nil)
		return;

# ifdef OF_HAVE_THREADS
	[state->_execSignalsMutex lock];
	@try {
# endif
		const ULONG *signals = state->_execSignals.items;
		const id *targets = state->_execSignalsTargets.objects;
		const SEL *selectors = state->_execSignalsSelectors.items;
		size_t count = state->_execSignals.count;
		bool found = false;
		ULONG newMask = 0;

		for (size_t i = 0; i < count; i++) {
			if (!found && signals[i] == signal &&
			    targets[i] == target && selectors[i] == selector) {
				[state->_execSignals removeItemAtIndex: i];
				[state->_execSignalsTargets
				    removeObjectAtIndex: i];
				[state->_execSignalsSelectors
				    removeItemAtIndex: i];

				found = true;
			} else
				newMask |= (1ul << signals[i]);
		}

# ifdef OF_HAVE_SOCKETS
		state->_kernelEventObserver.execSignalMask = newMask;
# elif defined(OF_HAVE_THREADS)
		state->_execSignalMask = newMask;
# endif
# ifdef OF_HAVE_THREADS
	} @finally {
		[state->_execSignalsMutex unlock];
	}
# endif

# if defined(OF_HAVE_SOCKETS)
	[state->_kernelEventObserver cancel];
# elif defined(OF_HAVE_THREADS)
	[state->_condition signal];
# endif
}
#endif

- (void)run
{
	[self runUntilDate: nil];
}

- (void)runUntilDate: (OFDate *)deadline
{
	_stop = false;

	while (!_stop &&
	    (deadline == nil || deadline.timeIntervalSinceNow >= 0))
		[self runMode: OFDefaultRunLoopMode beforeDate: deadline];
}

- (void)runMode: (OFRunLoopMode)mode beforeDate: (OFDate *)deadline
{
	void *pool = objc_autoreleasePoolPush();
	OFRunLoopMode previousMode = _currentMode;
	OFRunLoopState *state = stateForMode(self, mode, false);

	if (state == nil)
		return;

	_currentMode = mode;
	@try {
		OFDate *nextTimer;
#if defined(OF_AMIGAOS) && !defined(OF_HAVE_SOCKETS) && defined(OF_HAVE_THREADS)
		ULONG signalMask;
#endif

		for (;;) {
			OFTimer *timer;

#ifdef OF_HAVE_THREADS
			[state->_timersQueueMutex lock];
			@try {
#endif
				OFListItem listItem =
				    state->_timersQueue.firstListItem;

				if (listItem != NULL &&
				    [OFListItemObject(listItem) fireDate]
				    .timeIntervalSinceNow <= 0) {
					timer = [[OFListItemObject(listItem)
					    retain] autorelease];

					[state->_timersQueue
					    removeListItem: listItem];

					[timer of_setInRunLoop: nil mode: nil];
				} else
					break;
#ifdef OF_HAVE_THREADS
			} @finally {
				[state->_timersQueueMutex unlock];
			}
#endif

			if (timer.valid) {
				[timer fire];
				return;
			}
		}

#ifdef OF_HAVE_THREADS
		[state->_timersQueueMutex lock];
		@try {
#endif
			nextTimer = [[state->_timersQueue
			    firstObject] fireDate];
#ifdef OF_HAVE_THREADS
		} @finally {
			[state->_timersQueueMutex unlock];
		}
#endif

		/* Watch for I/O events until the next timer is due */
		if (nextTimer != nil || deadline != nil) {
			OFTimeInterval timeout;

			if (nextTimer != nil && deadline == nil)
				timeout = nextTimer.timeIntervalSinceNow;
			else if (nextTimer == nil && deadline != nil)
				timeout = deadline.timeIntervalSinceNow;
			else
				timeout = [nextTimer earlierDate: deadline]
				    .timeIntervalSinceNow;

			if (timeout < 0)
				timeout = 0;

#if defined(OF_HAVE_SOCKETS)
			@try {
				[state->_kernelEventObserver
				    observeForTimeInterval: timeout];
			} @catch (OFObserveFailedException *e) {
				if (e.errNo != EINTR)
					@throw e;
			}
#elif defined(OF_HAVE_THREADS)
			[state->_condition lock];
# ifdef OF_AMIGAOS
			signalMask = state->_execSignalMask;
			[state->_condition waitForTimeInterval: timeout
						  orExecSignal: &signalMask];
			if (signalMask != 0)
				[state execSignalWasReceived: signalMask];
# else
			[state->_condition waitForTimeInterval: timeout];
# endif
			[state->_condition unlock];
#else
			[OFThread sleepForTimeInterval: timeout];
#endif
		} else {
			/*
			 * No more timers and no deadline: Just watch for I/O
			 * until we get an event. If a timer is added by
			 * another thread, it cancels the observe.
			 */
#if defined(OF_HAVE_SOCKETS)
			@try {
				[state->_kernelEventObserver observe];
			} @catch (OFObserveFailedException *e) {
				if (e.errNo != EINTR)
					@throw e;
			}
#elif defined(OF_HAVE_THREADS)
			[state->_condition lock];
# ifdef OF_AMIGAOS
			signalMask = state->_execSignalMask;
			[state->_condition
			    waitForConditionOrExecSignal: &signalMask];
			if (signalMask != 0)
				[state execSignalWasReceived: signalMask];
# else
			[state->_condition wait];
# endif
			[state->_condition unlock];
#else
			[OFThread sleepForTimeInterval: 86400];
#endif
		}

		objc_autoreleasePoolPop(pool);
	} @finally {
		_currentMode = previousMode;
	}
}

- (void)stop
{
	OFRunLoopState *state = stateForMode(self, OFDefaultRunLoopMode, false);

	_stop = true;

	if (state == nil)
		return;

#if defined(OF_HAVE_SOCKETS)
	[state->_kernelEventObserver cancel];
#elif defined(OF_HAVE_THREADS)
	[state->_condition signal];
#endif
}
@end