ObjFW  Artifact [c1f2526e6a]

Artifact c1f2526e6a518138a780f3382f8ba98c7ab3e47cf7488393833497628317b51e:

  • File src/OFThreadPool.m — part of check-in [13ee56edf3] at 2014-06-21 21:43:43 on branch trunk — Move all macros from OFObject.h to macros.h

    This means that OFObject.h imports macros.h now, making it unnecessary
    to manually import macros.h in almost every file. And while at it, also
    import autorelease.h in OFObject.h, so that this doesn't need to be
    manually imported in almost every file as well. (user: js, size: 6946) [annotate] [blame] [check-ins using]


/*
 * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2014
 *   Jonathan Schleifer <js@webkeks.org>
 *
 * All rights reserved.
 *
 * This file is part of ObjFW. It may be distributed under the terms of the
 * Q Public License 1.0, which can be found in the file LICENSE.QPL included in
 * the packaging of this file.
 *
 * Alternatively, it may be distributed under the terms of the GNU General
 * Public License, either version 2 or 3, which can be found in the file
 * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
 * file.
 */

#include "config.h"

#import "OFThreadPool.h"
#import "OFArray.h"
#import "OFList.h"
#import "OFThread.h"
#import "OFCondition.h"
#import "OFSystemInfo.h"

@interface OFThreadPoolJob: OFObject
{
	id _target;
	SEL _selector;
	id _object;
#ifdef OF_HAVE_BLOCKS
	of_thread_pool_block_t _block;
#endif
}

+ (instancetype)jobWithTarget: (id)target
		     selector: (SEL)selector
		       object: (id)object;
#ifdef OF_HAVE_BLOCKS
+ (instancetype)jobWithBlock: (of_thread_pool_block_t)block;
#endif
- initWithTarget: (id)target
	selector: (SEL)selector
	  object: (id)object;
#ifdef OF_HAVE_BLOCKS
- initWithBlock: (of_thread_pool_block_t)block;
#endif
- (void)perform;
@end

@implementation OFThreadPoolJob
+ (instancetype)jobWithTarget: (id)target
		     selector: (SEL)selector
		       object: (id)object
{
	return [[[self alloc] initWithTarget: target
				    selector: selector
				      object: object] autorelease];
}

#ifdef OF_HAVE_BLOCKS
+ (instancetype)jobWithBlock: (of_thread_pool_block_t)block
{
	return [[[self alloc] initWithBlock: block] autorelease];
}
#endif

- initWithTarget: (id)target
	selector: (SEL)selector
	  object: (id)object
{
	self = [super init];

	@try {
		_target = [target retain];
		_selector = selector;
		_object = [object retain];
	} @catch (id e) {
		[self release];
		@throw e;
	}

	return self;
}

#ifdef OF_HAVE_BLOCKS
- initWithBlock: (of_thread_pool_block_t)block
{
	self = [super init];

	@try {
		_block = [block copy];
	} @catch (id e) {
		[self release];
		@throw e;
	}

	return self;
}
#endif

- (void)dealloc
{
	[_target release];
	[_object release];
#ifdef OF_HAVE_BLOCKS
	[_block release];
#endif

	[super dealloc];
}

- (void)perform
{
#ifdef OF_HAVE_BLOCKS
	if (_block != NULL)
		_block();
	else
#endif
		[_target performSelector: _selector
			      withObject: _object];
}
@end

@interface OFThreadPoolThread: OFThread
{
	OFList *_queue;
	OFCondition *_queueCondition, *_countCondition;
@public
	volatile bool _terminate;
	volatile int *_doneCount;
}

+ (instancetype)threadWithThreadPool: (OFThreadPool*)threadPool;
- initWithThreadPool: (OFThreadPool*)threadPool;
@end

@implementation OFThreadPoolThread
+ (instancetype)threadWithThreadPool: (OFThreadPool*)threadPool
{
	return [[[self alloc] initWithThreadPool: threadPool] autorelease];
}

- initWithThreadPool: (OFThreadPool*)threadPool
{
	self = [super init];

	@try {
		_queue = [threadPool->_queue retain];
		_queueCondition = [threadPool->_queueCondition retain];
		_countCondition = [threadPool->_countCondition retain];
		_doneCount = &threadPool->_doneCount;
	} @catch (id e) {
		[self release];
		@throw e;
	}

	return self;
}

- (void)dealloc
{
	[_queue release];
	[_queueCondition release];
	[_countCondition release];

	[super dealloc];
}

- (id)main
{
	void *pool;

	if (_terminate)
		return nil;

	pool = objc_autoreleasePoolPush();

	for (;;) {
		OFThreadPoolJob *job;

		[_queueCondition lock];
		@try {
			of_list_object_t *listObject;

			if (_terminate) {
				objc_autoreleasePoolPop(pool);
				return nil;
			}

			listObject = [_queue firstListObject];

			while (listObject == NULL) {
				[_queueCondition wait];

				if (_terminate) {
					objc_autoreleasePoolPop(pool);
					return nil;
				}

				listObject = [_queue firstListObject];
			}

			job = [[listObject->object retain] autorelease];
			[_queue removeListObject: listObject];
		} @finally {
			[_queueCondition unlock];
		}

		if (_terminate) {
			objc_autoreleasePoolPop(pool);
			return nil;
		}

		[job perform];

		if (_terminate) {
			objc_autoreleasePoolPop(pool);
			return nil;
		}

		objc_autoreleasePoolPop(pool);
		pool = objc_autoreleasePoolPush();

		[_countCondition lock];
		@try {
			if (_terminate) {
				objc_autoreleasePoolPop(pool);
				return nil;
			}

			(*_doneCount)++;

			[_countCondition signal];
		} @finally {
			[_countCondition unlock];
		}
	}
}
@end

@implementation OFThreadPool
+ (instancetype)threadPool
{
	return [[[self alloc] init] autorelease];
}

+ (instancetype)threadPoolWithSize: (size_t)size
{
	return [[[self alloc] initWithSize: size] autorelease];
}

- init
{
	return [self initWithSize: [OFSystemInfo numberOfCPUs]];
}

- initWithSize: (size_t)size
{
	self = [super init];

	@try {
		size_t i;

		_size = size;
		_threads = [[OFMutableArray alloc] init];
		_queue = [[OFList alloc] init];
		_queueCondition = [[OFCondition alloc] init];
		_countCondition = [[OFCondition alloc] init];

		for (i = 0; i < size; i++) {
			void *pool = objc_autoreleasePoolPush();

			OFThreadPoolThread *thread =
			    [OFThreadPoolThread threadWithThreadPool: self];

			[_threads addObject: thread];

			objc_autoreleasePoolPop(pool);
		}

		/*
		 * We need to start the threads in a separate loop to make sure
		 * threads is not modified anymore to prevent a race condition.
		 */
		for (i = 0; i < size; i++)
			[[_threads objectAtIndex: i] start];
	} @catch (id e) {
		[self release];
		@throw e;
	}

	return self;
}

- (void)dealloc
{
	void *pool = objc_autoreleasePoolPush();
	[_queueCondition lock];
	@try {
		[_countCondition lock];
		@try {
			OFEnumerator *enumerator = [_threads objectEnumerator];
			OFThreadPoolThread *thread;

			while ((thread = [enumerator nextObject]) != nil)
				thread->_terminate = true;
		} @finally {
			[_countCondition unlock];
		}

		[_queueCondition broadcast];
	} @finally {
		[_queueCondition unlock];
	}
	objc_autoreleasePoolPop(pool);

	[_threads release];
	[_queue release];
	[_queueCondition release];
	[_countCondition release];

	[super dealloc];
}

- (void)OF_dispatchJob: (OFThreadPoolJob*)job
{
	[_countCondition lock];
	_count++;
	[_countCondition unlock];

	[_queueCondition lock];
	@try {
		[_queue appendObject: job];
		[_queueCondition signal];
	} @finally {
		[_queueCondition unlock];
	}
}

- (void)waitUntilDone
{
	for (;;) {
		[_countCondition lock];
		@try {
			if (_doneCount == _count)
				return;

			[_countCondition wait];
		} @finally {
			[_countCondition unlock];
		}
	}
}

- (void)dispatchWithTarget: (id)target
		  selector: (SEL)selector
		    object: (id)object
{
	[self OF_dispatchJob: [OFThreadPoolJob jobWithTarget: target
						    selector: selector
						      object: object]];
}

#ifdef OF_HAVE_BLOCKS
- (void)dispatchWithBlock: (of_thread_pool_block_t)block
{
	[self OF_dispatchJob: [OFThreadPoolJob jobWithBlock: block]];
}
#endif

- (size_t)size
{
	return _size;
}
@end